use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::meta::Peer;
use api::v1::region::{
    BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
};
use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
use arrow::compute::{concat_batches, filter_record_batch};
use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use bytes::Bytes;
use catalog::CatalogManagerRef;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_meta::node_manager::NodeManagerRef;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, warn};
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
use partition::manager::PartitionRuleManagerRef;
use partition::partition::PartitionRuleRef;
use session::context::QueryContextRef;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{RegionId, TableId};
use table::metadata::{TableInfo, TableInfoRef};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};

use crate::error;
use crate::error::{Error, Result};
use crate::metrics::{
    FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
    PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
    PENDING_WORKERS,
};
use crate::prom_row_builder::{
    build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
    rows_to_aligned_record_batch,
};

const PHYSICAL_TABLE_KEY: &str = "physical_table";
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;

#[async_trait]
pub trait PendingRowsSchemaAlterer: Send + Sync {
    async fn create_tables_if_missing_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[ColumnSchema])],
        with_metric_engine: bool,
        ctx: QueryContextRef,
    ) -> Result<()>;

    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> Result<()>;
}

pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
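
// A minimal sketch of a `PendingRowsSchemaAlterer` implementor, assuming no
// schema changes are ever needed (e.g. all tables already exist). This type is
// illustrative only and not part of the production wiring; it shows the shape
// an implementor takes when assembling a `PendingRowsBatcher` in tests.
#[cfg(test)]
#[allow(dead_code)]
pub struct NoopSchemaAlterer;

#[cfg(test)]
#[async_trait]
impl PendingRowsSchemaAlterer for NoopSchemaAlterer {
    async fn create_tables_if_missing_batch(
        &self,
        _catalog: &str,
        _schema: &str,
        _tables: &[(&str, &[ColumnSchema])],
        _with_metric_engine: bool,
        _ctx: QueryContextRef,
    ) -> Result<()> {
        Ok(())
    }

    async fn add_missing_prom_tag_columns_batch(
        &self,
        _catalog: &str,
        _schema: &str,
        _tables: &[(&str, &[String])],
        _ctx: QueryContextRef,
    ) -> Result<()> {
        Ok(())
    }
}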

#[derive(Clone)]
pub struct PhysicalTableMetadata {
    pub table_info: TableInfoRef,
    pub col_name_to_ids: Option<HashMap<String, u32>>,
}

#[async_trait]
pub trait PhysicalFlushCatalogProvider: Send + Sync {
    async fn physical_table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: &session::context::QueryContext,
    ) -> catalog::error::Result<Option<PhysicalTableMetadata>>;
}

#[async_trait]
pub trait PhysicalFlushPartitionProvider: Send + Sync {
    async fn find_table_partition_rule(
        &self,
        table_info: &TableInfo,
    ) -> partition::error::Result<PartitionRuleRef>;

    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer>;
}

#[async_trait]
pub trait PhysicalFlushNodeRequester: Send + Sync {
    async fn handle(
        &self,
        peer: &Peer,
        request: RegionRequest,
    ) -> Result<api::region::RegionResponse>;
}

#[derive(Clone)]
struct CatalogManagerPhysicalFlushAdapter {
    catalog_manager: CatalogManagerRef,
}

#[async_trait]
impl PhysicalFlushCatalogProvider for CatalogManagerPhysicalFlushAdapter {
    async fn physical_table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: &session::context::QueryContext,
    ) -> catalog::error::Result<Option<PhysicalTableMetadata>> {
        self.catalog_manager
            .table(catalog, schema, table_name, Some(query_ctx))
            .await
            .map(|table| {
                table.map(|table| {
                    let table_info = table.table_info();
                    let name_to_ids = table_info.name_to_ids();
                    PhysicalTableMetadata {
                        table_info,
                        col_name_to_ids: name_to_ids,
                    }
                })
            })
    }
}

#[derive(Clone)]
struct PartitionManagerPhysicalFlushAdapter {
    partition_manager: PartitionRuleManagerRef,
}

#[async_trait]
impl PhysicalFlushPartitionProvider for PartitionManagerPhysicalFlushAdapter {
    async fn find_table_partition_rule(
        &self,
        table_info: &TableInfo,
    ) -> partition::error::Result<PartitionRuleRef> {
        self.partition_manager
            .find_table_partition_rule(table_info)
            .await
            .map(|(rule, _)| rule)
    }

    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
        let peer = self.partition_manager.find_region_leader(region_id).await?;
        Ok(peer)
    }
}

#[derive(Clone)]
struct NodeManagerPhysicalFlushAdapter {
    node_manager: NodeManagerRef,
}

#[async_trait]
impl PhysicalFlushNodeRequester for NodeManagerPhysicalFlushAdapter {
    async fn handle(
        &self,
        peer: &Peer,
        request: RegionRequest,
    ) -> error::Result<api::region::RegionResponse> {
        let datanode = self.node_manager.datanode(peer).await;
        datanode
            .handle(request)
            .await
            .context(error::CommonMetaSnafu)
    }
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    catalog: String,
    schema: String,
    physical_table: String,
}

#[derive(Debug)]
pub struct TableBatch {
    pub table_name: String,
    pub table_id: TableId,
    pub batches: Vec<RecordBatch>,
    pub row_count: usize,
}

struct TableResolutionPlan {
    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
    tables_to_alter: Vec<(String, Vec<String>)>,
}

struct PendingBatch {
    tables: HashMap<String, TableBatch>,
    created_at: Instant,
    total_row_count: usize,
    ctx: QueryContextRef,
    waiters: Vec<FlushWaiter>,
}

struct FlushWaiter {
    response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
    _permit: OwnedSemaphorePermit,
}

struct FlushBatch {
    table_batches: Vec<TableBatch>,
    total_row_count: usize,
    ctx: QueryContextRef,
    waiters: Vec<FlushWaiter>,
}

#[derive(Clone)]
struct PendingWorker {
    tx: mpsc::Sender<WorkerCommand>,
}

enum WorkerCommand {
    Submit {
        table_batches: Vec<(String, u32, RecordBatch)>,
        total_rows: usize,
        ctx: QueryContextRef,
        response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
        _permit: OwnedSemaphorePermit,
    },
}

fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
    let physical_table = ctx
        .extension(PHYSICAL_TABLE_KEY)
        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
        .to_string();
    BatchKey {
        catalog: ctx.current_catalog().to_string(),
        schema: ctx.current_schema(),
        physical_table,
    }
}

pub struct PendingRowsBatcher {
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    flush_interval: Duration,
    max_batch_rows: usize,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    flush_semaphore: Arc<Semaphore>,
    inflight_semaphore: Arc<Semaphore>,
    worker_channel_capacity: usize,
    prom_store_with_metric_engine: bool,
    schema_alterer: PendingRowsSchemaAltererRef,
    pending_rows_batch_sync: bool,
    shutdown: broadcast::Sender<()>,
}

impl PendingRowsBatcher {
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        catalog_manager: CatalogManagerRef,
        prom_store_with_metric_engine: bool,
        schema_alterer: PendingRowsSchemaAltererRef,
        flush_interval: Duration,
        max_batch_rows: usize,
        max_concurrent_flushes: usize,
        worker_channel_capacity: usize,
        max_inflight_requests: usize,
    ) -> Option<Arc<Self>> {
        if flush_interval.is_zero()
            || max_batch_rows == 0
            || max_concurrent_flushes == 0
            || worker_channel_capacity == 0
            || max_inflight_requests == 0
        {
            return None;
        }

        let (shutdown, _) = broadcast::channel(1);
        let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
            .ok()
            .as_deref()
            .and_then(|v| v.parse::<bool>().ok())
            .unwrap_or(true);
        let workers = Arc::new(DashMap::new());
        PENDING_WORKERS.set(workers.len() as i64);

        Some(Arc::new(Self {
            workers,
            flush_interval,
            max_batch_rows,
            partition_manager,
            node_manager,
            catalog_manager,
            prom_store_with_metric_engine,
            schema_alterer,
            flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
            inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
            worker_channel_capacity,
            pending_rows_batch_sync,
            shutdown,
        }))
    }

    pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
        let (table_batches, total_rows) = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_build_and_align"])
                .start_timer();
            self.build_and_align_table_batches(requests, &ctx).await?
        };
        if total_rows == 0 {
            return Ok(0);
        }

        let permit = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_acquire_inflight_permit"])
                .start_timer();
            self.inflight_semaphore
                .clone()
                .acquire_owned()
                .await
                .map_err(|_| error::BatcherChannelClosedSnafu.build())?
        };

        let (response_tx, response_rx) = oneshot::channel();

        let batch_key = batch_key_from_ctx(&ctx);
        let mut cmd = Some(WorkerCommand::Submit {
            table_batches,
            total_rows,
            ctx,
            response_tx,
            _permit: permit,
        });

        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_send_to_worker"])
                .start_timer();

            // Retry once: if the worker's channel has closed (e.g. the worker
            // exited after its idle timeout), evict the stale entry and respawn.
            for _ in 0..2 {
                let worker = self.get_or_spawn_worker(batch_key.clone());
                let Some(worker_cmd) = cmd.take() else {
                    break;
                };

                match worker.tx.send(worker_cmd).await {
                    Ok(()) => break,
                    Err(err) => {
                        cmd = Some(err.0);
                        remove_worker_if_same_channel(
                            self.workers.as_ref(),
                            &batch_key,
                            &worker.tx,
                        );
                    }
                }
            }

            if cmd.is_some() {
                return Err(error::BatcherChannelClosedSnafu.build());
            }
        }

        if self.pending_rows_batch_sync {
            let result = {
                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                    .with_label_values(&["submit_wait_flush_result"])
                    .start_timer();
                response_rx
                    .await
                    .map_err(|_| error::BatcherChannelClosedSnafu.build())?
            };
            result
                .context(error::SubmitBatchSnafu)
                .map(|()| total_rows as u64)
        } else {
            Ok(total_rows as u64)
        }
    }

    async fn build_and_align_table_batches(
        &self,
        requests: RowInsertRequests,
        ctx: &QueryContextRef,
    ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
        let catalog = ctx.current_catalog().to_string();
        let schema = ctx.current_schema();

        let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
        if total_rows == 0 {
            return Ok((Vec::new(), 0));
        }

        let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
        let mut plan = self
            .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
            .await?;

        self.create_missing_tables_and_refresh_schemas(
            &catalog,
            &schema,
            ctx,
            &table_rows,
            &mut plan,
        )
        .await?;

        self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
            .await?;

        let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;

        Ok((aligned_batches, total_rows))
    }

    fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
        let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
        let mut total_rows = 0;

        for request in requests.inserts {
            let Some(rows) = request.rows else {
                continue;
            };
            if rows.rows.is_empty() {
                continue;
            }

            total_rows += rows.rows.len();
            table_rows.push((request.table_name, rows));
        }

        (table_rows, total_rows)
    }

    fn collect_unique_table_schemas(
        table_rows: &[(String, Rows)],
    ) -> Result<Vec<(&str, &[ColumnSchema])>> {
        let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
        let mut seen = HashSet::new();

        for (table_name, rows) in table_rows {
            if seen.insert(table_name.as_str()) {
                unique_tables.push((table_name.as_str(), &rows.schema));
            } else {
                return error::InvalidPromRemoteRequestSnafu {
                    msg: format!(
                        "Found duplicated table name in RowInsertRequest: {}",
                        table_name
                    ),
                }
                .fail();
            }
        }

        Ok(unique_tables)
    }

    async fn plan_table_resolution(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        unique_tables: &[(&str, &[ColumnSchema])],
    ) -> Result<TableResolutionPlan> {
        let mut plan = TableResolutionPlan {
            region_schemas: HashMap::with_capacity(unique_tables.len()),
            tables_to_create: Vec::new(),
            tables_to_alter: Vec::new(),
        };

        let resolved_tables = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table"])
                .start_timer();
            futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
            let table = table_result?;

            if let Some(table) = table {
                let table_info = table.table_info();
                let table_id = table_info.ident.table_id;
                let region_schema = table_info.meta.schema.arrow_schema().clone();

                let missing_columns = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_identify_missing_columns"])
                        .start_timer();
                    identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
                };
                if !missing_columns.is_empty() {
                    plan.tables_to_alter
                        .push(((*table_name).to_string(), missing_columns));
                }
                plan.region_schemas
                    .insert((*table_name).to_string(), (region_schema, table_id));
            } else {
                let request_schema = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_build_create_table_schema"])
                        .start_timer();
                    build_prom_create_table_schema_from_proto(rows_schema)?
                };
                plan.tables_to_create
                    .push(((*table_name).to_string(), request_schema));
            }
        }

        Ok(plan)
    }

    async fn create_missing_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        table_rows: &[(String, Rows)],
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_create.is_empty() {
            return Ok(());
        }

        let create_refs: Vec<(&str, &[ColumnSchema])> = plan
            .tables_to_create
            .iter()
            .map(|(name, schema)| (name.as_str(), schema.as_slice()))
            .collect();

        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_create_tables"])
                .start_timer();
            self.schema_alterer
                .create_tables_if_missing_batch(
                    catalog,
                    schema,
                    &create_refs,
                    self.prom_store_with_metric_engine,
                    ctx.clone(),
                )
                .await?;
        }

        let created_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_create"])
                .start_timer();
            futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_create.iter().zip(created_table_results)
        {
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch create attempt: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (region_schema, table_id));
        }

        Self::enqueue_alter_for_new_tables(table_rows, plan)?;

        Ok(())
    }

    fn enqueue_alter_for_new_tables(
        table_rows: &[(String, Rows)],
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        let created_tables: HashSet<&str> = plan
            .tables_to_create
            .iter()
            .map(|(table_name, _)| table_name.as_str())
            .collect();

        for (table_name, rows) in table_rows {
            if !created_tables.contains(table_name.as_str()) {
                continue;
            }

            let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
                continue;
            };

            let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
            if missing_columns.is_empty()
                || plan
                    .tables_to_alter
                    .iter()
                    .any(|(existing_name, _)| existing_name == table_name)
            {
                continue;
            }

            plan.tables_to_alter
                .push((table_name.clone(), missing_columns));
        }

        Ok(())
    }

    async fn alter_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_alter.is_empty() {
            return Ok(());
        }

        let alter_refs: Vec<(&str, &[String])> = plan
            .tables_to_alter
            .iter()
            .map(|(name, cols)| (name.as_str(), cols.as_slice()))
            .collect();
        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_add_missing_columns"])
                .start_timer();
            self.schema_alterer
                .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
                .await?;
        }

        let altered_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_schema_alter"])
                .start_timer();
            futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_alter.iter().zip(altered_table_results)
        {
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch schema alter: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (refreshed_region_schema, table_id));
        }

        Ok(())
    }

    fn build_aligned_batches(
        table_rows: &[(String, Rows)],
        region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
    ) -> Result<Vec<(String, u32, RecordBatch)>> {
        let mut aligned_batches = Vec::with_capacity(table_rows.len());
        for (table_name, rows) in table_rows {
            let (region_schema, table_id) =
                region_schemas.get(table_name).cloned().with_context(|| {
                    error::UnexpectedResultSnafu {
                        reason: format!("Region schema not resolved for table: {}", table_name),
                    }
                })?;

            let record_batch = {
                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                    .with_label_values(&["align_rows_to_record_batch"])
                    .start_timer();
                rows_to_aligned_record_batch(rows, region_schema.as_ref())?
            };
            aligned_batches.push((table_name.clone(), table_id, record_batch));
        }

        Ok(aligned_batches)
    }

    fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
        if let Some(worker) = self.workers.get(&key)
            && !worker.tx.is_closed()
        {
            return worker.clone();
        }

        let entry = self.workers.entry(key.clone());
        match entry {
            Entry::Occupied(mut worker) => {
                if worker.get().tx.is_closed() {
                    let new_worker = self.spawn_worker(key);
                    worker.insert(new_worker.clone());
                    PENDING_WORKERS.set(self.workers.len() as i64);
                    new_worker
                } else {
                    worker.get().clone()
                }
            }
            Entry::Vacant(vacant) => {
                let worker = self.spawn_worker(key);

                vacant.insert(worker.clone());
                PENDING_WORKERS.set(self.workers.len() as i64);
                worker
            }
        }
    }

    fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
        let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
        let worker = PendingWorker { tx: tx.clone() };
        let worker_idle_timeout = self
            .flush_interval
            .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
            .unwrap_or(self.flush_interval);

        start_worker(
            key,
            worker.tx.clone(),
            self.workers.clone(),
            rx,
            self.shutdown.clone(),
            self.partition_manager.clone(),
            self.node_manager.clone(),
            self.catalog_manager.clone(),
            self.flush_interval,
            worker_idle_timeout,
            self.max_batch_rows,
            self.flush_semaphore.clone(),
        );

        worker
    }
}

impl Drop for PendingRowsBatcher {
    fn drop(&mut self) {
        let _ = self.shutdown.send(());
    }
}

impl PendingBatch {
    fn new(ctx: QueryContextRef) -> Self {
        Self {
            tables: HashMap::new(),
            created_at: Instant::now(),
            total_row_count: 0,
            ctx,
            waiters: Vec::new(),
        }
    }
}

#[allow(clippy::too_many_arguments)]
fn start_worker(
    key: BatchKey,
    worker_tx: mpsc::Sender<WorkerCommand>,
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    mut rx: mpsc::Receiver<WorkerCommand>,
    shutdown: broadcast::Sender<()>,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    flush_interval: Duration,
    worker_idle_timeout: Duration,
    max_batch_rows: usize,
    flush_semaphore: Arc<Semaphore>,
) {
    tokio::spawn(async move {
        let mut batch = None;
        let mut interval = tokio::time::interval(flush_interval);
        let mut shutdown_rx = shutdown.subscribe();
        let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
        let idle_timer = tokio::time::sleep_until(idle_deadline);
        tokio::pin!(idle_timer);

        loop {
            tokio::select! {
                cmd = rx.recv() => {
                    match cmd {
                        Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
                            idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);

                            let pending_batch = batch.get_or_insert_with(|| {
                                PENDING_BATCHES.inc();
                                PendingBatch::new(ctx)
                            });

                            pending_batch.waiters.push(FlushWaiter { response_tx, _permit });

                            for (table_name, table_id, record_batch) in table_batches {
                                let entry = pending_batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
                                    table_name,
                                    table_id,
                                    batches: Vec::new(),
                                    row_count: 0,
                                });
                                entry.row_count += record_batch.num_rows();
                                entry.batches.push(record_batch);
                            }

                            pending_batch.total_row_count += total_rows;
                            PENDING_ROWS.add(total_rows as i64);

                            // Flush eagerly once the batch reaches the row threshold.
                            if pending_batch.total_row_count >= max_batch_rows
                                && let Some(flush) = drain_batch(&mut batch) {
                                spawn_flush(
                                    flush,
                                    partition_manager.clone(),
                                    node_manager.clone(),
                                    catalog_manager.clone(),
                                    flush_semaphore.clone(),
                                ).await;
                            }
                        }
                        None => {
                            // All senders dropped: flush whatever is left inline and exit.
                            if let Some(flush) = drain_batch(&mut batch) {
                                flush_batch(
                                    flush,
                                    partition_manager.clone(),
                                    node_manager.clone(),
                                    catalog_manager.clone(),
                                ).await;
                            }
                            break;
                        }
                    }
                }
                _ = &mut idle_timer => {
                    if !should_close_worker_on_idle_timeout(
                        batch.as_ref().map_or(0, |batch| batch.total_row_count),
                        rx.len(),
                    ) {
                        idle_timer
                            .as_mut()
                            .reset(tokio::time::Instant::now() + worker_idle_timeout);
                        continue;
                    }

                    debug!(
                        "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
                        key.catalog,
                        key.schema,
                        key.physical_table
                    );
                    break;
                }
                _ = interval.tick() => {
                    // Time-based flush for batches that never hit the row threshold.
                    if batch
                        .as_ref()
                        .is_some_and(|batch| batch.created_at.elapsed() >= flush_interval)
                        && let Some(flush) = drain_batch(&mut batch) {
                        spawn_flush(
                            flush,
                            partition_manager.clone(),
                            node_manager.clone(),
                            catalog_manager.clone(),
                            flush_semaphore.clone(),
                        ).await;
                    }
                }
                _ = shutdown_rx.recv() => {
                    if let Some(flush) = drain_batch(&mut batch) {
                        flush_batch(
                            flush,
                            partition_manager.clone(),
                            node_manager.clone(),
                            catalog_manager.clone(),
                        ).await;
                    }
                    break;
                }
            }
        }

        remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
    });
}

fn remove_worker_if_same_channel(
    workers: &DashMap<BatchKey, PendingWorker>,
    key: &BatchKey,
    worker_tx: &mpsc::Sender<WorkerCommand>,
) -> bool {
    if let Some(worker) = workers.get(key)
        && worker.tx.same_channel(worker_tx)
    {
        drop(worker);
        workers.remove(key);
        PENDING_WORKERS.set(workers.len() as i64);
        return true;
    }

    false
}

fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    total_row_count == 0 && queued_requests == 0
}

fn drain_batch(batch: &mut Option<PendingBatch>) -> Option<FlushBatch> {
    let batch = batch.take()?;
    let total_row_count = batch.total_row_count;

    if total_row_count == 0 {
        return None;
    }

    let table_batches = batch.tables.into_values().collect();
    let waiters = batch.waiters;

    PENDING_ROWS.sub(total_row_count as i64);
    PENDING_BATCHES.dec();

    Some(FlushBatch {
        table_batches,
        total_row_count,
        ctx: batch.ctx,
        waiters,
    })
}

async fn spawn_flush(
    flush: FlushBatch,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    semaphore: Arc<Semaphore>,
) {
    match semaphore.acquire_owned().await {
        Ok(permit) => {
            tokio::spawn(async move {
                let _permit = permit;
                flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
            });
        }
        Err(err) => {
            warn!(err; "Flush semaphore closed, flushing inline");
            flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
        }
    }
}

struct FlushRegionWrite {
    datanode: Peer,
    request: RegionRequest,
}

struct PlannedRegionBatch {
    region_id: RegionId,
    batch: RecordBatch,
}

#[cfg(test)]
impl PlannedRegionBatch {
    fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

struct ResolvedRegionBatch {
    planned: PlannedRegionBatch,
    datanode: Peer,
}

fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    region_write_count > 1
}

fn columns_taxonomy(
    batch_schema: &Arc<ArrowSchema>,
    table_name: &str,
    name_to_ids: &HashMap<String, u32>,
    partition_columns: &HashSet<&str>,
) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
    let mut tag_columns = Vec::new();
    let mut essential_column_indices =
        SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
    // Reserve the first two slots for the timestamp and value column indices;
    // they are backfilled after the schema scan below locates them.
    essential_column_indices.push(0);
    essential_column_indices.push(0);

    let mut timestamp_index = None;
    let mut value_index = None;

    for (index, field) in batch_schema.fields().iter().enumerate() {
        match field.data_type() {
            ArrowDataType::Utf8 => {
                let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Column '{}' from logical table '{}' not found in physical table column IDs",
                            field.name(),
                            table_name
                        ),
                    }
                })?;
                tag_columns.push(TagColumnInfo {
                    name: field.name().clone(),
                    index,
                    column_id,
                });

                if partition_columns.contains(field.name().as_str()) {
                    essential_column_indices.push(index);
                }
            }
            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
                ensure!(
                    timestamp_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated timestamp column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            ArrowDataType::Float64 => {
                ensure!(
                    value_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated value column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            datatype => {
                return error::InvalidPromRemoteRequestSnafu {
                    msg: format!(
                        "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
                        table_name
                    ),
                }
                .fail();
            }
        }
    }

    let timestamp_index =
        timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
            msg: format!(
                "Missing essential column '{}' in logical table '{}' batch schema",
                greptime_timestamp(),
                table_name
            ),
        })?;
    let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
        msg: format!(
            "Missing essential column '{}' in logical table '{}' batch schema",
            greptime_value(),
            table_name
        ),
    })?;

    tag_columns.sort_by(|a, b| a.name.cmp(&b.name));

    essential_column_indices[0] = timestamp_index;
    essential_column_indices[1] = value_index;

    Ok((tag_columns, essential_column_indices))
}

fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
    ensure!(
        batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
        error::InternalSnafu {
            err_msg: format!(
                "Expected at least {} columns in physical batch, got {}",
                PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
                batch.num_columns()
            ),
        }
    );
    let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
    batch.project(&essential_indices).context(error::ArrowSnafu)
}

async fn flush_region_writes_concurrently(
    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
    writes: Vec<FlushRegionWrite>,
) -> Result<()> {
    if !should_dispatch_concurrently(writes.len()) {
        for write in writes {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_write_region"])
                .start_timer();
            node_manager.handle(&write.datanode, write.request).await?;
        }
        return Ok(());
    }

    let write_futures = writes.into_iter().map(|write| async move {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_write_region"])
            .start_timer();

        node_manager.handle(&write.datanode, write.request).await?;
        Ok::<_, Error>(())
    });

    futures::future::try_join_all(write_futures).await?;
    Ok(())
}

async fn flush_batch(
    flush: FlushBatch,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
) {
    let FlushBatch {
        table_batches,
        total_row_count,
        ctx,
        waiters,
    } = flush;
    let start = Instant::now();

    let physical_table_name = ctx
        .extension(PHYSICAL_TABLE_KEY)
        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
        .to_string();
    let partition_provider = PartitionManagerPhysicalFlushAdapter { partition_manager };
    let node_requester = NodeManagerPhysicalFlushAdapter { node_manager };
    let catalog_provider = CatalogManagerPhysicalFlushAdapter { catalog_manager };
    let result = flush_batch_physical(
        &table_batches,
        &physical_table_name,
        &ctx,
        &partition_provider,
        &node_requester,
        &catalog_provider,
    )
    .await;

    let elapsed = start.elapsed().as_secs_f64();
    FLUSH_ELAPSED.observe(elapsed);

    if result.is_err() {
        FLUSH_FAILURES.inc();
        FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64);
    } else {
        FLUSH_TOTAL.inc();
        FLUSH_ROWS.observe(total_row_count as f64);
    }

    debug!(
        "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
        total_row_count, elapsed
    );

    notify_waiters(waiters, result);
}

pub async fn flush_batch_physical(
    table_batches: &[TableBatch],
    physical_table_name: &str,
    ctx: &QueryContextRef,
    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
    catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized),
) -> Result<()> {
    let physical_table = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_resolve_table"])
            .start_timer();
        catalog_manager
            .physical_table(
                ctx.current_catalog(),
                &ctx.current_schema(),
                physical_table_name,
                ctx.as_ref(),
            )
            .await?
            .with_context(|| error::InternalSnafu {
                err_msg: format!(
                    "Physical table '{}' not found during pending flush",
                    physical_table_name
                ),
            })?
    };

    let physical_table_info = physical_table.table_info;
    let name_to_ids = physical_table
        .col_name_to_ids
        .with_context(|| error::InternalSnafu {
            err_msg: format!(
                "Physical table '{}' has no column IDs for pending flush",
                physical_table_name
            ),
        })?;

    let partition_rule = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_fetch_partition_rule"])
            .start_timer();
        partition_manager
            .find_table_partition_rule(physical_table_info.as_ref())
            .await?
    };
    let partition_columns = partition_rule.partition_columns();
    let partition_columns_set: HashSet<&str> =
        partition_columns.iter().map(String::as_str).collect();

    let modified_batches =
        transform_logical_batches_to_physical(table_batches, &name_to_ids, &partition_columns_set)?;

    let combined_batch = concat_modified_batches(&modified_batches)?;

    let physical_table_id = physical_table_info.table_id();
    let planned_batches = plan_region_batches(
        combined_batch,
        physical_table_id,
        partition_rule.as_ref(),
        partition_columns,
    )?;

    let resolved_batches = resolve_region_targets(planned_batches, partition_manager).await?;
    let region_writes = encode_region_write_requests(resolved_batches)?;
    flush_region_writes_concurrently(node_manager, region_writes).await
}

fn transform_logical_batches_to_physical(
    table_batches: &[TableBatch],
    name_to_ids: &HashMap<String, u32>,
    partition_columns_set: &HashSet<&str>,
) -> Result<Vec<RecordBatch>> {
    let mut modified_batches: Vec<RecordBatch> =
        Vec::with_capacity(table_batches.iter().map(|b| b.batches.len()).sum());

    let mut modify_elapsed = Duration::ZERO;
    let mut columns_taxonomy_elapsed = Duration::ZERO;

    for table_batch in table_batches {
        let table_id = table_batch.table_id;

        for batch in &table_batch.batches {
            let batch_schema = batch.schema();
            let start = Instant::now();
            let (tag_columns, essential_col_indices) = columns_taxonomy(
                &batch_schema,
                &table_batch.table_name,
                name_to_ids,
                partition_columns_set,
            )?;

            columns_taxonomy_elapsed += start.elapsed();
            if tag_columns.is_empty() && essential_col_indices.is_empty() {
                continue;
            }

            let modified = {
                let start = Instant::now();
                let batch = modify_batch_sparse(
                    batch.clone(),
                    table_id,
                    &tag_columns,
                    &essential_col_indices,
                )?;
                modify_elapsed += start.elapsed();
                batch
            };

            modified_batches.push(modified);
        }
    }

    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_modify_batch"])
        .observe(modify_elapsed.as_secs_f64());
    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_columns_taxonomy"])
        .observe(columns_taxonomy_elapsed.as_secs_f64());

    ensure!(
        !modified_batches.is_empty(),
        error::InternalSnafu {
            err_msg: "No batches can be transformed during pending flush",
        }
    );
    Ok(modified_batches)
}

fn concat_modified_batches(modified_batches: &[RecordBatch]) -> Result<RecordBatch> {
    let combined_schema = modified_batches[0].schema();
    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_concat_all"])
        .start_timer();
    concat_batches(&combined_schema, modified_batches).context(error::ArrowSnafu)
}

fn split_combined_batch_by_region(
    combined_batch: &RecordBatch,
    partition_rule: &dyn partition::partition::PartitionRule,
) -> Result<HashMap<u32, partition::partition::RegionMask>> {
    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_split_record_batch"])
        .start_timer();
    let map = partition_rule.split_record_batch(combined_batch)?;
    Ok(map)
}

fn prepare_physical_region_routing_batch(
    combined_batch: RecordBatch,
    partition_columns: &[String],
) -> Result<RecordBatch> {
    if partition_columns.is_empty() {
        return Ok(combined_batch);
    }
    strip_partition_columns_from_batch(combined_batch)
}

fn plan_region_batch(
    stripped_batch: &RecordBatch,
    physical_table_id: TableId,
    region_number: u32,
    mask: &partition::partition::RegionMask,
) -> Result<Option<PlannedRegionBatch>> {
    if mask.select_none() {
        return Ok(None);
    }

    let region_batch = if mask.select_all() {
        stripped_batch.clone()
    } else {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_filter_record_batch"])
            .start_timer();
        filter_record_batch(stripped_batch, mask.array()).context(error::ArrowSnafu)?
    };

    let row_count = region_batch.num_rows();
    if row_count == 0 {
        return Ok(None);
    }

    Ok(Some(PlannedRegionBatch {
        region_id: RegionId::new(physical_table_id, region_number),
        batch: region_batch,
    }))
}

fn plan_region_batches(
    combined_batch: RecordBatch,
    physical_table_id: TableId,
    partition_rule: &dyn partition::partition::PartitionRule,
    partition_columns: &[String],
) -> Result<Vec<PlannedRegionBatch>> {
    let region_masks = split_combined_batch_by_region(&combined_batch, partition_rule)?;
    let stripped_batch = prepare_physical_region_routing_batch(combined_batch, partition_columns)?;

    let mut planned_batches = Vec::new();
    for (region_number, mask) in region_masks {
        if let Some(planned_batch) =
            plan_region_batch(&stripped_batch, physical_table_id, region_number, &mask)?
        {
            planned_batches.push(planned_batch);
        }
    }

    Ok(planned_batches)
}

async fn resolve_region_targets(
    planned_batches: Vec<PlannedRegionBatch>,
    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
) -> Result<Vec<ResolvedRegionBatch>> {
    let mut resolved_batches = Vec::with_capacity(planned_batches.len());
    for planned in planned_batches {
        let datanode = {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_resolve_region_leader"])
                .start_timer();
            partition_manager
                .find_region_leader(planned.region_id)
                .await?
        };

        resolved_batches.push(ResolvedRegionBatch { planned, datanode });
    }

    Ok(resolved_batches)
}

fn encode_region_write_requests(
    resolved_batches: Vec<ResolvedRegionBatch>,
) -> Result<Vec<FlushRegionWrite>> {
    let mut region_writes = Vec::with_capacity(resolved_batches.len());
    for resolved in resolved_batches {
        let region_id = resolved.planned.region_id;
        let (schema_bytes, data_header, payload) = {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_encode_ipc"])
                .start_timer();
            record_batch_to_ipc(resolved.planned.batch)?
        };

        let request = RegionRequest {
            header: Some(RegionRequestHeader {
                tracing_context: TracingContext::from_current_span().to_w3c(),
                ..Default::default()
            }),
            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
                region_id: region_id.as_u64(),
                partition_expr_version: None,
                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header,
                    payload,
                })),
            })),
        };

        region_writes.push(FlushRegionWrite {
            datanode: resolved.datanode,
            request,
        });
    }

    Ok(region_writes)
}

fn notify_waiters(waiters: Vec<FlushWaiter>, result: Result<()>) {
    let shared_result = result.map_err(Arc::new);
    for waiter in waiters {
        let _ = waiter.response_tx.send(match &shared_result {
            Ok(()) => Ok(()),
            Err(error) => Err(Arc::clone(error)),
        });
    }
}

fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
    let mut encoder = FlightEncoder::default();
    let schema = encoder.encode_schema(record_batch.schema().as_ref());
    let mut iter = encoder
        .encode(FlightMessage::RecordBatch(record_batch))
        .into_iter();
    let Some(flight_data) = iter.next() else {
        return error::InternalSnafu {
            err_msg: "Failed to encode empty flight data",
        }
        .fail();
    };
    if iter.next().is_some() {
        return error::NotSupportedSnafu {
            feat: "bulk insert RecordBatch with dictionary arrays",
        }
        .fail();
    }

    Ok((
        schema.data_header,
        flight_data.data_header,
        flight_data.data_body,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    use api::region::RegionResponse;
    use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
    use api::v1::meta::Peer;
    use api::v1::region::{InsertRequests, RegionRequest, region_request};
    use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
    use arrow::array::{BinaryArray, BooleanArray, StringArray, TimestampMillisecondArray};
    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use async_trait::async_trait;
    use catalog::error::Result as CatalogResult;
    use common_meta::error::Result as MetaResult;
    use common_meta::node_manager::{
        Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
    };
    use common_query::request::QueryRequest;
    use common_recordbatch::SendableRecordBatchStream;
    use dashmap::DashMap;
    use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema};
    use partition::error::Result as PartitionResult;
    use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask};
    use smallvec::SmallVec;
    use snafu::ResultExt;
    use store_api::storage::RegionId;
    use table::metadata::TableId;
    use table::test_util::table_info::test_table_info;
    use tokio::sync::{Semaphore, mpsc, oneshot};
    use tokio::time::sleep;

    use super::{
        BatchKey, Error, FlushRegionWrite, FlushWaiter, PendingBatch, PendingRowsBatcher,
        PendingWorker, PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester,
        PhysicalFlushPartitionProvider, PhysicalTableMetadata, PlannedRegionBatch,
        ResolvedRegionBatch, TableBatch, WorkerCommand, columns_taxonomy, drain_batch,
        encode_region_write_requests, flush_batch_physical, flush_region_writes_concurrently,
        plan_region_batches, remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
        should_dispatch_concurrently, strip_partition_columns_from_batch,
        transform_logical_batches_to_physical,
    };
    use crate::error;

    fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
        Rows {
            schema: vec![ColumnSchema {
                column_name: schema_name.to_string(),
                ..Default::default()
            }],
            rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
        }
    }

    fn mock_tag_batch(tag_name: &str, tag_value: &str, ts: i64, val: f64) -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new(tag_name, ArrowDataType::Utf8, true),
        ]));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![ts])),
                Arc::new(arrow::array::Float64Array::from(vec![val])),
                Arc::new(StringArray::from(vec![tag_value])),
            ],
        )
        .unwrap()
    }
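
    // A hedged round-trip sketch for `record_batch_to_ipc`: a plain batch with
    // no dictionary arrays should encode into a (schema, data header, payload)
    // triple. The exact byte contents are encoder-specific, so only
    // non-emptiness is asserted here.
    #[test]
    fn test_record_batch_to_ipc_encodes_plain_batch() {
        let batch = mock_tag_batch("tag1", "host-1", 1000, 1.0);

        let (schema, data_header, payload) = super::record_batch_to_ipc(batch).unwrap();

        assert!(!schema.is_empty());
        assert!(!data_header.is_empty());
        assert!(!payload.is_empty());
    }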

    fn mock_physical_table_metadata(table_id: TableId) -> PhysicalTableMetadata {
        let schema = Arc::new(
            DtSchema::try_new(vec![
                DtColumnSchema::new(
                    "__primary_key",
                    datatypes::prelude::ConcreteDataType::binary_datatype(),
                    false,
                ),
                DtColumnSchema::new(
                    "greptime_timestamp",
                    datatypes::prelude::ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                DtColumnSchema::new(
                    "greptime_value",
                    datatypes::prelude::ConcreteDataType::float64_datatype(),
                    true,
                ),
                DtColumnSchema::new(
                    "tag1",
                    datatypes::prelude::ConcreteDataType::string_datatype(),
                    true,
                ),
            ])
            .unwrap(),
        );
        let mut table_info = test_table_info(table_id, "phy", "public", "greptime", schema);
        table_info.meta.column_ids = vec![0, 1, 2, 3];

        PhysicalTableMetadata {
            table_info: Arc::new(table_info),
            col_name_to_ids: Some(HashMap::from([("tag1".to_string(), 3)])),
        }
    }

    struct MockFlushCatalogProvider {
        table: Option<PhysicalTableMetadata>,
    }

    #[async_trait]
    impl PhysicalFlushCatalogProvider for MockFlushCatalogProvider {
        async fn physical_table(
            &self,
            _catalog: &str,
            _schema: &str,
            _table_name: &str,
            _query_ctx: &session::context::QueryContext,
        ) -> CatalogResult<Option<PhysicalTableMetadata>> {
            Ok(self.table.clone())
        }
    }

    struct SingleRegionPartitionRule;

    impl PartitionRule for SingleRegionPartitionRule {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn partition_columns(&self) -> &[String] {
            &[]
        }

        fn find_region(
            &self,
            _values: &[datatypes::prelude::Value],
        ) -> partition::error::Result<store_api::storage::RegionNumber> {
            unimplemented!()
        }

        fn split_record_batch(
            &self,
            record_batch: &RecordBatch,
        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
        {
            Ok(HashMap::from([(
                1,
                RegionMask::new(
                    arrow::array::BooleanArray::from(vec![true; record_batch.num_rows()]),
                    record_batch.num_rows(),
                ),
            )]))
        }
    }

    struct TwoRegionPartitionRule {
        partition_columns: Vec<String>,
    }

    impl PartitionRule for TwoRegionPartitionRule {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn partition_columns(&self) -> &[String] {
            &self.partition_columns
        }

        fn find_region(
            &self,
            _values: &[datatypes::prelude::Value],
        ) -> partition::error::Result<store_api::storage::RegionNumber> {
            unimplemented!()
        }

        fn split_record_batch(
            &self,
            _record_batch: &RecordBatch,
        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
        {
            Ok(HashMap::from([
                (1, RegionMask::new(BooleanArray::from(vec![true, false]), 1)),
                (2, RegionMask::new(BooleanArray::from(vec![false, true]), 1)),
                (
                    3,
                    RegionMask::new(BooleanArray::from(vec![false, false]), 0),
                ),
            ]))
        }
    }

    struct MockFlushPartitionProvider {
        partition_rule_calls: Arc<AtomicUsize>,
        region_leader_calls: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl PhysicalFlushPartitionProvider for MockFlushPartitionProvider {
        async fn find_table_partition_rule(
            &self,
            _table_info: &table::metadata::TableInfo,
        ) -> PartitionResult<PartitionRuleRef> {
            self.partition_rule_calls.fetch_add(1, Ordering::SeqCst);
            Ok(Arc::new(SingleRegionPartitionRule))
        }

        async fn find_region_leader(&self, _region_id: RegionId) -> error::Result<Peer> {
            self.region_leader_calls.fetch_add(1, Ordering::SeqCst);
            Ok(Peer {
                id: 1,
                addr: "node-1".to_string(),
            })
        }
    }

    #[derive(Default)]
    struct MockFlushNodeRequester {
        writes: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl PhysicalFlushNodeRequester for MockFlushNodeRequester {
        async fn handle(
            &self,
            _peer: &Peer,
            _request: RegionRequest,
        ) -> error::Result<RegionResponse> {
            self.writes.fetch_add(1, Ordering::SeqCst);
            Ok(RegionResponse::new(0))
        }
    }

    #[test]
    fn test_collect_non_empty_table_rows_filters_empty_payloads() {
        let requests = RowInsertRequests {
            inserts: vec![
                RowInsertRequest {
                    table_name: "cpu".to_string(),
                    rows: Some(mock_rows(2, "host")),
                },
                RowInsertRequest {
                    table_name: "mem".to_string(),
                    rows: Some(mock_rows(0, "host")),
                },
                RowInsertRequest {
                    table_name: "disk".to_string(),
                    rows: None,
                },
            ],
        };

        let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);

        assert_eq!(2, total_rows);
        assert_eq!(1, table_rows.len());
        assert_eq!("cpu", table_rows[0].0);
        assert_eq!(2, table_rows[0].1.rows.len());
    }
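
    // Hedged sketch: `batch_key_from_ctx` should fall back to the default
    // physical table name when the query context carries no "physical_table"
    // extension. Assumes a default `QueryContext` has no such extension set.
    #[test]
    fn test_batch_key_from_ctx_defaults_physical_table() {
        let ctx = session::context::QueryContext::arc();

        let key = super::batch_key_from_ctx(&ctx);

        assert_eq!(ctx.current_catalog(), key.catalog);
        assert_eq!(ctx.current_schema(), key.schema);
        assert_eq!(
            common_query::prelude::GREPTIME_PHYSICAL_TABLE,
            key.physical_table
        );
    }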

    #[test]
    fn test_drain_batch_takes_initialized_pending_batch_from_option() {
        let ctx = session::context::QueryContext::arc();
        let (response_tx, _response_rx) = oneshot::channel();
        let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();
        let mut batch = Some(PendingBatch {
            tables: HashMap::from([(
                "cpu".to_string(),
                TableBatch {
                    table_name: "cpu".to_string(),
                    table_id: 42,
                    batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
                    row_count: 1,
                },
            )]),
            created_at: Instant::now(),
            total_row_count: 1,
            ctx: ctx.clone(),
            waiters: vec![FlushWaiter {
                response_tx,
                _permit: permit,
            }],
        });

        let flush = drain_batch(&mut batch).unwrap();

        assert!(batch.is_none());
        assert_eq!(1, flush.total_row_count);
        assert_eq!(1, flush.table_batches.len());
        assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog());
    }
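
    // Hedged sketch: duplicated table names within one request batch should be
    // rejected by `collect_unique_table_schemas` rather than silently merged.
    #[test]
    fn test_collect_unique_table_schemas_rejects_duplicates() {
        let table_rows = vec![
            ("cpu".to_string(), mock_rows(1, "host")),
            ("cpu".to_string(), mock_rows(1, "host")),
        ];

        let result = PendingRowsBatcher::collect_unique_table_schemas(&table_rows);

        assert!(result.is_err());
    }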

    #[derive(Clone)]
    struct ConcurrentMockDatanode {
        delay: Duration,
        inflight: Arc<AtomicUsize>,
        max_inflight: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl Datanode for ConcurrentMockDatanode {
        async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
            let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
            loop {
                let max = self.max_inflight.load(Ordering::SeqCst);
                if now <= max {
                    break;
                }
                if self
                    .max_inflight
                    .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }

            sleep(self.delay).await;
            self.inflight.fetch_sub(1, Ordering::SeqCst);
            Ok(RegionResponse::new(0))
        }

        async fn handle_query(
            &self,
            _request: QueryRequest,
        ) -> MetaResult<SendableRecordBatchStream> {
            unimplemented!()
        }
    }

    #[derive(Clone)]
    struct ConcurrentMockNodeManager {
        datanodes: Arc<HashMap<u64, DatanodeRef>>,
    }

    #[async_trait]
    impl DatanodeManager for ConcurrentMockNodeManager {
        async fn datanode(&self, node: &Peer) -> DatanodeRef {
            self.datanodes
                .get(&node.id)
                .expect("datanode not found")
                .clone()
        }
    }

    struct NoopFlownode;

    #[async_trait]
    impl Flownode for NoopFlownode {
        async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_mark_window_dirty(
            &self,
            _req: DirtyWindowRequests,
        ) -> MetaResult<FlowResponse> {
            unimplemented!()
        }
    }

    #[async_trait]
    impl FlownodeManager for ConcurrentMockNodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            Arc::new(NoopFlownode)
        }
    }

    #[async_trait]
    impl PhysicalFlushNodeRequester for ConcurrentMockNodeManager {
        async fn handle(
            &self,
            peer: &Peer,
            request: RegionRequest,
        ) -> error::Result<RegionResponse> {
            let datanode = self.datanode(peer).await;
            datanode
                .handle(request)
                .await
                .context(error::CommonMetaSnafu)
        }
    }

    #[test]
    fn test_remove_worker_if_same_channel_removes_matching_entry() {
        let workers = DashMap::new();
        let key = BatchKey {
            catalog: "greptime".to_string(),
            schema: "public".to_string(),
            physical_table: "phy".to_string(),
        };

        let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
        workers.insert(key.clone(), PendingWorker { tx: tx.clone() });

        assert!(remove_worker_if_same_channel(&workers, &key, &tx));
        assert!(!workers.contains_key(&key));
    }

    #[test]
    fn test_remove_worker_if_same_channel_keeps_newer_entry() {
        let workers = DashMap::new();
        let key = BatchKey {
            catalog: "greptime".to_string(),
            schema: "public".to_string(),
            physical_table: "phy".to_string(),
        };

        let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
        let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
        workers.insert(
            key.clone(),
            PendingWorker {
                tx: fresh_tx.clone(),
            },
        );

        assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
        assert!(workers.contains_key(&key));
        assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
    }

    #[test]
    fn test_worker_idle_timeout_close_decision() {
        assert!(should_close_worker_on_idle_timeout(0, 0));
        assert!(!should_close_worker_on_idle_timeout(1, 0));
        assert!(!should_close_worker_on_idle_timeout(0, 1));
    }

    #[tokio::test]
    async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
        let inflight = Arc::new(AtomicUsize::new(0));
        let max_inflight = Arc::new(AtomicUsize::new(0));
        let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight: inflight.clone(),
            max_inflight: max_inflight.clone(),
        });
        let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight,
            max_inflight: max_inflight.clone(),
        });

        let mut datanodes = HashMap::new();
        datanodes.insert(1, datanode1);
        datanodes.insert(2, datanode2);
        let node_manager = Arc::new(ConcurrentMockNodeManager {
            datanodes: Arc::new(datanodes),
        });

        let writes = vec![
            FlushRegionWrite {
                datanode: Peer {
                    id: 1,
                    addr: "node1".to_string(),
                },
                request: RegionRequest::default(),
            },
            FlushRegionWrite {
                datanode: Peer {
                    id: 2,
                    addr: "node2".to_string(),
                },
                request: RegionRequest::default(),
            },
        ];

        flush_region_writes_concurrently(node_manager.as_ref(), writes)
            .await
            .unwrap();
        assert!(max_inflight.load(Ordering::SeqCst) >= 2);
    }
2059
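    // Concurrent dispatch only pays off once there are at least two region writes.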
    #[test]
    fn test_should_dispatch_concurrently_by_region_count() {
        assert!(!should_dispatch_concurrently(0));
        assert!(!should_dispatch_concurrently(1));
        assert!(should_dispatch_concurrently(2));
    }

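    // Stripping keeps the three essential physical columns (__primary_key,
    // greptime_timestamp, greptime_value) and drops the partition tag "host".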
    #[test]
    fn test_strip_partition_columns_from_batch_removes_partition_tags() {
        let batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
                Arc::new(StringArray::from(vec!["node-1"])),
            ],
        )
        .unwrap();

        let stripped = strip_partition_columns_from_batch(batch).unwrap();

        assert_eq!(3, stripped.num_columns());
        assert_eq!("__primary_key", stripped.schema().field(0).name());
        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
        assert_eq!("greptime_value", stripped.schema().field(2).name());
    }

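    // Same fixture as the previous test; per its name, this pins the projection
    // path that keeps the essential columns without a column-id lookup.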
    #[test]
    fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
        let batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
                Arc::new(StringArray::from(vec!["node-1"])),
            ],
        )
        .unwrap();

        let stripped = strip_partition_columns_from_batch(batch).unwrap();

        assert_eq!(3, stripped.num_columns());
        assert_eq!("__primary_key", stripped.schema().field(0).name());
        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
        assert_eq!("greptime_value", stripped.schema().field(2).name());
    }

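    // A partition column that is also a tag ("host") stays among the projected
    // indices so the partition rule can still route on it; the non-partition tag
    // ("region") does not.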
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let (tag_columns, non_tag_indices) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        assert_eq!(2, tag_columns.len());
        assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
    }

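    // The essential columns come first in the projection regardless of their
    // position in the input schema; partition tags follow in schema order.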
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host", "region"]);

        let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
    }

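    // A Boolean column is neither a known tag nor an essential column, so the
    // batch is rejected as an invalid Prometheus remote-write payload.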
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("invalid", ArrowDataType::Boolean, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

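    // The timestamp must arrive as an Arrow Timestamp array; a raw Int64 column
    // named like the timestamp is rejected.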
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("greptime_timestamp", ArrowDataType::Int64, false),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

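    // Exactly one timestamp column is allowed; two Timestamp-typed columns are
    // ambiguous and rejected.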
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "ts1",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts2",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

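    // Likewise, exactly one value column is allowed; a second Float64 field that
    // is not a known tag is rejected.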
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value1", ArrowDataType::Float64, true),
            Field::new("value2", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

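    // The sparse primary key encodes the batch's full tag set, so two batches with
    // different tag columns must yield different keys even for identical values.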
    #[test]
    fn test_modify_batch_sparse_with_taxonomy_per_batch() {
        use arrow::array::BinaryArray;
        use metric_engine::batch_modifier::modify_batch_sparse;

        let schema1 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
        ]));

        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
            Field::new("tag2", ArrowDataType::Utf8, true),
        ]));
        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
                Arc::new(StringArray::from(vec!["v2"])),
            ],
        )
        .unwrap();

        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();

        let batch3 = RecordBatch::try_new(
            schema1.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
            ],
        )
        .unwrap();

        let (tag_columns2, indices2) =
            columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();

        let (tag_columns3, indices3) =
            columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();

        let pk2 = modified2
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let pk3 = modified3
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        assert_ne!(
            pk2.value(0),
            pk3.value(0),
            "PK should be different because batch2 has tag2!"
        );
    }

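    // Happy path: one logical batch becomes one physical batch with the sparse
    // primary key prepended to the essential columns.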
    #[test]
    fn test_transform_logical_batches_to_physical_success() {
        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);

        let table_batches = vec![TableBatch {
            table_name: "t1".to_string(),
            table_id: 1,
            batches: vec![batch],
            row_count: 1,
        }];

        let name_to_ids = HashMap::from([("tag1".to_string(), 1)]);
        let partition_columns = HashSet::new();
        let modified =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap();

        assert_eq!(1, modified.len());
        assert_eq!(3, modified[0].num_columns());
        assert_eq!("__primary_key", modified[0].schema().field(0).name());
        assert_eq!("greptime_timestamp", modified[0].schema().field(1).name());
        assert_eq!("greptime_value", modified[0].schema().field(2).name());
    }

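    // An empty column-id map means every tag lookup fails, and the error names the
    // missing mapping.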
    #[test]
    fn test_transform_logical_batches_to_physical_taxonomy_failure() {
        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);

        let table_batches = vec![TableBatch {
            table_name: "t1".to_string(),
            table_id: 1,
            batches: vec![batch],
            row_count: 1,
        }];

        let name_to_ids = HashMap::new();
        let partition_columns = HashSet::new();
        let err =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap_err();

        assert!(
            err.to_string()
                .contains("not found in physical table column IDs")
        );
    }

    #[test]
    fn test_transform_logical_batches_to_physical_multiple_batches() {
        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);

        let table_batches = vec![
            TableBatch {
                table_name: "t1".to_string(),
                table_id: 1,
                batches: vec![batch1],
                row_count: 1,
            },
            TableBatch {
                table_name: "t2".to_string(),
                table_id: 2,
                batches: vec![batch2],
                row_count: 1,
            },
        ];

        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();
        let modified =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap();

        assert_eq!(2, modified.len());
    }

    #[test]
    fn test_transform_logical_batches_to_physical_mixed_success_failure() {
        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);

        let table_batches = vec![
            TableBatch {
                table_name: "t1".to_string(),
                table_id: 1,
                batches: vec![batch1],
                row_count: 1,
            },
            TableBatch {
                table_name: "t2".to_string(),
                table_id: 2,
                batches: vec![batch2],
                row_count: 1,
            },
        ];

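        // "tag1" is deliberately absent so the first table's transform fails even
        // though the second table would succeed.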
        let name_to_ids = HashMap::from([("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();
        let err =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap_err();

        assert!(err.to_string().contains("tag1"));
    }

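    // The happy path touches each trait seam exactly once: one partition-rule
    // lookup, one region-leader resolution, and one datanode write.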
    #[tokio::test]
    async fn test_flush_batch_physical_uses_mockable_trait_dependencies() {
        let table_batches = vec![TableBatch {
            table_name: "t1".to_string(),
            table_id: 11,
            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
            row_count: 1,
        }];
        let partition_calls = Arc::new(AtomicUsize::new(0));
        let leader_calls = Arc::new(AtomicUsize::new(0));
        let node = MockFlushNodeRequester::default();
        let ctx = session::context::QueryContext::arc();

        flush_batch_physical(
            &table_batches,
            "phy",
            &ctx,
            &MockFlushPartitionProvider {
                partition_rule_calls: partition_calls.clone(),
                region_leader_calls: leader_calls.clone(),
            },
            &node,
            &MockFlushCatalogProvider {
                table: Some(mock_physical_table_metadata(1024)),
            },
        )
        .await
        .unwrap();

        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
        assert_eq!(1, leader_calls.load(Ordering::SeqCst));
        assert_eq!(1, node.writes.load(Ordering::SeqCst));
    }

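    // A missing physical table fails fast: no partition lookup, no leader
    // resolution, and no datanode write is attempted.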
    #[tokio::test]
    async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() {
        let table_batches = vec![TableBatch {
            table_name: "t1".to_string(),
            table_id: 11,
            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
            row_count: 1,
        }];
        let partition_calls = Arc::new(AtomicUsize::new(0));
        let leader_calls = Arc::new(AtomicUsize::new(0));
        let node = MockFlushNodeRequester::default();
        let ctx = session::context::QueryContext::arc();

        let err = flush_batch_physical(
            &table_batches,
            "missing_phy",
            &ctx,
            &MockFlushPartitionProvider {
                partition_rule_calls: partition_calls.clone(),
                region_leader_calls: leader_calls.clone(),
            },
            &node,
            &MockFlushCatalogProvider { table: None },
        )
        .await
        .unwrap_err();

        assert!(
            err.to_string()
                .contains("Physical table 'missing_phy' not found")
        );
        assert_eq!(0, partition_calls.load(Ordering::SeqCst));
        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
        assert_eq!(0, node.writes.load(Ordering::SeqCst));
    }

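    // A transform failure aborts the flush after the partition rule has been
    // fetched but before any region leader is resolved or any write is issued.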
    #[tokio::test]
    async fn test_flush_batch_physical_aborts_immediately_on_transform_error() {
        let table_batches = vec![
            TableBatch {
                table_name: "broken".to_string(),
                table_id: 11,
                batches: vec![mock_tag_batch("unknown_tag", "host-1", 1000, 1.0)],
                row_count: 1,
            },
            TableBatch {
                table_name: "healthy".to_string(),
                table_id: 12,
                batches: vec![mock_tag_batch("tag1", "host-2", 2000, 2.0)],
                row_count: 1,
            },
        ];
        let partition_calls = Arc::new(AtomicUsize::new(0));
        let leader_calls = Arc::new(AtomicUsize::new(0));
        let node = MockFlushNodeRequester::default();
        let ctx = session::context::QueryContext::arc();

        let err = flush_batch_physical(
            &table_batches,
            "phy",
            &ctx,
            &MockFlushPartitionProvider {
                partition_rule_calls: partition_calls.clone(),
                region_leader_calls: leader_calls.clone(),
            },
            &node,
            &MockFlushCatalogProvider {
                table: Some(mock_physical_table_metadata(1024)),
            },
        )
        .await
        .unwrap_err();

        assert!(err.to_string().contains("unknown_tag"));
        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
        assert_eq!(0, node.writes.load(Ordering::SeqCst));
    }

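    // The combined batch is split per region by the partition rule, and the
    // partition column "host" is stripped from each region's batch.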
    #[test]
    fn test_plan_region_batches_splits_and_strips_partition_columns() {
        let combined_batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice(), b"k2".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64, 2000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![1.0_f64, 2.0_f64])),
                Arc::new(StringArray::from(vec!["node-1", "node-2"])),
            ],
        )
        .unwrap();
        let mut planned_batches = plan_region_batches(
            combined_batch,
            1024,
            &TwoRegionPartitionRule {
                partition_columns: vec!["host".to_string()],
            },
            &["host".to_string()],
        )
        .unwrap();
        planned_batches.sort_by_key(|planned| planned.region_id.region_number());

        assert_eq!(2, planned_batches.len());
        assert_eq!(RegionId::new(1024, 1), planned_batches[0].region_id);
        assert_eq!(1, planned_batches[0].num_rows());
        assert_eq!(3, planned_batches[0].batch.num_columns());
        assert_eq!(RegionId::new(1024, 2), planned_batches[1].region_id);
        assert_eq!(1, planned_batches[1].num_rows());
        assert_eq!(3, planned_batches[1].batch.num_columns());
    }

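    // Each resolved region batch is encoded as one bulk-insert region request
    // addressed to its leader datanode.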
    #[test]
    fn test_encode_region_write_requests_builds_bulk_insert_requests() {
        let planned_batch = PlannedRegionBatch {
            region_id: RegionId::new(1024, 1),
            batch: RecordBatch::try_new(
                Arc::new(ArrowSchema::new(vec![
                    Field::new("__primary_key", ArrowDataType::Binary, false),
                    Field::new(
                        "greptime_timestamp",
                        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                        false,
                    ),
                    Field::new("greptime_value", ArrowDataType::Float64, true),
                ])),
                vec![
                    Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                    Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                    Arc::new(arrow::array::Float64Array::from(vec![1.0_f64])),
                ],
            )
            .unwrap(),
        };
        let resolved_batch = ResolvedRegionBatch {
            planned: planned_batch,
            datanode: Peer {
                id: 1,
                addr: "node-1".to_string(),
            },
        };
        let writes = encode_region_write_requests(vec![resolved_batch]).unwrap();

        assert_eq!(1, writes.len());
        assert_eq!(1, writes[0].datanode.id);
        let Some(region_request::Body::BulkInsert(request)) = &writes[0].request.body else {
            panic!("expected bulk insert request");
        };
        assert_eq!(RegionId::new(1024, 1).as_u64(), request.region_id);
    }
}