1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::Peer;
20use api::v1::region::{
21 BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
22};
23use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
24use arrow::compute::{concat_batches, filter_record_batch};
25use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use bytes::Bytes;
29use catalog::CatalogManagerRef;
30use common_grpc::flight::{FlightEncoder, FlightMessage};
31use common_meta::node_manager::NodeManagerRef;
32use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, warn};
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
38use partition::manager::PartitionRuleManagerRef;
39use partition::partition::PartitionRuleRef;
40use session::context::QueryContextRef;
41use smallvec::SmallVec;
42use snafu::{OptionExt, ResultExt, ensure};
43use store_api::storage::{RegionId, TableId};
44use table::metadata::{TableInfo, TableInfoRef};
45use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};
46
47use crate::error;
48use crate::error::{Error, Result};
49use crate::metrics::{
50 FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
51 PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
52 PENDING_WORKERS,
53};
54use crate::prom_row_builder::{
55 build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
56 rows_to_aligned_record_batch,
57};
58
/// Query-context extension naming the physical table that logical tables are written into;
/// when absent, `GREPTIME_PHYSICAL_TABLE` is used.
const PHYSICAL_TABLE_KEY: &str = "physical_table";
/// Environment variable read once in `PendingRowsBatcher::try_new`; parsed as `true`/`false`
/// to decide whether `submit` waits for the flush outcome (defaults to waiting).
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";
/// A worker's idle timeout is `flush_interval` multiplied by this factor.
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;
/// Number of leading columns `strip_partition_columns_from_batch` keeps from a physical batch.
/// NOTE(review): presumably the encoded primary key, timestamp and value columns — confirm.
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;
64
/// Schema operations the batcher needs before rows can be aligned and buffered.
///
/// Both methods take a whole set of tables, so a single submission issues at most one
/// create call and one alter call.
#[async_trait]
pub trait PendingRowsSchemaAlterer: Send + Sync {
    /// Creates the listed tables that do not exist yet.
    ///
    /// Each entry is `(table_name, column_schemas)`. `with_metric_engine` carries the batcher's
    /// `prom_store_with_metric_engine` flag. After this returns, the batcher expects every listed
    /// table to be resolvable from the catalog and fails the submission otherwise.
    async fn create_tables_if_missing_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[ColumnSchema])],
        with_metric_engine: bool,
        ctx: QueryContextRef,
    ) -> Result<()>;

    /// Adds missing tag columns to existing tables.
    ///
    /// Each entry is `(table_name, missing_column_names)`. The batcher re-reads each table's
    /// schema from the catalog after this call.
    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> Result<()>;
}
88
/// Shared, dynamically dispatched [`PendingRowsSchemaAlterer`].
pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
90
/// Physical-table information used when flushing logical-table batches into it.
#[derive(Clone)]
pub struct PhysicalTableMetadata {
    /// Metadata of the physical table.
    pub table_info: TableInfoRef,
    /// Column name -> column id map, as produced by `TableInfo::name_to_ids`
    /// (`None` when that call yields no map).
    pub col_name_to_ids: Option<HashMap<String, u32>>,
}
97
/// Catalog access required by `flush_batch_physical`.
#[async_trait]
pub trait PhysicalFlushCatalogProvider: Send + Sync {
    /// Resolves the physical table `catalog.schema.table_name`.
    ///
    /// Returns `Ok(None)` when the catalog has no such table.
    async fn physical_table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: &session::context::QueryContext,
    ) -> catalog::error::Result<Option<PhysicalTableMetadata>>;
}
108
/// Partition / routing lookups required by `flush_batch_physical`.
#[async_trait]
pub trait PhysicalFlushPartitionProvider: Send + Sync {
    /// Returns the partition rule of the given table.
    async fn find_table_partition_rule(
        &self,
        table_info: &TableInfo,
    ) -> partition::error::Result<PartitionRuleRef>;

    /// Returns the peer leading `region_id`, i.e. where writes for that region are sent.
    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer>;
}
118
/// Sends region requests to datanodes on behalf of `flush_batch_physical`.
#[async_trait]
pub trait PhysicalFlushNodeRequester: Send + Sync {
    /// Sends `request` to `peer` and returns its response.
    async fn handle(
        &self,
        peer: &Peer,
        request: RegionRequest,
    ) -> Result<api::region::RegionResponse>;
}
127
/// [`PhysicalFlushCatalogProvider`] backed by the real catalog manager.
#[derive(Clone)]
struct CatalogManagerPhysicalFlushAdapter {
    catalog_manager: CatalogManagerRef,
}
132
133#[async_trait]
134impl PhysicalFlushCatalogProvider for CatalogManagerPhysicalFlushAdapter {
135 async fn physical_table(
136 &self,
137 catalog: &str,
138 schema: &str,
139 table_name: &str,
140 query_ctx: &session::context::QueryContext,
141 ) -> catalog::error::Result<Option<PhysicalTableMetadata>> {
142 self.catalog_manager
143 .table(catalog, schema, table_name, Some(query_ctx))
144 .await
145 .map(|table| {
146 table.map(|table| {
147 let table_info = table.table_info();
148 let name_to_ids = table_info.name_to_ids();
149 PhysicalTableMetadata {
150 table_info,
151 col_name_to_ids: name_to_ids,
152 }
153 })
154 })
155 }
156}
157
/// [`PhysicalFlushPartitionProvider`] backed by the real partition rule manager.
#[derive(Clone)]
struct PartitionManagerPhysicalFlushAdapter {
    partition_manager: PartitionRuleManagerRef,
}
162
163#[async_trait]
164impl PhysicalFlushPartitionProvider for PartitionManagerPhysicalFlushAdapter {
165 async fn find_table_partition_rule(
166 &self,
167 table_info: &TableInfo,
168 ) -> partition::error::Result<PartitionRuleRef> {
169 self.partition_manager
170 .find_table_partition_rule(table_info)
171 .await
172 .map(|(rule, _)| rule)
173 }
174
175 async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
176 let peer = self.partition_manager.find_region_leader(region_id).await?;
177 Ok(peer)
178 }
179}
180
/// [`PhysicalFlushNodeRequester`] backed by the real node manager.
#[derive(Clone)]
struct NodeManagerPhysicalFlushAdapter {
    node_manager: NodeManagerRef,
}
185
186#[async_trait]
187impl PhysicalFlushNodeRequester for NodeManagerPhysicalFlushAdapter {
188 async fn handle(
189 &self,
190 peer: &Peer,
191 request: RegionRequest,
192 ) -> error::Result<api::region::RegionResponse> {
193 let datanode = self.node_manager.datanode(peer).await;
194 datanode
195 .handle(request)
196 .await
197 .context(error::CommonMetaSnafu)
198 }
199}
200
/// Identifies a worker: submissions that share catalog, schema and physical table are
/// accumulated into the same pending batch.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    catalog: String,
    schema: String,
    physical_table: String,
}
207
/// Record batches accumulated for one logical table within a pending batch.
#[derive(Debug)]
pub struct TableBatch {
    /// Logical table name.
    pub table_name: String,
    /// Id of the logical table.
    pub table_id: TableId,
    /// Batches already aligned to the table's schema, in the order they were submitted.
    pub batches: Vec<RecordBatch>,
    /// Sum of `num_rows()` over `batches`.
    pub row_count: usize,
}
215
/// Outcome of resolving the logical tables referenced by one submission.
struct TableResolutionPlan {
    /// Table name -> (table's arrow schema, table id) for every table resolved so far.
    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
    /// Tables not found in the catalog, with the column schemas to create them with.
    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
    /// Tables lacking columns present in the request, with the missing column names.
    tables_to_alter: Vec<(String, Vec<String>)>,
}
226
/// Rows a worker has buffered but not yet flushed.
struct PendingBatch {
    /// Accumulated batches keyed by logical table name.
    tables: HashMap<String, TableBatch>,
    /// When the batch was opened; used for the age-based flush.
    created_at: Instant,
    /// Rows across all `tables`.
    total_row_count: usize,
    /// Database string derived from `ctx`, used as a metric label.
    db_string: String,
    /// Context of the submission that opened this batch.
    ctx: QueryContextRef,
    /// Submitters waiting for the outcome of this batch's flush.
    waiters: Vec<FlushWaiter>,
}
235
/// A submitter waiting for the flush that includes its rows.
struct FlushWaiter {
    /// Channel for the flush outcome (`Ok(())` or the error, shared via `Arc`).
    response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
    /// Inflight-request permit acquired in `submit`; released when this waiter is dropped.
    _permit: OwnedSemaphorePermit,
}
240
/// A pending batch taken out of a worker and ready to be written.
struct FlushBatch {
    /// One entry per logical table.
    table_batches: Vec<TableBatch>,
    /// Rows across all `table_batches`.
    total_row_count: usize,
    /// Database string used as a metric label.
    db_string: String,
    /// Context of the submission that opened the batch.
    ctx: QueryContextRef,
    /// Submitters to notify once the flush finishes.
    waiters: Vec<FlushWaiter>,
}
248
/// Handle to a worker task; cloning it only clones the command sender.
#[derive(Clone)]
struct PendingWorker {
    tx: mpsc::Sender<WorkerCommand>,
}
253
/// Message sent from `PendingRowsBatcher::submit` to a worker task.
enum WorkerCommand {
    /// Adds aligned record batches to the worker's pending batch.
    Submit {
        /// `(logical table name, table id, aligned record batch)` per table.
        table_batches: Vec<(String, u32, RecordBatch)>,
        /// Total rows across `table_batches`.
        total_rows: usize,
        /// Context of the submitting request.
        ctx: QueryContextRef,
        /// Receives the flush outcome for these rows.
        response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
        /// Inflight-request permit, handed on to the resulting `FlushWaiter`.
        _permit: OwnedSemaphorePermit,
    },
}
263
264fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
267 let physical_table = ctx
268 .extension(PHYSICAL_TABLE_KEY)
269 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
270 .to_string();
271 BatchKey {
272 catalog: ctx.current_catalog().to_string(),
273 schema: ctx.current_schema(),
274 physical_table,
275 }
276}
277
/// Buffers row inserts per `(catalog, schema, physical table)` and writes them in batches
/// through per-key worker tasks. NOTE(review): judging by the `prom_*` helpers it uses, this
/// serves the Prometheus remote-write path — confirm.
pub struct PendingRowsBatcher {
    /// One worker per batch key; workers unregister themselves when they exit.
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    /// Tick period and maximum age of a pending batch before it is flushed.
    flush_interval: Duration,
    /// Row count at which a worker flushes its pending batch immediately.
    max_batch_rows: usize,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    /// Caps the number of concurrently running background flushes.
    flush_semaphore: Arc<Semaphore>,
    /// Caps the number of submissions buffered or being flushed.
    inflight_semaphore: Arc<Semaphore>,
    /// Capacity of each worker's command channel.
    worker_channel_capacity: usize,
    /// Forwarded to `create_tables_if_missing_batch` as `with_metric_engine`.
    prom_store_with_metric_engine: bool,
    schema_alterer: PendingRowsSchemaAltererRef,
    /// Whether `submit` waits for the flush outcome of its rows.
    pending_rows_batch_sync: bool,
    /// Broadcast on drop to stop every worker.
    shutdown: broadcast::Sender<()>,
}
294
295impl PendingRowsBatcher {
296 #[allow(clippy::too_many_arguments)]
297 pub fn try_new(
298 partition_manager: PartitionRuleManagerRef,
299 node_manager: NodeManagerRef,
300 catalog_manager: CatalogManagerRef,
301 prom_store_with_metric_engine: bool,
302 schema_alterer: PendingRowsSchemaAltererRef,
303 flush_interval: Duration,
304 max_batch_rows: usize,
305 max_concurrent_flushes: usize,
306 worker_channel_capacity: usize,
307 max_inflight_requests: usize,
308 ) -> Option<Arc<Self>> {
309 if flush_interval.is_zero()
313 || max_batch_rows == 0
314 || max_concurrent_flushes == 0
315 || worker_channel_capacity == 0
316 || max_inflight_requests == 0
317 {
318 return None;
319 }
320
321 let (shutdown, _) = broadcast::channel(1);
322 let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
323 .ok()
324 .as_deref()
325 .and_then(|v| v.parse::<bool>().ok())
326 .unwrap_or(true);
327 let workers = Arc::new(DashMap::new());
328 PENDING_WORKERS.set(workers.len() as i64);
329
330 Some(Arc::new(Self {
331 workers,
332 flush_interval,
333 max_batch_rows,
334 partition_manager,
335 node_manager,
336 catalog_manager,
337 prom_store_with_metric_engine,
338 schema_alterer,
339 flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
340 inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
341 worker_channel_capacity,
342 pending_rows_batch_sync,
343 shutdown,
344 }))
345 }
346
347 pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
348 let (table_batches, total_rows) = {
349 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
350 .with_label_values(&["submit_build_and_align"])
351 .start_timer();
352 self.build_and_align_table_batches(requests, &ctx).await?
353 };
354 if total_rows == 0 {
355 return Ok(0);
356 }
357
358 let permit = {
359 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
360 .with_label_values(&["submit_acquire_inflight_permit"])
361 .start_timer();
362 self.inflight_semaphore
363 .clone()
364 .acquire_owned()
365 .await
366 .map_err(|_| error::BatcherChannelClosedSnafu.build())?
367 };
368
369 let (response_tx, response_rx) = oneshot::channel();
370
371 let batch_key = batch_key_from_ctx(&ctx);
372 let mut cmd = Some(WorkerCommand::Submit {
373 table_batches,
374 total_rows,
375 ctx,
376 response_tx,
377 _permit: permit,
378 });
379
380 {
381 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
382 .with_label_values(&["submit_send_to_worker"])
383 .start_timer();
384
385 for _ in 0..2 {
386 let worker = self.get_or_spawn_worker(batch_key.clone());
387 let Some(worker_cmd) = cmd.take() else {
388 break;
389 };
390
391 match worker.tx.send(worker_cmd).await {
392 Ok(()) => break,
393 Err(err) => {
394 cmd = Some(err.0);
395 remove_worker_if_same_channel(
396 self.workers.as_ref(),
397 &batch_key,
398 &worker.tx,
399 );
400 }
401 }
402 }
403
404 if cmd.is_some() {
405 return Err(Error::BatcherChannelClosed);
406 }
407 }
408
409 if self.pending_rows_batch_sync {
410 let result = {
411 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
412 .with_label_values(&["submit_wait_flush_result"])
413 .start_timer();
414 response_rx
415 .await
416 .map_err(|_| error::BatcherChannelClosedSnafu.build())?
417 };
418 result
419 .context(error::SubmitBatchSnafu)
420 .map(|()| total_rows as u64)
421 } else {
422 Ok(total_rows as u64)
423 }
424 }
425
426 async fn build_and_align_table_batches(
431 &self,
432 requests: RowInsertRequests,
433 ctx: &QueryContextRef,
434 ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
435 let catalog = ctx.current_catalog().to_string();
436 let schema = ctx.current_schema();
437
438 let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
439 if total_rows == 0 {
440 return Ok((Vec::new(), 0));
441 }
442
443 let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
444 let mut plan = self
445 .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
446 .await?;
447
448 self.create_missing_tables_and_refresh_schemas(
449 &catalog,
450 &schema,
451 ctx,
452 &table_rows,
453 &mut plan,
454 )
455 .await?;
456
457 self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
458 .await?;
459
460 let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;
461
462 Ok((aligned_batches, total_rows))
463 }
464
465 fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
468 let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
469 let mut total_rows = 0;
470
471 for request in requests.inserts {
472 let Some(rows) = request.rows else {
473 continue;
474 };
475 if rows.rows.is_empty() {
476 continue;
477 }
478
479 total_rows += rows.rows.len();
480 table_rows.push((request.table_name, rows));
481 }
482
483 (table_rows, total_rows)
484 }
485
486 fn collect_unique_table_schemas(
489 table_rows: &[(String, Rows)],
490 ) -> Result<Vec<(&str, &[ColumnSchema])>> {
491 let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
492 let mut seen = HashSet::new();
493
494 for (table_name, rows) in table_rows {
495 if seen.insert(table_name.as_str()) {
496 unique_tables.push((table_name.as_str(), &rows.schema));
497 } else {
498 return error::InvalidPromRemoteRequestSnafu {
500 msg: format!(
501 "Found duplicated table name in RowInsertRequest: {}",
502 table_name
503 ),
504 }
505 .fail();
506 }
507 }
508
509 Ok(unique_tables)
510 }
511
512 async fn plan_table_resolution(
515 &self,
516 catalog: &str,
517 schema: &str,
518 ctx: &QueryContextRef,
519 unique_tables: &[(&str, &[ColumnSchema])],
520 ) -> Result<TableResolutionPlan> {
521 let mut plan = TableResolutionPlan {
522 region_schemas: HashMap::with_capacity(unique_tables.len()),
523 tables_to_create: Vec::new(),
524 tables_to_alter: Vec::new(),
525 };
526
527 let resolved_tables = {
528 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
529 .with_label_values(&["align_resolve_table"])
530 .start_timer();
531 futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
532 self.catalog_manager
533 .table(catalog, schema, table_name, Some(ctx.as_ref()))
534 }))
535 .await
536 };
537
538 for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
539 let table = table_result?;
540
541 if let Some(table) = table {
542 let table_info = table.table_info();
543 let table_id = table_info.ident.table_id;
544 let region_schema = table_info.meta.schema.arrow_schema().clone();
545
546 let missing_columns = {
547 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
548 .with_label_values(&["align_identify_missing_columns"])
549 .start_timer();
550 identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
551 };
552 if !missing_columns.is_empty() {
553 plan.tables_to_alter
554 .push(((*table_name).to_string(), missing_columns));
555 }
556 plan.region_schemas
557 .insert((*table_name).to_string(), (region_schema, table_id));
558 } else {
559 let request_schema = {
560 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
561 .with_label_values(&["align_build_create_table_schema"])
562 .start_timer();
563 build_prom_create_table_schema_from_proto(rows_schema)?
564 };
565 plan.tables_to_create
566 .push(((*table_name).to_string(), request_schema));
567 }
568 }
569
570 Ok(plan)
571 }
572
573 async fn create_missing_tables_and_refresh_schemas(
576 &self,
577 catalog: &str,
578 schema: &str,
579 ctx: &QueryContextRef,
580 table_rows: &[(String, Rows)],
581 plan: &mut TableResolutionPlan,
582 ) -> Result<()> {
583 if plan.tables_to_create.is_empty() {
584 return Ok(());
585 }
586
587 let create_refs: Vec<(&str, &[ColumnSchema])> = plan
588 .tables_to_create
589 .iter()
590 .map(|(name, schema)| (name.as_str(), schema.as_slice()))
591 .collect();
592
593 {
594 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
595 .with_label_values(&["align_batch_create_tables"])
596 .start_timer();
597 self.schema_alterer
598 .create_tables_if_missing_batch(
599 catalog,
600 schema,
601 &create_refs,
602 self.prom_store_with_metric_engine,
603 ctx.clone(),
604 )
605 .await?;
606 }
607
608 let created_table_results = {
609 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
610 .with_label_values(&["align_resolve_table_after_create"])
611 .start_timer();
612 futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
613 self.catalog_manager
614 .table(catalog, schema, table_name, Some(ctx.as_ref()))
615 }))
616 .await
617 };
618
619 for ((table_name, _), table_result) in
620 plan.tables_to_create.iter().zip(created_table_results)
621 {
622 let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
623 reason: format!(
624 "Table not found after pending batch create attempt: {}",
625 table_name
626 ),
627 })?;
628 let table_info = table.table_info();
629 let table_id = table_info.ident.table_id;
630 let region_schema = table_info.meta.schema.arrow_schema().clone();
631 plan.region_schemas
632 .insert(table_name.clone(), (region_schema, table_id));
633 }
634
635 Self::enqueue_alter_for_new_tables(table_rows, plan)?;
636
637 Ok(())
638 }
639
640 fn enqueue_alter_for_new_tables(
643 table_rows: &[(String, Rows)],
644 plan: &mut TableResolutionPlan,
645 ) -> Result<()> {
646 let created_tables: HashSet<&str> = plan
647 .tables_to_create
648 .iter()
649 .map(|(table_name, _)| table_name.as_str())
650 .collect();
651
652 for (table_name, rows) in table_rows {
653 if !created_tables.contains(table_name.as_str()) {
654 continue;
655 }
656
657 let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
658 continue;
659 };
660
661 let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
662 if missing_columns.is_empty()
663 || plan
664 .tables_to_alter
665 .iter()
666 .any(|(existing_name, _)| existing_name == table_name)
667 {
668 continue;
669 }
670
671 plan.tables_to_alter
672 .push((table_name.clone(), missing_columns));
673 }
674
675 Ok(())
676 }
677
678 async fn alter_tables_and_refresh_schemas(
681 &self,
682 catalog: &str,
683 schema: &str,
684 ctx: &QueryContextRef,
685 plan: &mut TableResolutionPlan,
686 ) -> Result<()> {
687 if plan.tables_to_alter.is_empty() {
688 return Ok(());
689 }
690
691 let alter_refs: Vec<(&str, &[String])> = plan
692 .tables_to_alter
693 .iter()
694 .map(|(name, cols)| (name.as_str(), cols.as_slice()))
695 .collect();
696 {
697 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
698 .with_label_values(&["align_batch_add_missing_columns"])
699 .start_timer();
700 self.schema_alterer
701 .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
702 .await?;
703 }
704
705 let altered_table_results = {
706 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
707 .with_label_values(&["align_resolve_table_after_schema_alter"])
708 .start_timer();
709 futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
710 self.catalog_manager
711 .table(catalog, schema, table_name, Some(ctx.as_ref()))
712 }))
713 .await
714 };
715
716 for ((table_name, _), table_result) in
717 plan.tables_to_alter.iter().zip(altered_table_results)
718 {
719 let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
720 reason: format!(
721 "Table not found after pending batch schema alter: {}",
722 table_name
723 ),
724 })?;
725 let table_info = table.table_info();
726 let table_id = table_info.ident.table_id;
727 let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
728 plan.region_schemas
729 .insert(table_name.clone(), (refreshed_region_schema, table_id));
730 }
731
732 Ok(())
733 }
734
735 fn build_aligned_batches(
738 table_rows: &[(String, Rows)],
739 region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
740 ) -> Result<Vec<(String, u32, RecordBatch)>> {
741 let mut aligned_batches = Vec::with_capacity(table_rows.len());
742 for (table_name, rows) in table_rows {
743 let (region_schema, table_id) =
744 region_schemas.get(table_name).cloned().with_context(|| {
745 error::UnexpectedResultSnafu {
746 reason: format!("Region schema not resolved for table: {}", table_name),
747 }
748 })?;
749
750 let record_batch = {
751 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
752 .with_label_values(&["align_rows_to_record_batch"])
753 .start_timer();
754 rows_to_aligned_record_batch(rows, region_schema.as_ref())?
755 };
756 aligned_batches.push((table_name.clone(), table_id, record_batch));
757 }
758
759 Ok(aligned_batches)
760 }
761
762 fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
763 if let Some(worker) = self.workers.get(&key)
764 && !worker.tx.is_closed()
765 {
766 return worker.clone();
767 }
768
769 let entry = self.workers.entry(key.clone());
770 match entry {
771 Entry::Occupied(mut worker) => {
772 if worker.get().tx.is_closed() {
773 let new_worker = self.spawn_worker(key);
774 worker.insert(new_worker.clone());
775 PENDING_WORKERS.set(self.workers.len() as i64);
776 new_worker
777 } else {
778 worker.get().clone()
779 }
780 }
781 Entry::Vacant(vacant) => {
782 let worker = self.spawn_worker(key);
783
784 vacant.insert(worker.clone());
785 PENDING_WORKERS.set(self.workers.len() as i64);
786 worker
787 }
788 }
789 }
790
791 fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
792 let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
793 let worker = PendingWorker { tx: tx.clone() };
794 let worker_idle_timeout = self
795 .flush_interval
796 .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
797 .unwrap_or(self.flush_interval);
798
799 start_worker(
800 key,
801 worker.tx.clone(),
802 self.workers.clone(),
803 rx,
804 self.shutdown.clone(),
805 self.partition_manager.clone(),
806 self.node_manager.clone(),
807 self.catalog_manager.clone(),
808 self.flush_interval,
809 worker_idle_timeout,
810 self.max_batch_rows,
811 self.flush_semaphore.clone(),
812 );
813
814 worker
815 }
816}
817
818impl Drop for PendingRowsBatcher {
819 fn drop(&mut self) {
820 let _ = self.shutdown.send(());
821 }
822}
823
824impl PendingBatch {
825 fn new(ctx: QueryContextRef) -> Self {
826 let db_string = ctx.get_db_string();
827 Self {
828 tables: HashMap::new(),
829 created_at: Instant::now(),
830 total_row_count: 0,
831 db_string,
832 ctx,
833 waiters: Vec::new(),
834 }
835 }
836}
837
838#[allow(clippy::too_many_arguments)]
839fn start_worker(
840 key: BatchKey,
841 worker_tx: mpsc::Sender<WorkerCommand>,
842 workers: Arc<DashMap<BatchKey, PendingWorker>>,
843 mut rx: mpsc::Receiver<WorkerCommand>,
844 shutdown: broadcast::Sender<()>,
845 partition_manager: PartitionRuleManagerRef,
846 node_manager: NodeManagerRef,
847 catalog_manager: CatalogManagerRef,
848 flush_interval: Duration,
849 worker_idle_timeout: Duration,
850 max_batch_rows: usize,
851 flush_semaphore: Arc<Semaphore>,
852) {
853 tokio::spawn(async move {
854 let mut batch = None;
855 let mut interval = tokio::time::interval(flush_interval);
856 let mut shutdown_rx = shutdown.subscribe();
857 let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
858 let idle_timer = tokio::time::sleep_until(idle_deadline);
859 tokio::pin!(idle_timer);
860
861 loop {
862 tokio::select! {
863 cmd = rx.recv() => {
864 match cmd {
865 Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
866 idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);
867
868 let pending_batch = batch.get_or_insert_with(||{
869 PENDING_BATCHES.inc();
870 PendingBatch::new(ctx)
871 });
872
873 pending_batch.waiters.push(FlushWaiter { response_tx, _permit });
874
875 for (table_name, table_id, record_batch) in table_batches {
876 let entry = pending_batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
877 table_name,
878 table_id,
879 batches: Vec::new(),
880 row_count: 0,
881 });
882 entry.row_count += record_batch.num_rows();
883 entry.batches.push(record_batch);
884 }
885
886 pending_batch.total_row_count += total_rows;
887 PENDING_ROWS.add(total_rows as i64);
888
889 if pending_batch.total_row_count >= max_batch_rows
890 && let Some(flush) = drain_batch(&mut batch) {
891 spawn_flush(
892 flush,
893 partition_manager.clone(),
894 node_manager.clone(),
895 catalog_manager.clone(),
896 flush_semaphore.clone(),
897 ).await;
898 }
899 }
900 None => {
901 if let Some(flush) = drain_batch(&mut batch) {
902 flush_batch(
903 flush,
904 partition_manager.clone(),
905 node_manager.clone(),
906 catalog_manager.clone(),
907 ).await;
908 }
909 break;
910 }
911 }
912 }
913 _ = &mut idle_timer => {
914 if !should_close_worker_on_idle_timeout(
915 batch.as_ref().map_or(0, |batch| batch.total_row_count),
916 rx.len(),
917 ) {
918 idle_timer
919 .as_mut()
920 .reset(tokio::time::Instant::now() + worker_idle_timeout);
921 continue;
922 }
923
924 debug!(
925 "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
926 key.catalog,
927 key.schema,
928 key.physical_table
929 );
930 break;
931 }
932 _ = interval.tick() => {
933 if batch
934 .as_ref()
935 .is_some_and(|batch| batch.created_at.elapsed() >= flush_interval)
936 && let Some(flush) = drain_batch(&mut batch) {
937 spawn_flush(
938 flush,
939 partition_manager.clone(),
940 node_manager.clone(),
941 catalog_manager.clone(),
942 flush_semaphore.clone(),
943 ).await;
944 }
945 }
946 _ = shutdown_rx.recv() => {
947 if let Some(flush) = drain_batch(&mut batch) {
948 flush_batch(
949 flush,
950 partition_manager.clone(),
951 node_manager.clone(),
952 catalog_manager.clone(),
953 ).await;
954 }
955 break;
956 }
957 }
958 }
959
960 remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
961 });
962}
963
964fn remove_worker_if_same_channel(
965 workers: &DashMap<BatchKey, PendingWorker>,
966 key: &BatchKey,
967 worker_tx: &mpsc::Sender<WorkerCommand>,
968) -> bool {
969 if let Some(worker) = workers.get(key)
970 && worker.tx.same_channel(worker_tx)
971 {
972 drop(worker);
973 workers.remove(key);
974 PENDING_WORKERS.set(workers.len() as i64);
975 return true;
976 }
977
978 false
979}
980
/// A worker may shut down on idle timeout only when it buffers no rows and has no queued
/// commands.
fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    matches!((total_row_count, queued_requests), (0, 0))
}
984
985fn drain_batch(batch: &mut Option<PendingBatch>) -> Option<FlushBatch> {
986 let batch = batch.take()?;
987 let total_row_count = batch.total_row_count;
988
989 if total_row_count == 0 {
990 return None;
991 }
992
993 let table_batches = batch.tables.into_values().collect();
994 let waiters = batch.waiters;
995
996 PENDING_ROWS.sub(total_row_count as i64);
997 PENDING_BATCHES.dec();
998
999 Some(FlushBatch {
1000 table_batches,
1001 total_row_count,
1002 db_string: batch.db_string,
1003 ctx: batch.ctx,
1004 waiters,
1005 })
1006}
1007
1008async fn spawn_flush(
1009 flush: FlushBatch,
1010 partition_manager: PartitionRuleManagerRef,
1011 node_manager: NodeManagerRef,
1012 catalog_manager: CatalogManagerRef,
1013 semaphore: Arc<Semaphore>,
1014) {
1015 match semaphore.acquire_owned().await {
1016 Ok(permit) => {
1017 tokio::spawn(async move {
1018 let _permit = permit;
1019 flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1020 });
1021 }
1022 Err(err) => {
1023 warn!(err; "Flush semaphore closed, flushing inline");
1024 flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1025 }
1026 }
1027}
1028
/// One region request and the datanode it is sent to.
struct FlushRegionWrite {
    datanode: Peer,
    request: RegionRequest,
}
1033
/// A record batch destined for a single region.
struct PlannedRegionBatch {
    region_id: RegionId,
    batch: RecordBatch,
}
1038
#[cfg(test)]
impl PlannedRegionBatch {
    /// Number of rows in the planned batch (test-only helper).
    fn num_rows(&self) -> usize {
        RecordBatch::num_rows(&self.batch)
    }
}
1045
/// A planned region batch paired with the datanode it will be written to.
/// NOTE(review): presumably the region leader from `find_region_leader` — confirm.
struct ResolvedRegionBatch {
    planned: PlannedRegionBatch,
    datanode: Peer,
}
1050
/// Concurrent dispatch only pays off with at least two region writes; zero or one write is
/// issued sequentially.
fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    !matches!(region_write_count, 0 | 1)
}
1054
1055fn columns_taxonomy(
1064 batch_schema: &Arc<ArrowSchema>,
1065 table_name: &str,
1066 name_to_ids: &HashMap<String, u32>,
1067 partition_columns: &HashSet<&str>,
1068) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
1069 let mut tag_columns = Vec::new();
1070 let mut essential_column_indices =
1071 SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
1072 essential_column_indices.push(0);
1074 essential_column_indices.push(0);
1075
1076 let mut timestamp_index = None;
1077 let mut value_index = None;
1078
1079 for (index, field) in batch_schema.fields().iter().enumerate() {
1080 match field.data_type() {
1081 ArrowDataType::Utf8 => {
1082 let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
1083 error::InvalidPromRemoteRequestSnafu {
1084 msg: format!(
1085 "Column '{}' from logical table '{}' not found in physical table column IDs",
1086 field.name(),
1087 table_name
1088 ),
1089 }
1090 })?;
1091 tag_columns.push(TagColumnInfo {
1092 name: field.name().clone(),
1093 index,
1094 column_id,
1095 });
1096
1097 if partition_columns.contains(field.name().as_str()) {
1098 essential_column_indices.push(index);
1099 }
1100 }
1101 ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
1102 ensure!(
1103 timestamp_index.replace(index).is_none(),
1104 error::InvalidPromRemoteRequestSnafu {
1105 msg: format!(
1106 "Duplicated timestamp column in logical table '{}' batch schema",
1107 table_name
1108 ),
1109 }
1110 );
1111 }
1112 ArrowDataType::Float64 => {
1113 ensure!(
1114 value_index.replace(index).is_none(),
1115 error::InvalidPromRemoteRequestSnafu {
1116 msg: format!(
1117 "Duplicated value column in logical table '{}' batch schema",
1118 table_name
1119 ),
1120 }
1121 );
1122 }
1123 datatype => {
1124 return error::InvalidPromRemoteRequestSnafu {
1125 msg: format!(
1126 "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
1127 table_name
1128 ),
1129 }
1130 .fail();
1131 }
1132 }
1133 }
1134
1135 let timestamp_index =
1136 timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1137 msg: format!(
1138 "Missing essential column '{}' in logical table '{}' batch schema",
1139 greptime_timestamp(),
1140 table_name
1141 ),
1142 })?;
1143 let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1144 msg: format!(
1145 "Missing essential column '{}' in logical table '{}' batch schema",
1146 greptime_value(),
1147 table_name
1148 ),
1149 })?;
1150
1151 tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
1152
1153 essential_column_indices[0] = timestamp_index;
1154 essential_column_indices[1] = value_index;
1155
1156 Ok((tag_columns, essential_column_indices))
1157}
1158
1159fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
1160 ensure!(
1161 batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1162 error::InternalSnafu {
1163 err_msg: format!(
1164 "Expected at least {} columns in physical batch, got {}",
1165 PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1166 batch.num_columns()
1167 ),
1168 }
1169 );
1170 let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
1171 batch.project(&essential_indices).context(error::ArrowSnafu)
1172}
1173
1174async fn flush_region_writes_concurrently(
1175 node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1176 writes: Vec<FlushRegionWrite>,
1177) -> Result<usize> {
1178 let mut affected_rows = 0;
1179 if !should_dispatch_concurrently(writes.len()) {
1180 for write in writes {
1181 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1182 .with_label_values(&["flush_write_region"])
1183 .start_timer();
1184 affected_rows += node_manager
1185 .handle(&write.datanode, write.request)
1186 .await?
1187 .affected_rows;
1188 }
1189 return Ok(affected_rows);
1190 }
1191
1192 let write_futures = writes.into_iter().map(|write| async move {
1193 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1194 .with_label_values(&["flush_write_region"])
1195 .start_timer();
1196
1197 let response = node_manager.handle(&write.datanode, write.request).await?;
1198 Ok::<_, Error>(response.affected_rows)
1199 });
1200
1201 let affected_rows = futures::future::try_join_all(write_futures)
1203 .await?
1204 .into_iter()
1205 .sum();
1206 Ok(affected_rows)
1207}
1208
1209async fn flush_batch(
1210 flush: FlushBatch,
1211 partition_manager: PartitionRuleManagerRef,
1212 node_manager: NodeManagerRef,
1213 catalog_manager: CatalogManagerRef,
1214) {
1215 let FlushBatch {
1216 table_batches,
1217 total_row_count,
1218 db_string,
1219 ctx,
1220 waiters,
1221 } = flush;
1222 let start = Instant::now();
1223
1224 let physical_table_name = ctx
1227 .extension(PHYSICAL_TABLE_KEY)
1228 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
1229 .to_string();
1230 let partition_provider = PartitionManagerPhysicalFlushAdapter { partition_manager };
1231 let node_requester = NodeManagerPhysicalFlushAdapter { node_manager };
1232 let catalog_provider = CatalogManagerPhysicalFlushAdapter { catalog_manager };
1233 let result = flush_batch_physical(
1234 &table_batches,
1235 &physical_table_name,
1236 &ctx,
1237 &partition_provider,
1238 &node_requester,
1239 &catalog_provider,
1240 )
1241 .await;
1242
1243 let elapsed = start.elapsed().as_secs_f64();
1244 FLUSH_ELAPSED.observe(elapsed);
1245
1246 if result.is_err() {
1247 FLUSH_FAILURES.inc();
1248 FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64);
1249 } else if let Ok(affected_rows) = &result {
1250 FLUSH_TOTAL.inc();
1251 FLUSH_ROWS.observe(total_row_count as f64);
1252 operator::metrics::DIST_INGEST_ROW_COUNT
1253 .with_label_values(&[db_string.as_str()])
1254 .inc_by(*affected_rows as u64);
1255 }
1256
1257 debug!(
1258 "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
1259 total_row_count, elapsed
1260 );
1261
1262 notify_waiters(waiters, result.map(|_| ()));
1263}
1264
1265pub async fn flush_batch_physical(
1276 table_batches: &[TableBatch],
1277 physical_table_name: &str,
1278 ctx: &QueryContextRef,
1279 partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1280 node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1281 catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized),
1282) -> Result<usize> {
1283 let physical_table = {
1285 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1286 .with_label_values(&["flush_physical_resolve_table"])
1287 .start_timer();
1288 catalog_manager
1289 .physical_table(
1290 ctx.current_catalog(),
1291 &ctx.current_schema(),
1292 physical_table_name,
1293 ctx.as_ref(),
1294 )
1295 .await?
1296 .with_context(|| error::InternalSnafu {
1297 err_msg: format!(
1298 "Physical table '{}' not found during pending flush",
1299 physical_table_name
1300 ),
1301 })?
1302 };
1303
1304 let physical_table_info = physical_table.table_info;
1305 let name_to_ids = physical_table
1306 .col_name_to_ids
1307 .with_context(|| error::InternalSnafu {
1308 err_msg: format!(
1309 "Physical table '{}' has no column IDs for pending flush",
1310 physical_table_name
1311 ),
1312 })?;
1313
1314 let partition_rule = {
1316 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1317 .with_label_values(&["flush_physical_fetch_partition_rule"])
1318 .start_timer();
1319 partition_manager
1320 .find_table_partition_rule(physical_table_info.as_ref())
1321 .await?
1322 };
1323 let partition_columns = partition_rule.partition_columns();
1324 let partition_columns_set: HashSet<&str> =
1325 partition_columns.iter().map(String::as_str).collect();
1326
1327 let modified_batches =
1329 transform_logical_batches_to_physical(table_batches, &name_to_ids, &partition_columns_set)?;
1330
1331 let combined_batch = concat_modified_batches(&modified_batches)?;
1333
1334 let physical_table_id = physical_table_info.table_id();
1336 let planned_batches = plan_region_batches(
1337 combined_batch,
1338 physical_table_id,
1339 partition_rule.as_ref(),
1340 partition_columns,
1341 )?;
1342
1343 let resolved_batches = resolve_region_targets(planned_batches, partition_manager).await?;
1344 let region_writes = encode_region_write_requests(resolved_batches)?;
1345 flush_region_writes_concurrently(node_manager, region_writes).await
1346}
1347
1348fn transform_logical_batches_to_physical(
1353 table_batches: &[TableBatch],
1354 name_to_ids: &HashMap<String, u32>,
1355 partition_columns_set: &HashSet<&str>,
1356) -> Result<Vec<RecordBatch>> {
1357 let mut modified_batches: Vec<RecordBatch> =
1358 Vec::with_capacity(table_batches.iter().map(|b| b.batches.len()).sum());
1359
1360 let mut modify_elapsed = Duration::ZERO;
1361 let mut columns_taxonomy_elapsed = Duration::ZERO;
1362
1363 for table_batch in table_batches {
1364 let table_id = table_batch.table_id;
1365
1366 for batch in &table_batch.batches {
1367 let batch_schema = batch.schema();
1368 let start = Instant::now();
1369 let (tag_columns, essential_col_indices) = columns_taxonomy(
1370 &batch_schema,
1371 &table_batch.table_name,
1372 name_to_ids,
1373 partition_columns_set,
1374 )?;
1375
1376 columns_taxonomy_elapsed += start.elapsed();
1377 if tag_columns.is_empty() && essential_col_indices.is_empty() {
1378 continue;
1379 }
1380
1381 let modified = {
1382 let start = Instant::now();
1383 let batch = modify_batch_sparse(
1384 batch.clone(),
1385 table_id,
1386 &tag_columns,
1387 &essential_col_indices,
1388 )?;
1389 modify_elapsed += start.elapsed();
1390 batch
1391 };
1392
1393 modified_batches.push(modified);
1394 }
1395 }
1396
1397 PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1398 .with_label_values(&["flush_physical_modify_batch"])
1399 .observe(modify_elapsed.as_secs_f64());
1400 PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1401 .with_label_values(&["flush_physical_columns_taxonomy"])
1402 .observe(columns_taxonomy_elapsed.as_secs_f64());
1403
1404 ensure!(
1405 !modified_batches.is_empty(),
1406 error::InternalSnafu {
1407 err_msg: "No batches can be transformed during pending flush",
1408 }
1409 );
1410 Ok(modified_batches)
1411}
1412
1413fn concat_modified_batches(modified_batches: &[RecordBatch]) -> Result<RecordBatch> {
1417 let combined_schema = modified_batches[0].schema();
1418 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1419 .with_label_values(&["flush_physical_concat_all"])
1420 .start_timer();
1421 concat_batches(&combined_schema, modified_batches).context(error::ArrowSnafu)
1422}
1423
1424fn split_combined_batch_by_region(
1425 combined_batch: &RecordBatch,
1426 partition_rule: &dyn partition::partition::PartitionRule,
1427) -> Result<HashMap<u32, partition::partition::RegionMask>> {
1428 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1429 .with_label_values(&["flush_physical_split_record_batch"])
1430 .start_timer();
1431 let map = partition_rule.split_record_batch(combined_batch)?;
1432 Ok(map)
1433}
1434
1435fn prepare_physical_region_routing_batch(
1436 combined_batch: RecordBatch,
1437 partition_columns: &[String],
1438) -> Result<RecordBatch> {
1439 if partition_columns.is_empty() {
1440 return Ok(combined_batch);
1441 }
1442 strip_partition_columns_from_batch(combined_batch)
1443}
1444
1445fn plan_region_batch(
1446 stripped_batch: &RecordBatch,
1447 physical_table_id: TableId,
1448 region_number: u32,
1449 mask: &partition::partition::RegionMask,
1450) -> Result<Option<PlannedRegionBatch>> {
1451 if mask.select_none() {
1452 return Ok(None);
1453 }
1454
1455 let region_batch = if mask.select_all() {
1456 stripped_batch.clone()
1457 } else {
1458 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1459 .with_label_values(&["flush_physical_filter_record_batch"])
1460 .start_timer();
1461 filter_record_batch(stripped_batch, mask.array()).context(error::ArrowSnafu)?
1462 };
1463
1464 let row_count = region_batch.num_rows();
1465 if row_count == 0 {
1466 return Ok(None);
1467 }
1468
1469 Ok(Some(PlannedRegionBatch {
1470 region_id: RegionId::new(physical_table_id, region_number),
1471 batch: region_batch,
1472 }))
1473}
1474
1475fn plan_region_batches(
1476 combined_batch: RecordBatch,
1477 physical_table_id: TableId,
1478 partition_rule: &dyn partition::partition::PartitionRule,
1479 partition_columns: &[String],
1480) -> Result<Vec<PlannedRegionBatch>> {
1481 let region_masks = split_combined_batch_by_region(&combined_batch, partition_rule)?;
1482 let stripped_batch = prepare_physical_region_routing_batch(combined_batch, partition_columns)?;
1483
1484 let mut planned_batches = Vec::new();
1485 for (region_number, mask) in region_masks {
1486 if let Some(planned_batch) =
1487 plan_region_batch(&stripped_batch, physical_table_id, region_number, &mask)?
1488 {
1489 planned_batches.push(planned_batch);
1490 }
1491 }
1492
1493 Ok(planned_batches)
1494}
1495
1496async fn resolve_region_targets(
1497 planned_batches: Vec<PlannedRegionBatch>,
1498 partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1499) -> Result<Vec<ResolvedRegionBatch>> {
1500 let mut resolved_batches = Vec::with_capacity(planned_batches.len());
1501 for planned in planned_batches {
1502 let datanode = {
1503 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1504 .with_label_values(&["flush_physical_resolve_region_leader"])
1505 .start_timer();
1506 partition_manager
1507 .find_region_leader(planned.region_id)
1508 .await?
1509 };
1510
1511 resolved_batches.push(ResolvedRegionBatch { planned, datanode });
1512 }
1513
1514 Ok(resolved_batches)
1515}
1516
1517fn encode_region_write_requests(
1518 resolved_batches: Vec<ResolvedRegionBatch>,
1519) -> Result<Vec<FlushRegionWrite>> {
1520 let mut region_writes = Vec::with_capacity(resolved_batches.len());
1521 for resolved in resolved_batches {
1522 let region_id = resolved.planned.region_id;
1523 let (schema_bytes, data_header, payload) = {
1524 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1525 .with_label_values(&["flush_physical_encode_ipc"])
1526 .start_timer();
1527 record_batch_to_ipc(resolved.planned.batch)?
1528 };
1529
1530 let request = RegionRequest {
1531 header: Some(RegionRequestHeader {
1532 tracing_context: TracingContext::from_current_span().to_w3c(),
1533 ..Default::default()
1534 }),
1535 body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
1536 region_id: region_id.as_u64(),
1537 partition_expr_version: None,
1538 body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
1539 schema: schema_bytes,
1540 data_header,
1541 payload,
1542 })),
1543 })),
1544 };
1545
1546 region_writes.push(FlushRegionWrite {
1547 datanode: resolved.datanode,
1548 request,
1549 });
1550 }
1551
1552 Ok(region_writes)
1553}
1554
1555fn notify_waiters(waiters: Vec<FlushWaiter>, result: Result<()>) {
1556 let shared_result = result.map_err(Arc::new);
1557 for waiter in waiters {
1558 let _ = waiter.response_tx.send(match &shared_result {
1559 Ok(()) => Ok(()),
1560 Err(error) => Err(Arc::clone(error)),
1561 });
1562 }
1564}
1565
1566fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
1567 let mut encoder = FlightEncoder::default();
1568 let schema = encoder.encode_schema(record_batch.schema().as_ref());
1569 let mut iter = encoder
1570 .encode(FlightMessage::RecordBatch(record_batch))
1571 .into_iter();
1572 let Some(flight_data) = iter.next() else {
1573 return Err(Error::Internal {
1574 err_msg: "Failed to encode empty flight data".to_string(),
1575 });
1576 };
1577 if iter.next().is_some() {
1578 return Err(Error::NotSupported {
1579 feat: "bulk insert RecordBatch with dictionary arrays".to_string(),
1580 });
1581 }
1582
1583 Ok((
1584 schema.data_header,
1585 flight_data.data_header,
1586 flight_data.data_body,
1587 ))
1588}
1589
1590#[cfg(test)]
1591mod tests {
1592 use std::collections::{HashMap, HashSet};
1593 use std::sync::Arc;
1594 use std::sync::atomic::{AtomicUsize, Ordering};
1595 use std::time::{Duration, Instant};
1596
1597 use api::region::RegionResponse;
1598 use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
1599 use api::v1::meta::Peer;
1600 use api::v1::region::{InsertRequests, RegionRequest, region_request};
1601 use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
1602 use arrow::array::{BinaryArray, BooleanArray, StringArray, TimestampMillisecondArray};
1603 use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1604 use arrow::record_batch::RecordBatch;
1605 use async_trait::async_trait;
1606 use catalog::error::Result as CatalogResult;
1607 use common_meta::error::Result as MetaResult;
1608 use common_meta::node_manager::{
1609 Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
1610 };
1611 use common_query::request::QueryRequest;
1612 use common_recordbatch::SendableRecordBatchStream;
1613 use dashmap::DashMap;
1614 use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema};
1615 use partition::error::Result as PartitionResult;
1616 use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask};
1617 use smallvec::SmallVec;
1618 use snafu::ResultExt;
1619 use store_api::storage::RegionId;
1620 use table::metadata::TableId;
1621 use table::test_util::table_info::test_table_info;
1622 use tokio::sync::{Semaphore, mpsc, oneshot};
1623 use tokio::time::sleep;
1624
1625 use super::{
1626 BatchKey, Error, FlushRegionWrite, FlushWaiter, PendingBatch, PendingRowsBatcher,
1627 PendingWorker, PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester,
1628 PhysicalFlushPartitionProvider, PhysicalTableMetadata, PlannedRegionBatch,
1629 ResolvedRegionBatch, TableBatch, WorkerCommand, columns_taxonomy, drain_batch,
1630 encode_region_write_requests, flush_batch_physical, flush_region_writes_concurrently,
1631 plan_region_batches, remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
1632 should_dispatch_concurrently, strip_partition_columns_from_batch,
1633 transform_logical_batches_to_physical,
1634 };
1635 use crate::error;
1636
1637 fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
1638 Rows {
1639 schema: vec![ColumnSchema {
1640 column_name: schema_name.to_string(),
1641 ..Default::default()
1642 }],
1643 rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
1644 }
1645 }
1646
1647 fn mock_tag_batch(tag_name: &str, tag_value: &str, ts: i64, val: f64) -> RecordBatch {
1648 let schema = Arc::new(ArrowSchema::new(vec![
1649 Field::new(
1650 "greptime_timestamp",
1651 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1652 false,
1653 ),
1654 Field::new("greptime_value", ArrowDataType::Float64, true),
1655 Field::new(tag_name, ArrowDataType::Utf8, true),
1656 ]));
1657
1658 RecordBatch::try_new(
1659 schema,
1660 vec![
1661 Arc::new(TimestampMillisecondArray::from(vec![ts])),
1662 Arc::new(arrow::array::Float64Array::from(vec![val])),
1663 Arc::new(StringArray::from(vec![tag_value])),
1664 ],
1665 )
1666 .unwrap()
1667 }
1668
1669 fn mock_physical_table_metadata(table_id: TableId) -> PhysicalTableMetadata {
1670 let schema = Arc::new(
1671 DtSchema::try_new(vec![
1672 DtColumnSchema::new(
1673 "__primary_key",
1674 datatypes::prelude::ConcreteDataType::binary_datatype(),
1675 false,
1676 ),
1677 DtColumnSchema::new(
1678 "greptime_timestamp",
1679 datatypes::prelude::ConcreteDataType::timestamp_millisecond_datatype(),
1680 false,
1681 ),
1682 DtColumnSchema::new(
1683 "greptime_value",
1684 datatypes::prelude::ConcreteDataType::float64_datatype(),
1685 true,
1686 ),
1687 DtColumnSchema::new(
1688 "tag1",
1689 datatypes::prelude::ConcreteDataType::string_datatype(),
1690 true,
1691 ),
1692 ])
1693 .unwrap(),
1694 );
1695 let mut table_info = test_table_info(table_id, "phy", "public", "greptime", schema);
1696 table_info.meta.column_ids = vec![0, 1, 2, 3];
1697
1698 PhysicalTableMetadata {
1699 table_info: Arc::new(table_info),
1700 col_name_to_ids: Some(HashMap::from([("tag1".to_string(), 3)])),
1701 }
1702 }
1703
1704 struct MockFlushCatalogProvider {
1705 table: Option<PhysicalTableMetadata>,
1706 }
1707
1708 #[async_trait]
1709 impl PhysicalFlushCatalogProvider for MockFlushCatalogProvider {
1710 async fn physical_table(
1711 &self,
1712 _catalog: &str,
1713 _schema: &str,
1714 _table_name: &str,
1715 _query_ctx: &session::context::QueryContext,
1716 ) -> CatalogResult<Option<PhysicalTableMetadata>> {
1717 Ok(self.table.clone())
1718 }
1719 }
1720
1721 struct SingleRegionPartitionRule;
1722
1723 impl PartitionRule for SingleRegionPartitionRule {
1724 fn as_any(&self) -> &dyn std::any::Any {
1725 self
1726 }
1727
1728 fn partition_columns(&self) -> &[String] {
1729 &[]
1730 }
1731
1732 fn find_region(
1733 &self,
1734 _values: &[datatypes::prelude::Value],
1735 ) -> partition::error::Result<store_api::storage::RegionNumber> {
1736 unimplemented!()
1737 }
1738
1739 fn split_record_batch(
1740 &self,
1741 record_batch: &RecordBatch,
1742 ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1743 {
1744 Ok(HashMap::from([(
1745 1,
1746 RegionMask::new(
1747 arrow::array::BooleanArray::from(vec![true; record_batch.num_rows()]),
1748 record_batch.num_rows(),
1749 ),
1750 )]))
1751 }
1752 }
1753
1754 struct TwoRegionPartitionRule {
1755 partition_columns: Vec<String>,
1756 }
1757
1758 impl PartitionRule for TwoRegionPartitionRule {
1759 fn as_any(&self) -> &dyn std::any::Any {
1760 self
1761 }
1762
1763 fn partition_columns(&self) -> &[String] {
1764 &self.partition_columns
1765 }
1766
1767 fn find_region(
1768 &self,
1769 _values: &[datatypes::prelude::Value],
1770 ) -> partition::error::Result<store_api::storage::RegionNumber> {
1771 unimplemented!()
1772 }
1773
1774 fn split_record_batch(
1775 &self,
1776 _record_batch: &RecordBatch,
1777 ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1778 {
1779 Ok(HashMap::from([
1780 (1, RegionMask::new(BooleanArray::from(vec![true, false]), 1)),
1781 (2, RegionMask::new(BooleanArray::from(vec![false, true]), 1)),
1782 (
1783 3,
1784 RegionMask::new(BooleanArray::from(vec![false, false]), 0),
1785 ),
1786 ]))
1787 }
1788 }
1789
1790 struct MockFlushPartitionProvider {
1791 partition_rule_calls: Arc<AtomicUsize>,
1792 region_leader_calls: Arc<AtomicUsize>,
1793 }
1794
1795 #[async_trait]
1796 impl PhysicalFlushPartitionProvider for MockFlushPartitionProvider {
1797 async fn find_table_partition_rule(
1798 &self,
1799 _table_info: &table::metadata::TableInfo,
1800 ) -> PartitionResult<PartitionRuleRef> {
1801 self.partition_rule_calls.fetch_add(1, Ordering::SeqCst);
1802 Ok(Arc::new(SingleRegionPartitionRule))
1803 }
1804
1805 async fn find_region_leader(&self, _region_id: RegionId) -> error::Result<Peer> {
1806 self.region_leader_calls.fetch_add(1, Ordering::SeqCst);
1807 Ok(Peer {
1808 id: 1,
1809 addr: "node-1".to_string(),
1810 })
1811 }
1812 }
1813
1814 #[derive(Default)]
1815 struct MockFlushNodeRequester {
1816 writes: Arc<AtomicUsize>,
1817 }
1818
1819 #[async_trait]
1820 impl PhysicalFlushNodeRequester for MockFlushNodeRequester {
1821 async fn handle(
1822 &self,
1823 _peer: &Peer,
1824 _request: RegionRequest,
1825 ) -> error::Result<RegionResponse> {
1826 self.writes.fetch_add(1, Ordering::SeqCst);
1827 Ok(RegionResponse::new(0))
1828 }
1829 }
1830
1831 #[test]
1832 fn test_collect_non_empty_table_rows_filters_empty_payloads() {
1833 let requests = RowInsertRequests {
1834 inserts: vec![
1835 RowInsertRequest {
1836 table_name: "cpu".to_string(),
1837 rows: Some(mock_rows(2, "host")),
1838 },
1839 RowInsertRequest {
1840 table_name: "mem".to_string(),
1841 rows: Some(mock_rows(0, "host")),
1842 },
1843 RowInsertRequest {
1844 table_name: "disk".to_string(),
1845 rows: None,
1846 },
1847 ],
1848 };
1849
1850 let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);
1851
1852 assert_eq!(2, total_rows);
1853 assert_eq!(1, table_rows.len());
1854 assert_eq!("cpu", table_rows[0].0);
1855 assert_eq!(2, table_rows[0].1.rows.len());
1856 }
1857
1858 #[test]
1859 fn test_drain_batch_takes_initialized_pending_batch_from_option() {
1860 let ctx = session::context::QueryContext::arc();
1861 let (response_tx, _response_rx) = oneshot::channel();
1862 let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();
1863 let mut batch = Some(PendingBatch {
1864 tables: HashMap::from([(
1865 "cpu".to_string(),
1866 TableBatch {
1867 table_name: "cpu".to_string(),
1868 table_id: 42,
1869 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
1870 row_count: 1,
1871 },
1872 )]),
1873 created_at: Instant::now(),
1874 total_row_count: 1,
1875 db_string: ctx.get_db_string(),
1876 ctx: ctx.clone(),
1877 waiters: vec![FlushWaiter {
1878 response_tx,
1879 _permit: permit,
1880 }],
1881 });
1882
1883 let flush = drain_batch(&mut batch).unwrap();
1884
1885 assert!(batch.is_none());
1886 assert_eq!(1, flush.total_row_count);
1887 assert_eq!(1, flush.table_batches.len());
1888 assert_eq!(ctx.get_db_string(), flush.db_string);
1889 assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog());
1890 }
1891
1892 #[derive(Clone)]
1893 struct ConcurrentMockDatanode {
1894 delay: Duration,
1895 inflight: Arc<AtomicUsize>,
1896 max_inflight: Arc<AtomicUsize>,
1897 }
1898
1899 #[async_trait]
1900 impl Datanode for ConcurrentMockDatanode {
1901 async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
1902 let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
1903 loop {
1904 let max = self.max_inflight.load(Ordering::SeqCst);
1905 if now <= max {
1906 break;
1907 }
1908 if self
1909 .max_inflight
1910 .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
1911 .is_ok()
1912 {
1913 break;
1914 }
1915 }
1916
1917 sleep(self.delay).await;
1918 self.inflight.fetch_sub(1, Ordering::SeqCst);
1919 Ok(RegionResponse::new(0))
1920 }
1921
1922 async fn handle_query(
1923 &self,
1924 _request: QueryRequest,
1925 ) -> MetaResult<SendableRecordBatchStream> {
1926 unimplemented!()
1927 }
1928 }
1929
1930 #[derive(Clone)]
1931 struct ConcurrentMockNodeManager {
1932 datanodes: Arc<HashMap<u64, DatanodeRef>>,
1933 }
1934
1935 #[async_trait]
1936 impl DatanodeManager for ConcurrentMockNodeManager {
1937 async fn datanode(&self, node: &Peer) -> DatanodeRef {
1938 self.datanodes
1939 .get(&node.id)
1940 .expect("datanode not found")
1941 .clone()
1942 }
1943 }
1944
1945 struct NoopFlownode;
1946
1947 #[async_trait]
1948 impl Flownode for NoopFlownode {
1949 async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
1950 unimplemented!()
1951 }
1952
1953 async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
1954 unimplemented!()
1955 }
1956
1957 async fn handle_mark_window_dirty(
1958 &self,
1959 _req: DirtyWindowRequests,
1960 ) -> MetaResult<FlowResponse> {
1961 unimplemented!()
1962 }
1963 }
1964
1965 #[async_trait]
1966 impl FlownodeManager for ConcurrentMockNodeManager {
1967 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1968 Arc::new(NoopFlownode)
1969 }
1970 }
1971
1972 #[async_trait]
1973 impl PhysicalFlushNodeRequester for ConcurrentMockNodeManager {
1974 async fn handle(
1975 &self,
1976 peer: &Peer,
1977 request: RegionRequest,
1978 ) -> error::Result<RegionResponse> {
1979 let datanode = self.datanode(peer).await;
1980 datanode
1981 .handle(request)
1982 .await
1983 .context(error::CommonMetaSnafu)
1984 }
1985 }
1986
1987 #[test]
1988 fn test_remove_worker_if_same_channel_removes_matching_entry() {
1989 let workers = DashMap::new();
1990 let key = BatchKey {
1991 catalog: "greptime".to_string(),
1992 schema: "public".to_string(),
1993 physical_table: "phy".to_string(),
1994 };
1995
1996 let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
1997 workers.insert(key.clone(), PendingWorker { tx: tx.clone() });
1998
1999 assert!(remove_worker_if_same_channel(&workers, &key, &tx));
2000 assert!(!workers.contains_key(&key));
2001 }
2002
2003 #[test]
2004 fn test_remove_worker_if_same_channel_keeps_newer_entry() {
2005 let workers = DashMap::new();
2006 let key = BatchKey {
2007 catalog: "greptime".to_string(),
2008 schema: "public".to_string(),
2009 physical_table: "phy".to_string(),
2010 };
2011
2012 let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
2013 let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
2014 workers.insert(
2015 key.clone(),
2016 PendingWorker {
2017 tx: fresh_tx.clone(),
2018 },
2019 );
2020
2021 assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
2022 assert!(workers.contains_key(&key));
2023 assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
2024 }
2025
2026 #[test]
2027 fn test_worker_idle_timeout_close_decision() {
2028 assert!(should_close_worker_on_idle_timeout(0, 0));
2029 assert!(!should_close_worker_on_idle_timeout(1, 0));
2030 assert!(!should_close_worker_on_idle_timeout(0, 1));
2031 }
2032
2033 #[tokio::test]
2034 async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
2035 let inflight = Arc::new(AtomicUsize::new(0));
2036 let max_inflight = Arc::new(AtomicUsize::new(0));
2037 let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2038 delay: Duration::from_millis(100),
2039 inflight: inflight.clone(),
2040 max_inflight: max_inflight.clone(),
2041 });
2042 let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2043 delay: Duration::from_millis(100),
2044 inflight,
2045 max_inflight: max_inflight.clone(),
2046 });
2047
2048 let mut datanodes = HashMap::new();
2049 datanodes.insert(1, datanode1);
2050 datanodes.insert(2, datanode2);
2051 let node_manager = Arc::new(ConcurrentMockNodeManager {
2052 datanodes: Arc::new(datanodes),
2053 });
2054
2055 let writes = vec![
2056 FlushRegionWrite {
2057 datanode: Peer {
2058 id: 1,
2059 addr: "node1".to_string(),
2060 },
2061 request: RegionRequest::default(),
2062 },
2063 FlushRegionWrite {
2064 datanode: Peer {
2065 id: 2,
2066 addr: "node2".to_string(),
2067 },
2068 request: RegionRequest::default(),
2069 },
2070 ];
2071
2072 flush_region_writes_concurrently(node_manager.as_ref(), writes)
2073 .await
2074 .unwrap();
2075 assert!(max_inflight.load(Ordering::SeqCst) >= 2);
2076 }
2077
2078 #[test]
2079 fn test_should_dispatch_concurrently_by_region_count() {
2080 assert!(!should_dispatch_concurrently(0));
2081 assert!(!should_dispatch_concurrently(1));
2082 assert!(should_dispatch_concurrently(2));
2083 }
2084
2085 #[test]
2086 fn test_strip_partition_columns_from_batch_removes_partition_tags() {
2087 let batch = RecordBatch::try_new(
2088 Arc::new(ArrowSchema::new(vec![
2089 Field::new("__primary_key", ArrowDataType::Binary, false),
2090 Field::new(
2091 "greptime_timestamp",
2092 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2093 false,
2094 ),
2095 Field::new("greptime_value", ArrowDataType::Float64, true),
2096 Field::new("host", ArrowDataType::Utf8, true),
2097 ])),
2098 vec![
2099 Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2100 Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2101 Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2102 Arc::new(StringArray::from(vec!["node-1"])),
2103 ],
2104 )
2105 .unwrap();
2106
2107 let stripped = strip_partition_columns_from_batch(batch).unwrap();
2108
2109 assert_eq!(3, stripped.num_columns());
2110 assert_eq!("__primary_key", stripped.schema().field(0).name());
2111 assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2112 assert_eq!("greptime_value", stripped.schema().field(2).name());
2113 }
2114
2115 #[test]
2116 fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
2117 let batch = RecordBatch::try_new(
2118 Arc::new(ArrowSchema::new(vec![
2119 Field::new("__primary_key", ArrowDataType::Binary, false),
2120 Field::new(
2121 "greptime_timestamp",
2122 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2123 false,
2124 ),
2125 Field::new("greptime_value", ArrowDataType::Float64, true),
2126 Field::new("host", ArrowDataType::Utf8, true),
2127 ])),
2128 vec![
2129 Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2130 Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2131 Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2132 Arc::new(StringArray::from(vec!["node-1"])),
2133 ],
2134 )
2135 .unwrap();
2136
2137 let stripped = strip_partition_columns_from_batch(batch).unwrap();
2138
2139 assert_eq!(3, stripped.num_columns());
2140 assert_eq!("__primary_key", stripped.schema().field(0).name());
2141 assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2142 assert_eq!("greptime_value", stripped.schema().field(2).name());
2143 }
2144
2145 #[test]
2146 fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
2147 let schema = Arc::new(ArrowSchema::new(vec![
2148 Field::new(
2149 "greptime_timestamp",
2150 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2151 false,
2152 ),
2153 Field::new("greptime_value", ArrowDataType::Float64, true),
2154 Field::new("host", ArrowDataType::Utf8, true),
2155 Field::new("region", ArrowDataType::Utf8, true),
2156 ]));
2157 let name_to_ids =
2158 HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2159 let partition_columns = HashSet::from(["host"]);
2160
2161 let (tag_columns, non_tag_indices) =
2162 columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2163
2164 assert_eq!(2, tag_columns.len());
2165 assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
2166 }
2167
2168 #[test]
2169 fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
2170 let schema = Arc::new(ArrowSchema::new(vec![
2171 Field::new("host", ArrowDataType::Utf8, true),
2172 Field::new("greptime_value", ArrowDataType::Float64, true),
2173 Field::new(
2174 "greptime_timestamp",
2175 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2176 false,
2177 ),
2178 Field::new("region", ArrowDataType::Utf8, true),
2179 ]));
2180 let name_to_ids =
2181 HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2182 let partition_columns = HashSet::from(["host", "region"]);
2183
2184 let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
2185 columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2186
2187 assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
2188 }
2189
2190 #[test]
2191 fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
2192 let schema = Arc::new(ArrowSchema::new(vec![
2193 Field::new(
2194 "greptime_timestamp",
2195 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2196 false,
2197 ),
2198 Field::new("greptime_value", ArrowDataType::Float64, true),
2199 Field::new("host", ArrowDataType::Utf8, true),
2200 Field::new("invalid", ArrowDataType::Boolean, true),
2201 ]));
2202 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2203 let partition_columns = HashSet::from(["host"]);
2204
2205 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2206
2207 assert!(matches!(
2208 result,
2209 Err(Error::InvalidPromRemoteRequest { .. })
2210 ));
2211 }
2212
2213 #[test]
2214 fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
2215 let schema = Arc::new(ArrowSchema::new(vec![
2216 Field::new("greptime_timestamp", ArrowDataType::Int64, false),
2217 Field::new("greptime_value", ArrowDataType::Float64, true),
2218 Field::new("host", ArrowDataType::Utf8, true),
2219 ]));
2220 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2221 let partition_columns = HashSet::from(["host"]);
2222
2223 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2224
2225 assert!(matches!(
2226 result,
2227 Err(Error::InvalidPromRemoteRequest { .. })
2228 ));
2229 }
2230
2231 #[test]
2232 fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
2233 let schema = Arc::new(ArrowSchema::new(vec![
2234 Field::new(
2235 "ts1",
2236 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2237 false,
2238 ),
2239 Field::new(
2240 "ts2",
2241 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2242 false,
2243 ),
2244 Field::new("greptime_value", ArrowDataType::Float64, true),
2245 Field::new("host", ArrowDataType::Utf8, true),
2246 ]));
2247 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2248 let partition_columns = HashSet::from(["host"]);
2249
2250 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2251
2252 assert!(matches!(
2253 result,
2254 Err(Error::InvalidPromRemoteRequest { .. })
2255 ));
2256 }
2257
2258 #[test]
2259 fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
2260 let schema = Arc::new(ArrowSchema::new(vec![
2261 Field::new(
2262 "greptime_timestamp",
2263 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2264 false,
2265 ),
2266 Field::new("value1", ArrowDataType::Float64, true),
2267 Field::new("value2", ArrowDataType::Float64, true),
2268 Field::new("host", ArrowDataType::Utf8, true),
2269 ]));
2270 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2271 let partition_columns = HashSet::from(["host"]);
2272
2273 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2274
2275 assert!(matches!(
2276 result,
2277 Err(Error::InvalidPromRemoteRequest { .. })
2278 ));
2279 }
2280
2281 #[test]
2282 fn test_modify_batch_sparse_with_taxonomy_per_batch() {
2283 use arrow::array::BinaryArray;
2284 use metric_engine::batch_modifier::modify_batch_sparse;
2285
2286 let schema1 = Arc::new(ArrowSchema::new(vec![
2287 Field::new(
2288 "greptime_timestamp",
2289 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2290 false,
2291 ),
2292 Field::new("greptime_value", ArrowDataType::Float64, true),
2293 Field::new("tag1", ArrowDataType::Utf8, true),
2294 ]));
2295
2296 let schema2 = Arc::new(ArrowSchema::new(vec![
2297 Field::new(
2298 "greptime_timestamp",
2299 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2300 false,
2301 ),
2302 Field::new("greptime_value", ArrowDataType::Float64, true),
2303 Field::new("tag1", ArrowDataType::Utf8, true),
2304 Field::new("tag2", ArrowDataType::Utf8, true),
2305 ]));
2306 let batch2 = RecordBatch::try_new(
2307 schema2.clone(),
2308 vec![
2309 Arc::new(TimestampMillisecondArray::from(vec![2000])),
2310 Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2311 Arc::new(StringArray::from(vec!["v1"])),
2312 Arc::new(StringArray::from(vec!["v2"])),
2313 ],
2314 )
2315 .unwrap();
2316
2317 let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2318 let partition_columns = HashSet::new();
2319
2320 let batch3 = RecordBatch::try_new(
2322 schema1.clone(),
2323 vec![
2324 Arc::new(TimestampMillisecondArray::from(vec![2000])),
2325 Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2326 Arc::new(StringArray::from(vec!["v1"])),
2327 ],
2328 )
2329 .unwrap();
2330
2331 let (tag_columns2, indices2) =
2334 columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2335 let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();
2336
2337 let (tag_columns3, indices3) =
2338 columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2339 let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();
2340
2341 let pk2 = modified2
2342 .column(0)
2343 .as_any()
2344 .downcast_ref::<BinaryArray>()
2345 .unwrap();
2346 let pk3 = modified3
2347 .column(0)
2348 .as_any()
2349 .downcast_ref::<BinaryArray>()
2350 .unwrap();
2351
2352 assert_ne!(
2354 pk2.value(0),
2355 pk3.value(0),
2356 "PK should be different because batch2 has tag2!"
2357 );
2358 }
2359
2360 #[test]
2361 fn test_transform_logical_batches_to_physical_success() {
2362 let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2363
2364 let table_batches = vec![TableBatch {
2365 table_name: "t1".to_string(),
2366 table_id: 1,
2367 batches: vec![batch],
2368 row_count: 1,
2369 }];
2370
2371 let name_to_ids = HashMap::from([("tag1".to_string(), 1)]);
2372 let partition_columns = HashSet::new();
2373 let modified =
2374 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2375 .unwrap();
2376
2377 assert_eq!(1, modified.len());
2378 assert_eq!(3, modified[0].num_columns());
2379 assert_eq!("__primary_key", modified[0].schema().field(0).name());
2380 assert_eq!("greptime_timestamp", modified[0].schema().field(1).name());
2381 assert_eq!("greptime_value", modified[0].schema().field(2).name());
2382 }
2383
2384 #[test]
2385 fn test_transform_logical_batches_to_physical_taxonomy_failure() {
2386 let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2387
2388 let table_batches = vec![TableBatch {
2389 table_name: "t1".to_string(),
2390 table_id: 1,
2391 batches: vec![batch],
2392 row_count: 1,
2393 }];
2394
2395 let name_to_ids = HashMap::new();
2397 let partition_columns = HashSet::new();
2398 let err =
2399 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2400 .unwrap_err();
2401
2402 assert!(
2403 err.to_string()
2404 .contains("not found in physical table column IDs")
2405 );
2406 }
2407
2408 #[test]
2409 fn test_transform_logical_batches_to_physical_multiple_batches() {
2410 let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2411 let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2412
2413 let table_batches = vec![
2414 TableBatch {
2415 table_name: "t1".to_string(),
2416 table_id: 1,
2417 batches: vec![batch1],
2418 row_count: 1,
2419 },
2420 TableBatch {
2421 table_name: "t2".to_string(),
2422 table_id: 2,
2423 batches: vec![batch2],
2424 row_count: 1,
2425 },
2426 ];
2427
2428 let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2429 let partition_columns = HashSet::new();
2430 let modified =
2431 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2432 .unwrap();
2433
2434 assert_eq!(2, modified.len());
2435 }
2436
2437 #[test]
2438 fn test_transform_logical_batches_to_physical_mixed_success_failure() {
2439 let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2440 let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2441
2442 let table_batches = vec![
2443 TableBatch {
2444 table_name: "t1".to_string(),
2445 table_id: 1,
2446 batches: vec![batch1],
2447 row_count: 1,
2448 },
2449 TableBatch {
2450 table_name: "t2".to_string(),
2451 table_id: 2,
2452 batches: vec![batch2],
2453 row_count: 1,
2454 },
2455 ];
2456
2457 let name_to_ids = HashMap::from([("tag2".to_string(), 2)]);
2459 let partition_columns = HashSet::new();
2460 let err =
2461 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2462 .unwrap_err();
2463
2464 assert!(err.to_string().contains("tag1"));
2465 }
2466
2467 #[tokio::test]
2468 async fn test_flush_batch_physical_uses_mockable_trait_dependencies() {
2469 let table_batches = vec![TableBatch {
2470 table_name: "t1".to_string(),
2471 table_id: 11,
2472 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2473 row_count: 1,
2474 }];
2475 let partition_calls = Arc::new(AtomicUsize::new(0));
2476 let leader_calls = Arc::new(AtomicUsize::new(0));
2477 let node = MockFlushNodeRequester::default();
2478 let ctx = session::context::QueryContext::arc();
2479
2480 flush_batch_physical(
2481 &table_batches,
2482 "phy",
2483 &ctx,
2484 &MockFlushPartitionProvider {
2485 partition_rule_calls: partition_calls.clone(),
2486 region_leader_calls: leader_calls.clone(),
2487 },
2488 &node,
2489 &MockFlushCatalogProvider {
2490 table: Some(mock_physical_table_metadata(1024)),
2491 },
2492 )
2493 .await
2494 .unwrap();
2495
2496 assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2497 assert_eq!(1, leader_calls.load(Ordering::SeqCst));
2498 assert_eq!(1, node.writes.load(Ordering::SeqCst));
2499 }
2500
2501 #[derive(Default)]
2502 struct AffectedRowsFlushNodeRequester {
2503 affected_rows: usize,
2504 }
2505
2506 #[async_trait]
2507 impl PhysicalFlushNodeRequester for AffectedRowsFlushNodeRequester {
2508 async fn handle(
2509 &self,
2510 _peer: &Peer,
2511 _request: RegionRequest,
2512 ) -> error::Result<RegionResponse> {
2513 Ok(RegionResponse::new(self.affected_rows))
2514 }
2515 }
2516
2517 #[tokio::test]
2518 async fn test_flush_batch_physical_returns_actual_affected_rows() {
2519 let table_batches = vec![TableBatch {
2520 table_name: "t1".to_string(),
2521 table_id: 11,
2522 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2523 row_count: 1,
2524 }];
2525 let ctx = session::context::QueryContext::arc();
2526
2527 let affected_rows = flush_batch_physical(
2528 &table_batches,
2529 "phy",
2530 &ctx,
2531 &MockFlushPartitionProvider {
2532 partition_rule_calls: Arc::new(AtomicUsize::new(0)),
2533 region_leader_calls: Arc::new(AtomicUsize::new(0)),
2534 },
2535 &AffectedRowsFlushNodeRequester { affected_rows: 7 },
2536 &MockFlushCatalogProvider {
2537 table: Some(mock_physical_table_metadata(1024)),
2538 },
2539 )
2540 .await
2541 .unwrap();
2542
2543 assert_eq!(7, affected_rows);
2544 }
2545
2546 #[tokio::test]
2547 async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() {
2548 let table_batches = vec![TableBatch {
2549 table_name: "t1".to_string(),
2550 table_id: 11,
2551 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2552 row_count: 1,
2553 }];
2554 let partition_calls = Arc::new(AtomicUsize::new(0));
2555 let leader_calls = Arc::new(AtomicUsize::new(0));
2556 let node = MockFlushNodeRequester::default();
2557 let ctx = session::context::QueryContext::arc();
2558
2559 let err = flush_batch_physical(
2560 &table_batches,
2561 "missing_phy",
2562 &ctx,
2563 &MockFlushPartitionProvider {
2564 partition_rule_calls: partition_calls.clone(),
2565 region_leader_calls: leader_calls.clone(),
2566 },
2567 &node,
2568 &MockFlushCatalogProvider { table: None },
2569 )
2570 .await
2571 .unwrap_err();
2572
2573 assert!(
2574 err.to_string()
2575 .contains("Physical table 'missing_phy' not found")
2576 );
2577 assert_eq!(0, partition_calls.load(Ordering::SeqCst));
2578 assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2579 assert_eq!(0, node.writes.load(Ordering::SeqCst));
2580 }
2581
2582 #[tokio::test]
2583 async fn test_flush_batch_physical_aborts_immediately_on_transform_error() {
2584 let table_batches = vec![
2585 TableBatch {
2586 table_name: "broken".to_string(),
2587 table_id: 11,
2588 batches: vec![mock_tag_batch("unknown_tag", "host-1", 1000, 1.0)],
2589 row_count: 1,
2590 },
2591 TableBatch {
2592 table_name: "healthy".to_string(),
2593 table_id: 12,
2594 batches: vec![mock_tag_batch("tag1", "host-2", 2000, 2.0)],
2595 row_count: 1,
2596 },
2597 ];
2598 let partition_calls = Arc::new(AtomicUsize::new(0));
2599 let leader_calls = Arc::new(AtomicUsize::new(0));
2600 let node = MockFlushNodeRequester::default();
2601 let ctx = session::context::QueryContext::arc();
2602
2603 let err = flush_batch_physical(
2604 &table_batches,
2605 "phy",
2606 &ctx,
2607 &MockFlushPartitionProvider {
2608 partition_rule_calls: partition_calls.clone(),
2609 region_leader_calls: leader_calls.clone(),
2610 },
2611 &node,
2612 &MockFlushCatalogProvider {
2613 table: Some(mock_physical_table_metadata(1024)),
2614 },
2615 )
2616 .await
2617 .unwrap_err();
2618
2619 assert!(err.to_string().contains("unknown_tag"));
2620 assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2621 assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2622 assert_eq!(0, node.writes.load(Ordering::SeqCst));
2623 }
2624
2625 #[test]
2626 fn test_plan_region_batches_splits_and_strips_partition_columns() {
2627 let combined_batch = RecordBatch::try_new(
2628 Arc::new(ArrowSchema::new(vec![
2629 Field::new("__primary_key", ArrowDataType::Binary, false),
2630 Field::new(
2631 "greptime_timestamp",
2632 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2633 false,
2634 ),
2635 Field::new("greptime_value", ArrowDataType::Float64, true),
2636 Field::new("host", ArrowDataType::Utf8, true),
2637 ])),
2638 vec![
2639 Arc::new(BinaryArray::from(vec![b"k1".as_slice(), b"k2".as_slice()])),
2640 Arc::new(TimestampMillisecondArray::from(vec![1000_i64, 2000_i64])),
2641 Arc::new(arrow::array::Float64Array::from(vec![1.0_f64, 2.0_f64])),
2642 Arc::new(StringArray::from(vec!["node-1", "node-2"])),
2643 ],
2644 )
2645 .unwrap();
2646 let mut planned_batches = plan_region_batches(
2647 combined_batch,
2648 1024,
2649 &TwoRegionPartitionRule {
2650 partition_columns: vec!["host".to_string()],
2651 },
2652 &["host".to_string()],
2653 )
2654 .unwrap();
2655 planned_batches.sort_by_key(|planned| planned.region_id.region_number());
2656
2657 assert_eq!(2, planned_batches.len());
2658 assert_eq!(RegionId::new(1024, 1), planned_batches[0].region_id);
2659 assert_eq!(1, planned_batches[0].num_rows());
2660 assert_eq!(3, planned_batches[0].batch.num_columns());
2661 assert_eq!(RegionId::new(1024, 2), planned_batches[1].region_id);
2662 assert_eq!(1, planned_batches[1].num_rows());
2663 assert_eq!(3, planned_batches[1].batch.num_columns());
2664 }
2665
2666 #[test]
2667 fn test_encode_region_write_requests_builds_bulk_insert_requests() {
2668 let planned_batch = PlannedRegionBatch {
2669 region_id: RegionId::new(1024, 1),
2670 batch: RecordBatch::try_new(
2671 Arc::new(ArrowSchema::new(vec![
2672 Field::new("__primary_key", ArrowDataType::Binary, false),
2673 Field::new(
2674 "greptime_timestamp",
2675 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2676 false,
2677 ),
2678 Field::new("greptime_value", ArrowDataType::Float64, true),
2679 ])),
2680 vec![
2681 Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2682 Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2683 Arc::new(arrow::array::Float64Array::from(vec![1.0_f64])),
2684 ],
2685 )
2686 .unwrap(),
2687 };
2688 let resolved_batch = ResolvedRegionBatch {
2689 planned: planned_batch,
2690 datanode: Peer {
2691 id: 1,
2692 addr: "node-1".to_string(),
2693 },
2694 };
2695 let writes = encode_region_write_requests(vec![resolved_batch]).unwrap();
2696
2697 assert_eq!(1, writes.len());
2698 assert_eq!(1, writes[0].datanode.id);
2699 let Some(region_request::Body::BulkInsert(request)) = &writes[0].request.body else {
2700 panic!("expected bulk insert request");
2701 };
2702 assert_eq!(RegionId::new(1024, 1).as_u64(), request.region_id);
2703 }
2704}