1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::Peer;
20use api::v1::region::{
21 BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
22};
23use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
24use arrow::compute::{concat_batches, filter_record_batch};
25use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use bytes::Bytes;
29use catalog::CatalogManagerRef;
30use common_grpc::flight::{FlightEncoder, FlightMessage};
31use common_meta::node_manager::NodeManagerRef;
32use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, warn};
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
38use partition::manager::PartitionRuleManagerRef;
39use partition::partition::PartitionRuleRef;
40use session::context::QueryContextRef;
41use smallvec::SmallVec;
42use snafu::{OptionExt, ResultExt, ensure};
43use store_api::storage::{RegionId, TableId};
44use table::metadata::{TableInfo, TableInfoRef};
45use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};
46
47use crate::error;
48use crate::error::{Error, Result};
49use crate::metrics::{
50 FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
51 PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
52 PENDING_WORKERS,
53};
54use crate::prom_row_builder::{
55 build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
56 rows_to_aligned_record_batch,
57};
58
/// Query-context extension key that names the physical table a request targets.
/// When the extension is absent, `GREPTIME_PHYSICAL_TABLE` is used instead.
const PHYSICAL_TABLE_KEY: &str = "physical_table";
/// Environment variable controlling whether `submit` waits for the flush result.
/// Parsed as a `bool`; an unset or unparsable value means `true` (synchronous).
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";
/// Workers idle for `flush_interval * WORKER_IDLE_TIMEOUT_MULTIPLIER` are closed.
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;
/// Number of leading columns kept by `strip_partition_columns_from_batch`.
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;
64
/// Schema-changing operations the batcher performs while aligning incoming rows
/// with existing tables. Every method works on a batch of tables at once.
#[async_trait]
pub trait PendingRowsSchemaAlterer: Send + Sync {
    /// Creates the listed tables, each from its given column schemas.
    ///
    /// `with_metric_engine` is forwarded from the batcher's
    /// `prom_store_with_metric_engine` setting. Callers in this module expect
    /// every listed table to be resolvable from the catalog once this returns
    /// `Ok` (presumably already-existing tables are left untouched, as the name
    /// suggests — confirm with the implementations).
    async fn create_tables_if_missing_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[ColumnSchema])],
        with_metric_engine: bool,
        ctx: QueryContextRef,
    ) -> Result<()>;

    /// Adds the given missing column names to each listed table.
    ///
    /// The column names come from `identify_missing_columns_from_proto`; callers
    /// re-resolve each table from the catalog afterwards to pick up the new schema.
    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> Result<()>;
}
88
/// Shared, dynamically-dispatched [`PendingRowsSchemaAlterer`].
pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
90
/// Physical-table metadata consumed by the physical flush path.
#[derive(Clone)]
pub struct PhysicalTableMetadata {
    /// Table info of the physical table.
    pub table_info: TableInfoRef,
    /// Column name -> column id mapping. The catalog-backed adapter fills this
    /// from `TableInfo::name_to_ids()`, so it is `None` whenever that is.
    pub col_name_to_ids: Option<HashMap<String, u32>>,
}
97
/// Catalog lookup used by the physical flush path.
#[async_trait]
pub trait PhysicalFlushCatalogProvider: Send + Sync {
    /// Resolves `catalog.schema.table_name` to its physical-table metadata.
    /// Returns `Ok(None)` when the table does not exist.
    async fn physical_table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: &session::context::QueryContext,
    ) -> catalog::error::Result<Option<PhysicalTableMetadata>>;
}
108
/// Partition routing used by the physical flush path.
#[async_trait]
pub trait PhysicalFlushPartitionProvider: Send + Sync {
    /// Returns the partition rule of the table described by `table_info`.
    async fn find_table_partition_rule(
        &self,
        table_info: &TableInfo,
    ) -> partition::error::Result<PartitionRuleRef>;

    /// Returns the peer currently leading `region_id`.
    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer>;
}
118
/// Transport used by the physical flush path to reach datanodes.
#[async_trait]
pub trait PhysicalFlushNodeRequester: Send + Sync {
    /// Sends `request` to the datanode `peer` and returns its response.
    async fn handle(
        &self,
        peer: &Peer,
        request: RegionRequest,
    ) -> Result<api::region::RegionResponse>;
}
127
128#[derive(Clone)]
129struct CatalogManagerPhysicalFlushAdapter {
130 catalog_manager: CatalogManagerRef,
131}
132
133#[async_trait]
134impl PhysicalFlushCatalogProvider for CatalogManagerPhysicalFlushAdapter {
135 async fn physical_table(
136 &self,
137 catalog: &str,
138 schema: &str,
139 table_name: &str,
140 query_ctx: &session::context::QueryContext,
141 ) -> catalog::error::Result<Option<PhysicalTableMetadata>> {
142 self.catalog_manager
143 .table(catalog, schema, table_name, Some(query_ctx))
144 .await
145 .map(|table| {
146 table.map(|table| {
147 let table_info = table.table_info();
148 let name_to_ids = table_info.name_to_ids();
149 PhysicalTableMetadata {
150 table_info,
151 col_name_to_ids: name_to_ids,
152 }
153 })
154 })
155 }
156}
157
158#[derive(Clone)]
159struct PartitionManagerPhysicalFlushAdapter {
160 partition_manager: PartitionRuleManagerRef,
161}
162
163#[async_trait]
164impl PhysicalFlushPartitionProvider for PartitionManagerPhysicalFlushAdapter {
165 async fn find_table_partition_rule(
166 &self,
167 table_info: &TableInfo,
168 ) -> partition::error::Result<PartitionRuleRef> {
169 self.partition_manager
170 .find_table_partition_rule(table_info)
171 .await
172 .map(|(rule, _)| rule)
173 }
174
175 async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
176 let peer = self.partition_manager.find_region_leader(region_id).await?;
177 Ok(peer)
178 }
179}
180
181#[derive(Clone)]
182struct NodeManagerPhysicalFlushAdapter {
183 node_manager: NodeManagerRef,
184}
185
186#[async_trait]
187impl PhysicalFlushNodeRequester for NodeManagerPhysicalFlushAdapter {
188 async fn handle(
189 &self,
190 peer: &Peer,
191 request: RegionRequest,
192 ) -> error::Result<api::region::RegionResponse> {
193 let datanode = self.node_manager.datanode(peer).await;
194 datanode
195 .handle(request)
196 .await
197 .context(error::CommonMetaSnafu)
198 }
199}
200
/// Routing key for pending-rows workers: one worker per
/// (catalog, schema, physical table) combination.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    catalog: String,
    schema: String,
    physical_table: String,
}

/// Rows accumulated for one logical table inside a pending batch.
#[derive(Debug)]
pub struct TableBatch {
    /// Logical table name.
    pub table_name: String,
    /// Logical table id, as resolved when the rows were aligned.
    pub table_id: TableId,
    /// Record batches aligned to the table's schema, in arrival order.
    pub batches: Vec<RecordBatch>,
    /// Sum of `num_rows()` over `batches`.
    pub row_count: usize,
}

/// Working state while resolving the tables referenced by one request.
struct TableResolutionPlan {
    /// Table name -> (arrow schema of the table, table id), refreshed after any
    /// create/alter step.
    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
    /// Tables that were not found, with the column schemas to create them with.
    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
    /// Tables that need extra columns, with the missing column names.
    tables_to_alter: Vec<(String, Vec<String>)>,
}
226
/// Rows buffered by one worker that have not been flushed yet.
struct PendingBatch {
    /// Per logical table buffers, keyed by table name.
    tables: HashMap<String, TableBatch>,
    /// Creation time; interval ticks flush the batch once it is `flush_interval` old.
    created_at: Instant,
    /// Total rows across all tables in this batch.
    total_row_count: usize,
    /// `ctx.get_db_string()`, used as the label of the ingest row-count metric.
    db_string: String,
    /// Query context of the submission that created this batch (later
    /// submissions' contexts are not stored).
    ctx: QueryContextRef,
    /// One waiter per submission merged into this batch.
    waiters: Vec<FlushWaiter>,
}

/// A submitter waiting for the outcome of the flush that carries its rows.
///
/// Field order matters: `response_tx` is dropped before `_permit`.
struct FlushWaiter {
    /// Receives the flush outcome.
    response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
    /// Inflight-request permit acquired in `submit`; released when the waiter is dropped.
    _permit: OwnedSemaphorePermit,
}

/// An owned snapshot drained from a [`PendingBatch`], ready to be flushed.
struct FlushBatch {
    table_batches: Vec<TableBatch>,
    total_row_count: usize,
    db_string: String,
    ctx: QueryContextRef,
    waiters: Vec<FlushWaiter>,
}

/// Handle to a running worker task: the sending half of its command channel.
#[derive(Clone)]
struct PendingWorker {
    tx: mpsc::Sender<WorkerCommand>,
}

/// Commands accepted by a worker task.
enum WorkerCommand {
    /// Append already-aligned rows to the worker's pending batch.
    Submit {
        /// (logical table name, table id, aligned record batch) triples.
        table_batches: Vec<(String, u32, RecordBatch)>,
        /// Total rows across `table_batches`.
        total_rows: usize,
        ctx: QueryContextRef,
        /// Receives the outcome of the flush that includes these rows.
        response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
        /// Inflight-request permit, moved into the resulting `FlushWaiter`.
        _permit: OwnedSemaphorePermit,
    },
}
263
264fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
267 let physical_table = ctx
268 .extension(PHYSICAL_TABLE_KEY)
269 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
270 .to_string();
271 BatchKey {
272 catalog: ctx.current_catalog().to_string(),
273 schema: ctx.current_schema(),
274 physical_table,
275 }
276}
277
/// Buffers row inserts per (catalog, schema, physical table) in background
/// workers and flushes them in batches.
pub struct PendingRowsBatcher {
    /// Live workers, one per [`BatchKey`]; shared with the worker tasks so they
    /// can unregister themselves on exit.
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    /// Age after which a pending batch is flushed; also the base of the idle timeout.
    flush_interval: Duration,
    /// A pending batch is flushed as soon as it holds at least this many rows.
    max_batch_rows: usize,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    /// Bounds the number of concurrently running background flushes.
    flush_semaphore: Arc<Semaphore>,
    /// Bounds the number of submissions whose rows are buffered or being flushed.
    inflight_semaphore: Arc<Semaphore>,
    /// Capacity of each worker's command channel.
    worker_channel_capacity: usize,
    /// Forwarded to `create_tables_if_missing_batch` as `with_metric_engine`.
    prom_store_with_metric_engine: bool,
    schema_alterer: PendingRowsSchemaAltererRef,
    /// When true, `submit` waits for the flush outcome before returning.
    pending_rows_batch_sync: bool,
    /// Signalled on drop so that workers flush and exit.
    shutdown: broadcast::Sender<()>,
}
294
295impl PendingRowsBatcher {
296 #[allow(clippy::too_many_arguments)]
297 pub fn try_new(
298 partition_manager: PartitionRuleManagerRef,
299 node_manager: NodeManagerRef,
300 catalog_manager: CatalogManagerRef,
301 prom_store_with_metric_engine: bool,
302 schema_alterer: PendingRowsSchemaAltererRef,
303 flush_interval: Duration,
304 max_batch_rows: usize,
305 max_concurrent_flushes: usize,
306 worker_channel_capacity: usize,
307 max_inflight_requests: usize,
308 ) -> Option<Arc<Self>> {
309 if flush_interval.is_zero()
313 || max_batch_rows == 0
314 || max_concurrent_flushes == 0
315 || worker_channel_capacity == 0
316 || max_inflight_requests == 0
317 {
318 return None;
319 }
320
321 let (shutdown, _) = broadcast::channel(1);
322 let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
323 .ok()
324 .as_deref()
325 .and_then(|v| v.parse::<bool>().ok())
326 .unwrap_or(true);
327 let workers = Arc::new(DashMap::new());
328 PENDING_WORKERS.set(workers.len() as i64);
329
330 Some(Arc::new(Self {
331 workers,
332 flush_interval,
333 max_batch_rows,
334 partition_manager,
335 node_manager,
336 catalog_manager,
337 prom_store_with_metric_engine,
338 schema_alterer,
339 flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
340 inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
341 worker_channel_capacity,
342 pending_rows_batch_sync,
343 shutdown,
344 }))
345 }
346
347 pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
348 let (table_batches, total_rows) = {
349 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
350 .with_label_values(&["submit_build_and_align"])
351 .start_timer();
352 self.build_and_align_table_batches(requests, &ctx).await?
353 };
354 if total_rows == 0 {
355 return Ok(0);
356 }
357
358 let permit = {
359 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
360 .with_label_values(&["submit_acquire_inflight_permit"])
361 .start_timer();
362 self.inflight_semaphore
363 .clone()
364 .acquire_owned()
365 .await
366 .map_err(|_| error::BatcherChannelClosedSnafu.build())?
367 };
368
369 let (response_tx, response_rx) = oneshot::channel();
370
371 let batch_key = batch_key_from_ctx(&ctx);
372 let mut cmd = Some(WorkerCommand::Submit {
373 table_batches,
374 total_rows,
375 ctx,
376 response_tx,
377 _permit: permit,
378 });
379
380 {
381 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
382 .with_label_values(&["submit_send_to_worker"])
383 .start_timer();
384
385 for _ in 0..2 {
386 let worker = self.get_or_spawn_worker(batch_key.clone());
387 let Some(worker_cmd) = cmd.take() else {
388 break;
389 };
390
391 match worker.tx.send(worker_cmd).await {
392 Ok(()) => break,
393 Err(err) => {
394 cmd = Some(err.0);
395 remove_worker_if_same_channel(
396 self.workers.as_ref(),
397 &batch_key,
398 &worker.tx,
399 );
400 }
401 }
402 }
403
404 if cmd.is_some() {
405 return Err(Error::BatcherChannelClosed);
406 }
407 }
408
409 if self.pending_rows_batch_sync {
410 let result = {
411 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
412 .with_label_values(&["submit_wait_flush_result"])
413 .start_timer();
414 response_rx
415 .await
416 .map_err(|_| error::BatcherChannelClosedSnafu.build())?
417 };
418 result
419 .context(error::SubmitBatchSnafu)
420 .map(|()| total_rows as u64)
421 } else {
422 Ok(total_rows as u64)
423 }
424 }
425
426 async fn build_and_align_table_batches(
431 &self,
432 requests: RowInsertRequests,
433 ctx: &QueryContextRef,
434 ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
435 let catalog = ctx.current_catalog().to_string();
436 let schema = ctx.current_schema();
437
438 let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
439 if total_rows == 0 {
440 return Ok((Vec::new(), 0));
441 }
442
443 let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
444 let mut plan = self
445 .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
446 .await?;
447
448 self.create_missing_tables_and_refresh_schemas(
449 &catalog,
450 &schema,
451 ctx,
452 &table_rows,
453 &mut plan,
454 )
455 .await?;
456
457 self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
458 .await?;
459
460 let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;
461
462 Ok((aligned_batches, total_rows))
463 }
464
465 fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
468 let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
469 let mut total_rows = 0;
470
471 for request in requests.inserts {
472 let Some(rows) = request.rows else {
473 continue;
474 };
475 if rows.rows.is_empty() {
476 continue;
477 }
478
479 total_rows += rows.rows.len();
480 table_rows.push((request.table_name, rows));
481 }
482
483 (table_rows, total_rows)
484 }
485
486 fn collect_unique_table_schemas(
489 table_rows: &[(String, Rows)],
490 ) -> Result<Vec<(&str, &[ColumnSchema])>> {
491 let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
492 let mut seen = HashSet::new();
493
494 for (table_name, rows) in table_rows {
495 if seen.insert(table_name.as_str()) {
496 unique_tables.push((table_name.as_str(), &rows.schema));
497 } else {
498 return error::InvalidPromRemoteRequestSnafu {
500 msg: format!(
501 "Found duplicated table name in RowInsertRequest: {}",
502 table_name
503 ),
504 }
505 .fail();
506 }
507 }
508
509 Ok(unique_tables)
510 }
511
512 async fn plan_table_resolution(
515 &self,
516 catalog: &str,
517 schema: &str,
518 ctx: &QueryContextRef,
519 unique_tables: &[(&str, &[ColumnSchema])],
520 ) -> Result<TableResolutionPlan> {
521 let mut plan = TableResolutionPlan {
522 region_schemas: HashMap::with_capacity(unique_tables.len()),
523 tables_to_create: Vec::new(),
524 tables_to_alter: Vec::new(),
525 };
526
527 let resolved_tables = {
528 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
529 .with_label_values(&["align_resolve_table"])
530 .start_timer();
531 futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
532 self.catalog_manager
533 .table(catalog, schema, table_name, Some(ctx.as_ref()))
534 }))
535 .await
536 };
537
538 for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
539 let table = table_result?;
540
541 if let Some(table) = table {
542 let table_info = table.table_info();
543 let table_id = table_info.ident.table_id;
544 let region_schema = table_info.meta.schema.arrow_schema().clone();
545
546 let missing_columns = {
547 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
548 .with_label_values(&["align_identify_missing_columns"])
549 .start_timer();
550 identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
551 };
552 if !missing_columns.is_empty() {
553 plan.tables_to_alter
554 .push(((*table_name).to_string(), missing_columns));
555 }
556 plan.region_schemas
557 .insert((*table_name).to_string(), (region_schema, table_id));
558 } else {
559 let request_schema = {
560 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
561 .with_label_values(&["align_build_create_table_schema"])
562 .start_timer();
563 build_prom_create_table_schema_from_proto(rows_schema)?
564 };
565 plan.tables_to_create
566 .push(((*table_name).to_string(), request_schema));
567 }
568 }
569
570 Ok(plan)
571 }
572
573 async fn create_missing_tables_and_refresh_schemas(
576 &self,
577 catalog: &str,
578 schema: &str,
579 ctx: &QueryContextRef,
580 table_rows: &[(String, Rows)],
581 plan: &mut TableResolutionPlan,
582 ) -> Result<()> {
583 if plan.tables_to_create.is_empty() {
584 return Ok(());
585 }
586
587 let create_refs: Vec<(&str, &[ColumnSchema])> = plan
588 .tables_to_create
589 .iter()
590 .map(|(name, schema)| (name.as_str(), schema.as_slice()))
591 .collect();
592
593 {
594 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
595 .with_label_values(&["align_batch_create_tables"])
596 .start_timer();
597 self.schema_alterer
598 .create_tables_if_missing_batch(
599 catalog,
600 schema,
601 &create_refs,
602 self.prom_store_with_metric_engine,
603 ctx.clone(),
604 )
605 .await?;
606 }
607
608 let created_table_results = {
609 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
610 .with_label_values(&["align_resolve_table_after_create"])
611 .start_timer();
612 futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
613 self.catalog_manager
614 .table(catalog, schema, table_name, Some(ctx.as_ref()))
615 }))
616 .await
617 };
618
619 for ((table_name, _), table_result) in
620 plan.tables_to_create.iter().zip(created_table_results)
621 {
622 let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
623 reason: format!(
624 "Table not found after pending batch create attempt: {}",
625 table_name
626 ),
627 })?;
628 let table_info = table.table_info();
629 let table_id = table_info.ident.table_id;
630 let region_schema = table_info.meta.schema.arrow_schema().clone();
631 plan.region_schemas
632 .insert(table_name.clone(), (region_schema, table_id));
633 }
634
635 Self::enqueue_alter_for_new_tables(table_rows, plan)?;
636
637 Ok(())
638 }
639
640 fn enqueue_alter_for_new_tables(
643 table_rows: &[(String, Rows)],
644 plan: &mut TableResolutionPlan,
645 ) -> Result<()> {
646 let created_tables: HashSet<&str> = plan
647 .tables_to_create
648 .iter()
649 .map(|(table_name, _)| table_name.as_str())
650 .collect();
651
652 for (table_name, rows) in table_rows {
653 if !created_tables.contains(table_name.as_str()) {
654 continue;
655 }
656
657 let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
658 continue;
659 };
660
661 let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
662 if missing_columns.is_empty()
663 || plan
664 .tables_to_alter
665 .iter()
666 .any(|(existing_name, _)| existing_name == table_name)
667 {
668 continue;
669 }
670
671 plan.tables_to_alter
672 .push((table_name.clone(), missing_columns));
673 }
674
675 Ok(())
676 }
677
678 async fn alter_tables_and_refresh_schemas(
681 &self,
682 catalog: &str,
683 schema: &str,
684 ctx: &QueryContextRef,
685 plan: &mut TableResolutionPlan,
686 ) -> Result<()> {
687 if plan.tables_to_alter.is_empty() {
688 return Ok(());
689 }
690
691 let alter_refs: Vec<(&str, &[String])> = plan
692 .tables_to_alter
693 .iter()
694 .map(|(name, cols)| (name.as_str(), cols.as_slice()))
695 .collect();
696 {
697 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
698 .with_label_values(&["align_batch_add_missing_columns"])
699 .start_timer();
700 self.schema_alterer
701 .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
702 .await?;
703 }
704
705 let altered_table_results = {
706 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
707 .with_label_values(&["align_resolve_table_after_schema_alter"])
708 .start_timer();
709 futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
710 self.catalog_manager
711 .table(catalog, schema, table_name, Some(ctx.as_ref()))
712 }))
713 .await
714 };
715
716 for ((table_name, _), table_result) in
717 plan.tables_to_alter.iter().zip(altered_table_results)
718 {
719 let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
720 reason: format!(
721 "Table not found after pending batch schema alter: {}",
722 table_name
723 ),
724 })?;
725 let table_info = table.table_info();
726 let table_id = table_info.ident.table_id;
727 let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
728 plan.region_schemas
729 .insert(table_name.clone(), (refreshed_region_schema, table_id));
730 }
731
732 Ok(())
733 }
734
735 fn build_aligned_batches(
738 table_rows: &[(String, Rows)],
739 region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
740 ) -> Result<Vec<(String, u32, RecordBatch)>> {
741 let mut aligned_batches = Vec::with_capacity(table_rows.len());
742 for (table_name, rows) in table_rows {
743 let (region_schema, table_id) =
744 region_schemas.get(table_name).cloned().with_context(|| {
745 error::UnexpectedResultSnafu {
746 reason: format!("Region schema not resolved for table: {}", table_name),
747 }
748 })?;
749
750 let record_batch = {
751 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
752 .with_label_values(&["align_rows_to_record_batch"])
753 .start_timer();
754 rows_to_aligned_record_batch(rows, region_schema.as_ref())?
755 };
756 aligned_batches.push((table_name.clone(), table_id, record_batch));
757 }
758
759 Ok(aligned_batches)
760 }
761
762 fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
763 if let Some(worker) = self.workers.get(&key)
764 && !worker.tx.is_closed()
765 {
766 return worker.clone();
767 }
768
769 let entry = self.workers.entry(key.clone());
770 match entry {
771 Entry::Occupied(mut worker) => {
772 if worker.get().tx.is_closed() {
773 let new_worker = self.spawn_worker(key);
774 worker.insert(new_worker.clone());
775 PENDING_WORKERS.set(self.workers.len() as i64);
776 new_worker
777 } else {
778 worker.get().clone()
779 }
780 }
781 Entry::Vacant(vacant) => {
782 let worker = self.spawn_worker(key);
783
784 vacant.insert(worker.clone());
785 PENDING_WORKERS.set(self.workers.len() as i64);
786 worker
787 }
788 }
789 }
790
791 fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
792 let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
793 let worker = PendingWorker { tx: tx.clone() };
794 let worker_idle_timeout = self
795 .flush_interval
796 .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
797 .unwrap_or(self.flush_interval);
798
799 start_worker(
800 key,
801 worker.tx.clone(),
802 self.workers.clone(),
803 rx,
804 self.shutdown.clone(),
805 self.partition_manager.clone(),
806 self.node_manager.clone(),
807 self.catalog_manager.clone(),
808 self.flush_interval,
809 worker_idle_timeout,
810 self.max_batch_rows,
811 self.flush_semaphore.clone(),
812 );
813
814 worker
815 }
816}
817
818impl Drop for PendingRowsBatcher {
819 fn drop(&mut self) {
820 let _ = self.shutdown.send(());
821 }
822}
823
824impl PendingBatch {
825 fn new(ctx: QueryContextRef) -> Self {
826 let db_string = ctx.get_db_string();
827 Self {
828 tables: HashMap::new(),
829 created_at: Instant::now(),
830 total_row_count: 0,
831 db_string,
832 ctx,
833 waiters: Vec::new(),
834 }
835 }
836}
837
838#[allow(clippy::too_many_arguments)]
839fn start_worker(
840 key: BatchKey,
841 worker_tx: mpsc::Sender<WorkerCommand>,
842 workers: Arc<DashMap<BatchKey, PendingWorker>>,
843 mut rx: mpsc::Receiver<WorkerCommand>,
844 shutdown: broadcast::Sender<()>,
845 partition_manager: PartitionRuleManagerRef,
846 node_manager: NodeManagerRef,
847 catalog_manager: CatalogManagerRef,
848 flush_interval: Duration,
849 worker_idle_timeout: Duration,
850 max_batch_rows: usize,
851 flush_semaphore: Arc<Semaphore>,
852) {
853 tokio::spawn(async move {
854 let mut batch = None;
855 let mut interval = tokio::time::interval(flush_interval);
856 let mut shutdown_rx = shutdown.subscribe();
857 let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
858 let idle_timer = tokio::time::sleep_until(idle_deadline);
859 tokio::pin!(idle_timer);
860
861 loop {
862 tokio::select! {
863 cmd = rx.recv() => {
864 match cmd {
865 Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
866 idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);
867
868 let pending_batch = batch.get_or_insert_with(||{
869 PENDING_BATCHES.inc();
870 PendingBatch::new(ctx)
871 });
872
873 pending_batch.waiters.push(FlushWaiter { response_tx, _permit });
874
875 for (table_name, table_id, record_batch) in table_batches {
876 let entry = pending_batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
877 table_name,
878 table_id,
879 batches: Vec::new(),
880 row_count: 0,
881 });
882 entry.row_count += record_batch.num_rows();
883 entry.batches.push(record_batch);
884 }
885
886 pending_batch.total_row_count += total_rows;
887 PENDING_ROWS.add(total_rows as i64);
888
889 if pending_batch.total_row_count >= max_batch_rows
890 && let Some(flush) = drain_batch(&mut batch) {
891 spawn_flush(
892 flush,
893 partition_manager.clone(),
894 node_manager.clone(),
895 catalog_manager.clone(),
896 flush_semaphore.clone(),
897 ).await;
898 }
899 }
900 None => {
901 if let Some(flush) = drain_batch(&mut batch) {
902 flush_batch(
903 flush,
904 partition_manager.clone(),
905 node_manager.clone(),
906 catalog_manager.clone(),
907 ).await;
908 }
909 break;
910 }
911 }
912 }
913 _ = &mut idle_timer => {
914 if !should_close_worker_on_idle_timeout(
915 batch.as_ref().map_or(0, |batch| batch.total_row_count),
916 rx.len(),
917 ) {
918 idle_timer
919 .as_mut()
920 .reset(tokio::time::Instant::now() + worker_idle_timeout);
921 continue;
922 }
923
924 debug!(
925 "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
926 key.catalog,
927 key.schema,
928 key.physical_table
929 );
930 break;
931 }
932 _ = interval.tick() => {
933 if batch
934 .as_ref()
935 .is_some_and(|batch| batch.created_at.elapsed() >= flush_interval)
936 && let Some(flush) = drain_batch(&mut batch) {
937 spawn_flush(
938 flush,
939 partition_manager.clone(),
940 node_manager.clone(),
941 catalog_manager.clone(),
942 flush_semaphore.clone(),
943 ).await;
944 }
945 }
946 _ = shutdown_rx.recv() => {
947 if let Some(flush) = drain_batch(&mut batch) {
948 flush_batch(
949 flush,
950 partition_manager.clone(),
951 node_manager.clone(),
952 catalog_manager.clone(),
953 ).await;
954 }
955 break;
956 }
957 }
958 }
959
960 remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
961 });
962}
963
964fn remove_worker_if_same_channel(
965 workers: &DashMap<BatchKey, PendingWorker>,
966 key: &BatchKey,
967 worker_tx: &mpsc::Sender<WorkerCommand>,
968) -> bool {
969 if let Some(worker) = workers.get(key)
970 && worker.tx.same_channel(worker_tx)
971 {
972 drop(worker);
973 workers.remove(key);
974 PENDING_WORKERS.set(workers.len() as i64);
975 return true;
976 }
977
978 false
979}
980
/// A worker may close on idle timeout only when it buffers no rows and has no
/// queued commands.
fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    matches!((total_row_count, queued_requests), (0, 0))
}
984
985fn drain_batch(batch: &mut Option<PendingBatch>) -> Option<FlushBatch> {
986 let batch = batch.take()?;
987 let total_row_count = batch.total_row_count;
988
989 if total_row_count == 0 {
990 return None;
991 }
992
993 let table_batches = batch.tables.into_values().collect();
994 let waiters = batch.waiters;
995
996 PENDING_ROWS.sub(total_row_count as i64);
997 PENDING_BATCHES.dec();
998
999 Some(FlushBatch {
1000 table_batches,
1001 total_row_count,
1002 db_string: batch.db_string,
1003 ctx: batch.ctx,
1004 waiters,
1005 })
1006}
1007
1008async fn spawn_flush(
1009 flush: FlushBatch,
1010 partition_manager: PartitionRuleManagerRef,
1011 node_manager: NodeManagerRef,
1012 catalog_manager: CatalogManagerRef,
1013 semaphore: Arc<Semaphore>,
1014) {
1015 match semaphore.acquire_owned().await {
1016 Ok(permit) => {
1017 tokio::spawn(async move {
1018 let _permit = permit;
1019 flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1020 });
1021 }
1022 Err(err) => {
1023 warn!(err; "Flush semaphore closed, flushing inline");
1024 flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1025 }
1026 }
1027}
1028
/// A region request ready to be sent to `datanode`.
struct FlushRegionWrite {
    datanode: Peer,
    request: RegionRequest,
}

/// A record batch destined for a single region.
struct PlannedRegionBatch {
    region_id: RegionId,
    batch: RecordBatch,
}

#[cfg(test)]
impl PlannedRegionBatch {
    /// Number of rows in the planned batch (test-only helper).
    fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

/// A planned region batch paired with the datanode it was resolved to
/// (presumably the region leader — the resolution code is not visible here).
struct ResolvedRegionBatch {
    planned: PlannedRegionBatch,
    datanode: Peer,
}
1050
/// Concurrent dispatch only pays off when there is more than one region write.
fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    !matches!(region_write_count, 0 | 1)
}
1054
1055fn columns_taxonomy(
1064 batch_schema: &Arc<ArrowSchema>,
1065 table_name: &str,
1066 name_to_ids: &HashMap<String, u32>,
1067 partition_columns: &HashSet<&str>,
1068) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
1069 let mut tag_columns = Vec::new();
1070 let mut essential_column_indices =
1071 SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
1072 essential_column_indices.push(0);
1074 essential_column_indices.push(0);
1075
1076 let mut timestamp_index = None;
1077 let mut value_index = None;
1078
1079 for (index, field) in batch_schema.fields().iter().enumerate() {
1080 match field.data_type() {
1081 ArrowDataType::Utf8 => {
1082 let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
1083 error::InvalidPromRemoteRequestSnafu {
1084 msg: format!(
1085 "Column '{}' from logical table '{}' not found in physical table column IDs",
1086 field.name(),
1087 table_name
1088 ),
1089 }
1090 })?;
1091 tag_columns.push(TagColumnInfo {
1092 name: field.name().clone(),
1093 index,
1094 column_id,
1095 });
1096
1097 if partition_columns.contains(field.name().as_str()) {
1098 essential_column_indices.push(index);
1099 }
1100 }
1101 ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
1102 ensure!(
1103 timestamp_index.replace(index).is_none(),
1104 error::InvalidPromRemoteRequestSnafu {
1105 msg: format!(
1106 "Duplicated timestamp column in logical table '{}' batch schema",
1107 table_name
1108 ),
1109 }
1110 );
1111 }
1112 ArrowDataType::Float64 => {
1113 ensure!(
1114 value_index.replace(index).is_none(),
1115 error::InvalidPromRemoteRequestSnafu {
1116 msg: format!(
1117 "Duplicated value column in logical table '{}' batch schema",
1118 table_name
1119 ),
1120 }
1121 );
1122 }
1123 datatype => {
1124 return error::InvalidPromRemoteRequestSnafu {
1125 msg: format!(
1126 "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
1127 table_name
1128 ),
1129 }
1130 .fail();
1131 }
1132 }
1133 }
1134
1135 let timestamp_index =
1136 timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1137 msg: format!(
1138 "Missing essential column '{}' in logical table '{}' batch schema",
1139 greptime_timestamp(),
1140 table_name
1141 ),
1142 })?;
1143 let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1144 msg: format!(
1145 "Missing essential column '{}' in logical table '{}' batch schema",
1146 greptime_value(),
1147 table_name
1148 ),
1149 })?;
1150
1151 tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
1152
1153 essential_column_indices[0] = timestamp_index;
1154 essential_column_indices[1] = value_index;
1155
1156 Ok((tag_columns, essential_column_indices))
1157}
1158
1159fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
1160 ensure!(
1161 batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1162 error::InternalSnafu {
1163 err_msg: format!(
1164 "Expected at least {} columns in physical batch, got {}",
1165 PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1166 batch.num_columns()
1167 ),
1168 }
1169 );
1170 let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
1171 batch.project(&essential_indices).context(error::ArrowSnafu)
1172}
1173
1174async fn flush_region_writes_concurrently(
1175 node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1176 writes: Vec<FlushRegionWrite>,
1177) -> Result<usize> {
1178 let mut affected_rows = 0;
1179 if !should_dispatch_concurrently(writes.len()) {
1180 for write in writes {
1181 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1182 .with_label_values(&["flush_write_region"])
1183 .start_timer();
1184 affected_rows += node_manager
1185 .handle(&write.datanode, write.request)
1186 .await?
1187 .affected_rows;
1188 }
1189 return Ok(affected_rows);
1190 }
1191
1192 let write_futures = writes.into_iter().map(|write| async move {
1193 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1194 .with_label_values(&["flush_write_region"])
1195 .start_timer();
1196
1197 let response = node_manager.handle(&write.datanode, write.request).await?;
1198 Ok::<_, Error>(response.affected_rows)
1199 });
1200
1201 let affected_rows = futures::future::try_join_all(write_futures)
1203 .await?
1204 .into_iter()
1205 .sum();
1206 Ok(affected_rows)
1207}
1208
1209async fn flush_batch(
1210 flush: FlushBatch,
1211 partition_manager: PartitionRuleManagerRef,
1212 node_manager: NodeManagerRef,
1213 catalog_manager: CatalogManagerRef,
1214) {
1215 let FlushBatch {
1216 table_batches,
1217 total_row_count,
1218 db_string,
1219 ctx,
1220 waiters,
1221 } = flush;
1222 let start = Instant::now();
1223
1224 let physical_table_name = ctx
1227 .extension(PHYSICAL_TABLE_KEY)
1228 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
1229 .to_string();
1230 let partition_provider = PartitionManagerPhysicalFlushAdapter { partition_manager };
1231 let node_requester = NodeManagerPhysicalFlushAdapter { node_manager };
1232 let catalog_provider = CatalogManagerPhysicalFlushAdapter { catalog_manager };
1233 let result = flush_batch_physical(
1234 &table_batches,
1235 &physical_table_name,
1236 &ctx,
1237 &partition_provider,
1238 &node_requester,
1239 &catalog_provider,
1240 )
1241 .await;
1242
1243 let elapsed = start.elapsed().as_secs_f64();
1244 FLUSH_ELAPSED.observe(elapsed);
1245
1246 if result.is_err() {
1247 FLUSH_FAILURES.inc();
1248 FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64);
1249 } else if let Ok(affected_rows) = &result {
1250 FLUSH_TOTAL.inc();
1251 FLUSH_ROWS.observe(total_row_count as f64);
1252 operator::metrics::DIST_INGEST_ROW_COUNT
1253 .with_label_values(&[db_string.as_str()])
1254 .inc_by(*affected_rows as u64);
1255 }
1256
1257 debug!(
1258 "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
1259 total_row_count, elapsed
1260 );
1261
1262 notify_waiters(waiters, result.map(|_| ()));
1263}
1264
1265pub async fn flush_batch_physical(
1276 table_batches: &[TableBatch],
1277 physical_table_name: &str,
1278 ctx: &QueryContextRef,
1279 partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1280 node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1281 catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized),
1282) -> Result<usize> {
1283 let physical_table = {
1285 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1286 .with_label_values(&["flush_physical_resolve_table"])
1287 .start_timer();
1288 catalog_manager
1289 .physical_table(
1290 ctx.current_catalog(),
1291 &ctx.current_schema(),
1292 physical_table_name,
1293 ctx.as_ref(),
1294 )
1295 .await?
1296 .with_context(|| error::InternalSnafu {
1297 err_msg: format!(
1298 "Physical table '{}' not found during pending flush",
1299 physical_table_name
1300 ),
1301 })?
1302 };
1303
1304 let physical_table_info = physical_table.table_info;
1305 let name_to_ids = physical_table
1306 .col_name_to_ids
1307 .with_context(|| error::InternalSnafu {
1308 err_msg: format!(
1309 "Physical table '{}' has no column IDs for pending flush",
1310 physical_table_name
1311 ),
1312 })?;
1313
1314 let partition_rule = {
1316 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1317 .with_label_values(&["flush_physical_fetch_partition_rule"])
1318 .start_timer();
1319 partition_manager
1320 .find_table_partition_rule(physical_table_info.as_ref())
1321 .await?
1322 };
1323 let partition_columns = partition_rule.partition_columns();
1324 let partition_columns_set: HashSet<&str> =
1325 partition_columns.iter().map(String::as_str).collect();
1326
1327 let modified_batches =
1329 transform_logical_batches_to_physical(table_batches, &name_to_ids, &partition_columns_set)?;
1330
1331 let combined_batch = concat_modified_batches(&modified_batches)?;
1333
1334 let physical_table_id = physical_table_info.table_id();
1336 let planned_batches = plan_region_batches(
1337 combined_batch,
1338 physical_table_id,
1339 partition_rule.as_ref(),
1340 partition_columns,
1341 )?;
1342
1343 let resolved_batches = resolve_region_targets(planned_batches, partition_manager).await?;
1344 let region_writes = encode_region_write_requests(resolved_batches)?;
1345 flush_region_writes_concurrently(node_manager, region_writes).await
1346}
1347
1348fn transform_logical_batches_to_physical(
1353 table_batches: &[TableBatch],
1354 name_to_ids: &HashMap<String, u32>,
1355 partition_columns_set: &HashSet<&str>,
1356) -> Result<Vec<RecordBatch>> {
1357 let mut modified_batches: Vec<RecordBatch> =
1358 Vec::with_capacity(table_batches.iter().map(|b| b.batches.len()).sum());
1359
1360 let mut modify_elapsed = Duration::ZERO;
1361 let mut columns_taxonomy_elapsed = Duration::ZERO;
1362
1363 for table_batch in table_batches {
1364 let table_id = table_batch.table_id;
1365
1366 for batch in &table_batch.batches {
1367 let batch_schema = batch.schema();
1368 let start = Instant::now();
1369 let (tag_columns, essential_col_indices) = columns_taxonomy(
1370 &batch_schema,
1371 &table_batch.table_name,
1372 name_to_ids,
1373 partition_columns_set,
1374 )?;
1375
1376 columns_taxonomy_elapsed += start.elapsed();
1377 if tag_columns.is_empty() && essential_col_indices.is_empty() {
1378 continue;
1379 }
1380
1381 let modified = {
1382 let start = Instant::now();
1383 let batch = modify_batch_sparse(
1385 batch.clone(),
1386 table_id,
1387 &tag_columns,
1388 &essential_col_indices,
1389 )?;
1390 modify_elapsed += start.elapsed();
1391 batch
1392 };
1393
1394 modified_batches.push(modified);
1395 }
1396 }
1397
1398 PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1399 .with_label_values(&["flush_physical_modify_batch"])
1400 .observe(modify_elapsed.as_secs_f64());
1401 PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1402 .with_label_values(&["flush_physical_columns_taxonomy"])
1403 .observe(columns_taxonomy_elapsed.as_secs_f64());
1404
1405 ensure!(
1406 !modified_batches.is_empty(),
1407 error::InternalSnafu {
1408 err_msg: "No batches can be transformed during pending flush",
1409 }
1410 );
1411 Ok(modified_batches)
1412}
1413
1414fn concat_modified_batches(modified_batches: &[RecordBatch]) -> Result<RecordBatch> {
1418 let combined_schema = modified_batches[0].schema();
1419 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1420 .with_label_values(&["flush_physical_concat_all"])
1421 .start_timer();
1422 concat_batches(&combined_schema, modified_batches).context(error::ArrowSnafu)
1423}
1424
1425fn split_combined_batch_by_region(
1426 combined_batch: &RecordBatch,
1427 partition_rule: &dyn partition::partition::PartitionRule,
1428) -> Result<HashMap<u32, partition::partition::RegionMask>> {
1429 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1430 .with_label_values(&["flush_physical_split_record_batch"])
1431 .start_timer();
1432 let map = partition_rule.split_record_batch(combined_batch)?;
1433 Ok(map)
1434}
1435
1436fn prepare_physical_region_routing_batch(
1437 combined_batch: RecordBatch,
1438 partition_columns: &[String],
1439) -> Result<RecordBatch> {
1440 if partition_columns.is_empty() {
1441 return Ok(combined_batch);
1442 }
1443 strip_partition_columns_from_batch(combined_batch)
1444}
1445
1446fn plan_region_batch(
1447 stripped_batch: &RecordBatch,
1448 physical_table_id: TableId,
1449 region_number: u32,
1450 mask: &partition::partition::RegionMask,
1451) -> Result<Option<PlannedRegionBatch>> {
1452 if mask.select_none() {
1453 return Ok(None);
1454 }
1455
1456 let region_batch = if mask.select_all() {
1457 stripped_batch.clone()
1458 } else {
1459 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1460 .with_label_values(&["flush_physical_filter_record_batch"])
1461 .start_timer();
1462 filter_record_batch(stripped_batch, mask.array()).context(error::ArrowSnafu)?
1463 };
1464
1465 let row_count = region_batch.num_rows();
1466 if row_count == 0 {
1467 return Ok(None);
1468 }
1469
1470 Ok(Some(PlannedRegionBatch {
1471 region_id: RegionId::new(physical_table_id, region_number),
1472 batch: region_batch,
1473 }))
1474}
1475
1476fn plan_region_batches(
1477 combined_batch: RecordBatch,
1478 physical_table_id: TableId,
1479 partition_rule: &dyn partition::partition::PartitionRule,
1480 partition_columns: &[String],
1481) -> Result<Vec<PlannedRegionBatch>> {
1482 let region_masks = split_combined_batch_by_region(&combined_batch, partition_rule)?;
1483 let stripped_batch = prepare_physical_region_routing_batch(combined_batch, partition_columns)?;
1484
1485 let mut planned_batches = Vec::new();
1486 for (region_number, mask) in region_masks {
1487 if let Some(planned_batch) =
1488 plan_region_batch(&stripped_batch, physical_table_id, region_number, &mask)?
1489 {
1490 planned_batches.push(planned_batch);
1491 }
1492 }
1493
1494 Ok(planned_batches)
1495}
1496
1497async fn resolve_region_targets(
1498 planned_batches: Vec<PlannedRegionBatch>,
1499 partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1500) -> Result<Vec<ResolvedRegionBatch>> {
1501 let mut resolved_batches = Vec::with_capacity(planned_batches.len());
1502 for planned in planned_batches {
1503 let datanode = {
1504 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1505 .with_label_values(&["flush_physical_resolve_region_leader"])
1506 .start_timer();
1507 partition_manager
1508 .find_region_leader(planned.region_id)
1509 .await?
1510 };
1511
1512 resolved_batches.push(ResolvedRegionBatch { planned, datanode });
1513 }
1514
1515 Ok(resolved_batches)
1516}
1517
1518fn encode_region_write_requests(
1519 resolved_batches: Vec<ResolvedRegionBatch>,
1520) -> Result<Vec<FlushRegionWrite>> {
1521 let mut region_writes = Vec::with_capacity(resolved_batches.len());
1522 for resolved in resolved_batches {
1523 let region_id = resolved.planned.region_id;
1524 let (schema_bytes, data_header, payload) = {
1525 let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1526 .with_label_values(&["flush_physical_encode_ipc"])
1527 .start_timer();
1528 record_batch_to_ipc(resolved.planned.batch)?
1529 };
1530
1531 let request = RegionRequest {
1532 header: Some(RegionRequestHeader {
1533 tracing_context: TracingContext::from_current_span().to_w3c(),
1534 ..Default::default()
1535 }),
1536 body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
1537 region_id: region_id.as_u64(),
1538 partition_expr_version: None,
1539 aligned_schema_version: None,
1542 body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
1543 schema: schema_bytes,
1544 data_header,
1545 payload,
1546 })),
1547 })),
1548 };
1549
1550 region_writes.push(FlushRegionWrite {
1551 datanode: resolved.datanode,
1552 request,
1553 });
1554 }
1555
1556 Ok(region_writes)
1557}
1558
1559fn notify_waiters(waiters: Vec<FlushWaiter>, result: Result<()>) {
1560 let shared_result = result.map_err(Arc::new);
1561 for waiter in waiters {
1562 let _ = waiter.response_tx.send(match &shared_result {
1563 Ok(()) => Ok(()),
1564 Err(error) => Err(Arc::clone(error)),
1565 });
1566 }
1568}
1569
1570fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
1571 let mut encoder = FlightEncoder::default();
1572 let schema = encoder.encode_schema(record_batch.schema().as_ref());
1573 let mut iter = encoder
1574 .encode(FlightMessage::RecordBatch(record_batch))
1575 .into_iter();
1576 let Some(flight_data) = iter.next() else {
1577 return Err(Error::Internal {
1578 err_msg: "Failed to encode empty flight data".to_string(),
1579 });
1580 };
1581 if iter.next().is_some() {
1582 return Err(Error::NotSupported {
1583 feat: "bulk insert RecordBatch with dictionary arrays".to_string(),
1584 });
1585 }
1586
1587 Ok((
1588 schema.data_header,
1589 flight_data.data_header,
1590 flight_data.data_body,
1591 ))
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596 use std::collections::{HashMap, HashSet};
1597 use std::sync::Arc;
1598 use std::sync::atomic::{AtomicUsize, Ordering};
1599 use std::time::{Duration, Instant};
1600
1601 use api::region::RegionResponse;
1602 use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
1603 use api::v1::meta::Peer;
1604 use api::v1::region::{InsertRequests, RegionRequest, region_request};
1605 use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
1606 use arrow::array::{BinaryArray, BooleanArray, StringArray, TimestampMillisecondArray};
1607 use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1608 use arrow::record_batch::RecordBatch;
1609 use async_trait::async_trait;
1610 use catalog::error::Result as CatalogResult;
1611 use common_meta::error::Result as MetaResult;
1612 use common_meta::node_manager::{
1613 Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
1614 };
1615 use common_query::request::QueryRequest;
1616 use common_recordbatch::SendableRecordBatchStream;
1617 use dashmap::DashMap;
1618 use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema};
1619 use partition::error::Result as PartitionResult;
1620 use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask};
1621 use smallvec::SmallVec;
1622 use snafu::ResultExt;
1623 use store_api::storage::RegionId;
1624 use table::metadata::TableId;
1625 use table::test_util::table_info::test_table_info;
1626 use tokio::sync::{Semaphore, mpsc, oneshot};
1627 use tokio::time::sleep;
1628
1629 use super::{
1630 BatchKey, Error, FlushRegionWrite, FlushWaiter, PendingBatch, PendingRowsBatcher,
1631 PendingWorker, PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester,
1632 PhysicalFlushPartitionProvider, PhysicalTableMetadata, PlannedRegionBatch,
1633 ResolvedRegionBatch, TableBatch, WorkerCommand, columns_taxonomy, drain_batch,
1634 encode_region_write_requests, flush_batch_physical, flush_region_writes_concurrently,
1635 plan_region_batches, remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
1636 should_dispatch_concurrently, strip_partition_columns_from_batch,
1637 transform_logical_batches_to_physical,
1638 };
1639 use crate::error;
1640
1641 fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
1642 Rows {
1643 schema: vec![ColumnSchema {
1644 column_name: schema_name.to_string(),
1645 ..Default::default()
1646 }],
1647 rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
1648 }
1649 }
1650
1651 fn mock_tag_batch(tag_name: &str, tag_value: &str, ts: i64, val: f64) -> RecordBatch {
1652 let schema = Arc::new(ArrowSchema::new(vec![
1653 Field::new(
1654 "greptime_timestamp",
1655 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1656 false,
1657 ),
1658 Field::new("greptime_value", ArrowDataType::Float64, true),
1659 Field::new(tag_name, ArrowDataType::Utf8, true),
1660 ]));
1661
1662 RecordBatch::try_new(
1663 schema,
1664 vec![
1665 Arc::new(TimestampMillisecondArray::from(vec![ts])),
1666 Arc::new(arrow::array::Float64Array::from(vec![val])),
1667 Arc::new(StringArray::from(vec![tag_value])),
1668 ],
1669 )
1670 .unwrap()
1671 }
1672
1673 fn mock_physical_table_metadata(table_id: TableId) -> PhysicalTableMetadata {
1674 let schema = Arc::new(
1675 DtSchema::try_new(vec![
1676 DtColumnSchema::new(
1677 "__primary_key",
1678 datatypes::prelude::ConcreteDataType::binary_datatype(),
1679 false,
1680 ),
1681 DtColumnSchema::new(
1682 "greptime_timestamp",
1683 datatypes::prelude::ConcreteDataType::timestamp_millisecond_datatype(),
1684 false,
1685 ),
1686 DtColumnSchema::new(
1687 "greptime_value",
1688 datatypes::prelude::ConcreteDataType::float64_datatype(),
1689 true,
1690 ),
1691 DtColumnSchema::new(
1692 "tag1",
1693 datatypes::prelude::ConcreteDataType::string_datatype(),
1694 true,
1695 ),
1696 ])
1697 .unwrap(),
1698 );
1699 let mut table_info = test_table_info(table_id, "phy", "public", "greptime", schema);
1700 table_info.meta.column_ids = vec![0, 1, 2, 3];
1701
1702 PhysicalTableMetadata {
1703 table_info: Arc::new(table_info),
1704 col_name_to_ids: Some(HashMap::from([("tag1".to_string(), 3)])),
1705 }
1706 }
1707
1708 struct MockFlushCatalogProvider {
1709 table: Option<PhysicalTableMetadata>,
1710 }
1711
1712 #[async_trait]
1713 impl PhysicalFlushCatalogProvider for MockFlushCatalogProvider {
1714 async fn physical_table(
1715 &self,
1716 _catalog: &str,
1717 _schema: &str,
1718 _table_name: &str,
1719 _query_ctx: &session::context::QueryContext,
1720 ) -> CatalogResult<Option<PhysicalTableMetadata>> {
1721 Ok(self.table.clone())
1722 }
1723 }
1724
1725 struct SingleRegionPartitionRule;
1726
1727 impl PartitionRule for SingleRegionPartitionRule {
1728 fn as_any(&self) -> &dyn std::any::Any {
1729 self
1730 }
1731
1732 fn partition_columns(&self) -> &[String] {
1733 &[]
1734 }
1735
1736 fn find_region(
1737 &self,
1738 _values: &[datatypes::prelude::Value],
1739 ) -> partition::error::Result<store_api::storage::RegionNumber> {
1740 unimplemented!()
1741 }
1742
1743 fn split_record_batch(
1744 &self,
1745 record_batch: &RecordBatch,
1746 ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1747 {
1748 Ok(HashMap::from([(
1749 1,
1750 RegionMask::new(
1751 arrow::array::BooleanArray::from(vec![true; record_batch.num_rows()]),
1752 record_batch.num_rows(),
1753 ),
1754 )]))
1755 }
1756 }
1757
1758 struct TwoRegionPartitionRule {
1759 partition_columns: Vec<String>,
1760 }
1761
1762 impl PartitionRule for TwoRegionPartitionRule {
1763 fn as_any(&self) -> &dyn std::any::Any {
1764 self
1765 }
1766
1767 fn partition_columns(&self) -> &[String] {
1768 &self.partition_columns
1769 }
1770
1771 fn find_region(
1772 &self,
1773 _values: &[datatypes::prelude::Value],
1774 ) -> partition::error::Result<store_api::storage::RegionNumber> {
1775 unimplemented!()
1776 }
1777
1778 fn split_record_batch(
1779 &self,
1780 _record_batch: &RecordBatch,
1781 ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1782 {
1783 Ok(HashMap::from([
1784 (1, RegionMask::new(BooleanArray::from(vec![true, false]), 1)),
1785 (2, RegionMask::new(BooleanArray::from(vec![false, true]), 1)),
1786 (
1787 3,
1788 RegionMask::new(BooleanArray::from(vec![false, false]), 0),
1789 ),
1790 ]))
1791 }
1792 }
1793
1794 struct MockFlushPartitionProvider {
1795 partition_rule_calls: Arc<AtomicUsize>,
1796 region_leader_calls: Arc<AtomicUsize>,
1797 }
1798
1799 #[async_trait]
1800 impl PhysicalFlushPartitionProvider for MockFlushPartitionProvider {
1801 async fn find_table_partition_rule(
1802 &self,
1803 _table_info: &table::metadata::TableInfo,
1804 ) -> PartitionResult<PartitionRuleRef> {
1805 self.partition_rule_calls.fetch_add(1, Ordering::SeqCst);
1806 Ok(Arc::new(SingleRegionPartitionRule))
1807 }
1808
1809 async fn find_region_leader(&self, _region_id: RegionId) -> error::Result<Peer> {
1810 self.region_leader_calls.fetch_add(1, Ordering::SeqCst);
1811 Ok(Peer {
1812 id: 1,
1813 addr: "node-1".to_string(),
1814 })
1815 }
1816 }
1817
1818 #[derive(Default)]
1819 struct MockFlushNodeRequester {
1820 writes: Arc<AtomicUsize>,
1821 }
1822
1823 #[async_trait]
1824 impl PhysicalFlushNodeRequester for MockFlushNodeRequester {
1825 async fn handle(
1826 &self,
1827 _peer: &Peer,
1828 _request: RegionRequest,
1829 ) -> error::Result<RegionResponse> {
1830 self.writes.fetch_add(1, Ordering::SeqCst);
1831 Ok(RegionResponse::new(0))
1832 }
1833 }
1834
1835 #[test]
1836 fn test_collect_non_empty_table_rows_filters_empty_payloads() {
1837 let requests = RowInsertRequests {
1838 inserts: vec![
1839 RowInsertRequest {
1840 table_name: "cpu".to_string(),
1841 rows: Some(mock_rows(2, "host")),
1842 },
1843 RowInsertRequest {
1844 table_name: "mem".to_string(),
1845 rows: Some(mock_rows(0, "host")),
1846 },
1847 RowInsertRequest {
1848 table_name: "disk".to_string(),
1849 rows: None,
1850 },
1851 ],
1852 };
1853
1854 let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);
1855
1856 assert_eq!(2, total_rows);
1857 assert_eq!(1, table_rows.len());
1858 assert_eq!("cpu", table_rows[0].0);
1859 assert_eq!(2, table_rows[0].1.rows.len());
1860 }
1861
1862 #[test]
1863 fn test_drain_batch_takes_initialized_pending_batch_from_option() {
1864 let ctx = session::context::QueryContext::arc();
1865 let (response_tx, _response_rx) = oneshot::channel();
1866 let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();
1867 let mut batch = Some(PendingBatch {
1868 tables: HashMap::from([(
1869 "cpu".to_string(),
1870 TableBatch {
1871 table_name: "cpu".to_string(),
1872 table_id: 42,
1873 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
1874 row_count: 1,
1875 },
1876 )]),
1877 created_at: Instant::now(),
1878 total_row_count: 1,
1879 db_string: ctx.get_db_string(),
1880 ctx: ctx.clone(),
1881 waiters: vec![FlushWaiter {
1882 response_tx,
1883 _permit: permit,
1884 }],
1885 });
1886
1887 let flush = drain_batch(&mut batch).unwrap();
1888
1889 assert!(batch.is_none());
1890 assert_eq!(1, flush.total_row_count);
1891 assert_eq!(1, flush.table_batches.len());
1892 assert_eq!(ctx.get_db_string(), flush.db_string);
1893 assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog());
1894 }
1895
1896 #[derive(Clone)]
1897 struct ConcurrentMockDatanode {
1898 delay: Duration,
1899 inflight: Arc<AtomicUsize>,
1900 max_inflight: Arc<AtomicUsize>,
1901 }
1902
1903 #[async_trait]
1904 impl Datanode for ConcurrentMockDatanode {
1905 async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
1906 let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
1907 loop {
1908 let max = self.max_inflight.load(Ordering::SeqCst);
1909 if now <= max {
1910 break;
1911 }
1912 if self
1913 .max_inflight
1914 .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
1915 .is_ok()
1916 {
1917 break;
1918 }
1919 }
1920
1921 sleep(self.delay).await;
1922 self.inflight.fetch_sub(1, Ordering::SeqCst);
1923 Ok(RegionResponse::new(0))
1924 }
1925
1926 async fn handle_query(
1927 &self,
1928 _request: QueryRequest,
1929 ) -> MetaResult<SendableRecordBatchStream> {
1930 unimplemented!()
1931 }
1932 }
1933
1934 #[derive(Clone)]
1935 struct ConcurrentMockNodeManager {
1936 datanodes: Arc<HashMap<u64, DatanodeRef>>,
1937 }
1938
1939 #[async_trait]
1940 impl DatanodeManager for ConcurrentMockNodeManager {
1941 async fn datanode(&self, node: &Peer) -> DatanodeRef {
1942 self.datanodes
1943 .get(&node.id)
1944 .expect("datanode not found")
1945 .clone()
1946 }
1947 }
1948
1949 struct NoopFlownode;
1950
1951 #[async_trait]
1952 impl Flownode for NoopFlownode {
1953 async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
1954 unimplemented!()
1955 }
1956
1957 async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
1958 unimplemented!()
1959 }
1960
1961 async fn handle_mark_window_dirty(
1962 &self,
1963 _req: DirtyWindowRequests,
1964 ) -> MetaResult<FlowResponse> {
1965 unimplemented!()
1966 }
1967 }
1968
1969 #[async_trait]
1970 impl FlownodeManager for ConcurrentMockNodeManager {
1971 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1972 Arc::new(NoopFlownode)
1973 }
1974 }
1975
1976 #[async_trait]
1977 impl PhysicalFlushNodeRequester for ConcurrentMockNodeManager {
1978 async fn handle(
1979 &self,
1980 peer: &Peer,
1981 request: RegionRequest,
1982 ) -> error::Result<RegionResponse> {
1983 let datanode = self.datanode(peer).await;
1984 datanode
1985 .handle(request)
1986 .await
1987 .context(error::CommonMetaSnafu)
1988 }
1989 }
1990
1991 #[test]
1992 fn test_remove_worker_if_same_channel_removes_matching_entry() {
1993 let workers = DashMap::new();
1994 let key = BatchKey {
1995 catalog: "greptime".to_string(),
1996 schema: "public".to_string(),
1997 physical_table: "phy".to_string(),
1998 };
1999
2000 let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
2001 workers.insert(key.clone(), PendingWorker { tx: tx.clone() });
2002
2003 assert!(remove_worker_if_same_channel(&workers, &key, &tx));
2004 assert!(!workers.contains_key(&key));
2005 }
2006
2007 #[test]
2008 fn test_remove_worker_if_same_channel_keeps_newer_entry() {
2009 let workers = DashMap::new();
2010 let key = BatchKey {
2011 catalog: "greptime".to_string(),
2012 schema: "public".to_string(),
2013 physical_table: "phy".to_string(),
2014 };
2015
2016 let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
2017 let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
2018 workers.insert(
2019 key.clone(),
2020 PendingWorker {
2021 tx: fresh_tx.clone(),
2022 },
2023 );
2024
2025 assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
2026 assert!(workers.contains_key(&key));
2027 assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
2028 }
2029
2030 #[test]
2031 fn test_worker_idle_timeout_close_decision() {
2032 assert!(should_close_worker_on_idle_timeout(0, 0));
2033 assert!(!should_close_worker_on_idle_timeout(1, 0));
2034 assert!(!should_close_worker_on_idle_timeout(0, 1));
2035 }
2036
2037 #[tokio::test]
2038 async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
2039 let inflight = Arc::new(AtomicUsize::new(0));
2040 let max_inflight = Arc::new(AtomicUsize::new(0));
2041 let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2042 delay: Duration::from_millis(100),
2043 inflight: inflight.clone(),
2044 max_inflight: max_inflight.clone(),
2045 });
2046 let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2047 delay: Duration::from_millis(100),
2048 inflight,
2049 max_inflight: max_inflight.clone(),
2050 });
2051
2052 let mut datanodes = HashMap::new();
2053 datanodes.insert(1, datanode1);
2054 datanodes.insert(2, datanode2);
2055 let node_manager = Arc::new(ConcurrentMockNodeManager {
2056 datanodes: Arc::new(datanodes),
2057 });
2058
2059 let writes = vec![
2060 FlushRegionWrite {
2061 datanode: Peer {
2062 id: 1,
2063 addr: "node1".to_string(),
2064 },
2065 request: RegionRequest::default(),
2066 },
2067 FlushRegionWrite {
2068 datanode: Peer {
2069 id: 2,
2070 addr: "node2".to_string(),
2071 },
2072 request: RegionRequest::default(),
2073 },
2074 ];
2075
2076 flush_region_writes_concurrently(node_manager.as_ref(), writes)
2077 .await
2078 .unwrap();
2079 assert!(max_inflight.load(Ordering::SeqCst) >= 2);
2080 }
2081
2082 #[test]
2083 fn test_should_dispatch_concurrently_by_region_count() {
2084 assert!(!should_dispatch_concurrently(0));
2085 assert!(!should_dispatch_concurrently(1));
2086 assert!(should_dispatch_concurrently(2));
2087 }
2088
2089 #[test]
2090 fn test_strip_partition_columns_from_batch_removes_partition_tags() {
2091 let batch = RecordBatch::try_new(
2092 Arc::new(ArrowSchema::new(vec![
2093 Field::new("__primary_key", ArrowDataType::Binary, false),
2094 Field::new(
2095 "greptime_timestamp",
2096 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2097 false,
2098 ),
2099 Field::new("greptime_value", ArrowDataType::Float64, true),
2100 Field::new("host", ArrowDataType::Utf8, true),
2101 ])),
2102 vec![
2103 Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2104 Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2105 Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2106 Arc::new(StringArray::from(vec!["node-1"])),
2107 ],
2108 )
2109 .unwrap();
2110
2111 let stripped = strip_partition_columns_from_batch(batch).unwrap();
2112
2113 assert_eq!(3, stripped.num_columns());
2114 assert_eq!("__primary_key", stripped.schema().field(0).name());
2115 assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2116 assert_eq!("greptime_value", stripped.schema().field(2).name());
2117 }
2118
2119 #[test]
2120 fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
2121 let batch = RecordBatch::try_new(
2122 Arc::new(ArrowSchema::new(vec![
2123 Field::new("__primary_key", ArrowDataType::Binary, false),
2124 Field::new(
2125 "greptime_timestamp",
2126 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2127 false,
2128 ),
2129 Field::new("greptime_value", ArrowDataType::Float64, true),
2130 Field::new("host", ArrowDataType::Utf8, true),
2131 ])),
2132 vec![
2133 Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2134 Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2135 Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2136 Arc::new(StringArray::from(vec!["node-1"])),
2137 ],
2138 )
2139 .unwrap();
2140
2141 let stripped = strip_partition_columns_from_batch(batch).unwrap();
2142
2143 assert_eq!(3, stripped.num_columns());
2144 assert_eq!("__primary_key", stripped.schema().field(0).name());
2145 assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2146 assert_eq!("greptime_value", stripped.schema().field(2).name());
2147 }
2148
2149 #[test]
2150 fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
2151 let schema = Arc::new(ArrowSchema::new(vec![
2152 Field::new(
2153 "greptime_timestamp",
2154 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2155 false,
2156 ),
2157 Field::new("greptime_value", ArrowDataType::Float64, true),
2158 Field::new("host", ArrowDataType::Utf8, true),
2159 Field::new("region", ArrowDataType::Utf8, true),
2160 ]));
2161 let name_to_ids =
2162 HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2163 let partition_columns = HashSet::from(["host"]);
2164
2165 let (tag_columns, non_tag_indices) =
2166 columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2167
2168 assert_eq!(2, tag_columns.len());
2169 assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
2170 }
2171
2172 #[test]
2173 fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
2174 let schema = Arc::new(ArrowSchema::new(vec![
2175 Field::new("host", ArrowDataType::Utf8, true),
2176 Field::new("greptime_value", ArrowDataType::Float64, true),
2177 Field::new(
2178 "greptime_timestamp",
2179 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2180 false,
2181 ),
2182 Field::new("region", ArrowDataType::Utf8, true),
2183 ]));
2184 let name_to_ids =
2185 HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2186 let partition_columns = HashSet::from(["host", "region"]);
2187
2188 let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
2189 columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2190
2191 assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
2192 }
2193
2194 #[test]
2195 fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
2196 let schema = Arc::new(ArrowSchema::new(vec![
2197 Field::new(
2198 "greptime_timestamp",
2199 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2200 false,
2201 ),
2202 Field::new("greptime_value", ArrowDataType::Float64, true),
2203 Field::new("host", ArrowDataType::Utf8, true),
2204 Field::new("invalid", ArrowDataType::Boolean, true),
2205 ]));
2206 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2207 let partition_columns = HashSet::from(["host"]);
2208
2209 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2210
2211 assert!(matches!(
2212 result,
2213 Err(Error::InvalidPromRemoteRequest { .. })
2214 ));
2215 }
2216
2217 #[test]
2218 fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
2219 let schema = Arc::new(ArrowSchema::new(vec![
2220 Field::new("greptime_timestamp", ArrowDataType::Int64, false),
2221 Field::new("greptime_value", ArrowDataType::Float64, true),
2222 Field::new("host", ArrowDataType::Utf8, true),
2223 ]));
2224 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2225 let partition_columns = HashSet::from(["host"]);
2226
2227 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2228
2229 assert!(matches!(
2230 result,
2231 Err(Error::InvalidPromRemoteRequest { .. })
2232 ));
2233 }
2234
2235 #[test]
2236 fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
2237 let schema = Arc::new(ArrowSchema::new(vec![
2238 Field::new(
2239 "ts1",
2240 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2241 false,
2242 ),
2243 Field::new(
2244 "ts2",
2245 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2246 false,
2247 ),
2248 Field::new("greptime_value", ArrowDataType::Float64, true),
2249 Field::new("host", ArrowDataType::Utf8, true),
2250 ]));
2251 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2252 let partition_columns = HashSet::from(["host"]);
2253
2254 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2255
2256 assert!(matches!(
2257 result,
2258 Err(Error::InvalidPromRemoteRequest { .. })
2259 ));
2260 }
2261
2262 #[test]
2263 fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
2264 let schema = Arc::new(ArrowSchema::new(vec![
2265 Field::new(
2266 "greptime_timestamp",
2267 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2268 false,
2269 ),
2270 Field::new("value1", ArrowDataType::Float64, true),
2271 Field::new("value2", ArrowDataType::Float64, true),
2272 Field::new("host", ArrowDataType::Utf8, true),
2273 ]));
2274 let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2275 let partition_columns = HashSet::from(["host"]);
2276
2277 let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2278
2279 assert!(matches!(
2280 result,
2281 Err(Error::InvalidPromRemoteRequest { .. })
2282 ));
2283 }
2284
2285 #[test]
2286 fn test_modify_batch_sparse_with_taxonomy_per_batch() {
2287 use arrow::array::BinaryArray;
2288 use metric_engine::batch_modifier::modify_batch_sparse;
2289
2290 let schema1 = Arc::new(ArrowSchema::new(vec![
2291 Field::new(
2292 "greptime_timestamp",
2293 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2294 false,
2295 ),
2296 Field::new("greptime_value", ArrowDataType::Float64, true),
2297 Field::new("tag1", ArrowDataType::Utf8, true),
2298 ]));
2299
2300 let schema2 = Arc::new(ArrowSchema::new(vec![
2301 Field::new(
2302 "greptime_timestamp",
2303 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2304 false,
2305 ),
2306 Field::new("greptime_value", ArrowDataType::Float64, true),
2307 Field::new("tag1", ArrowDataType::Utf8, true),
2308 Field::new("tag2", ArrowDataType::Utf8, true),
2309 ]));
2310 let batch2 = RecordBatch::try_new(
2311 schema2.clone(),
2312 vec![
2313 Arc::new(TimestampMillisecondArray::from(vec![2000])),
2314 Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2315 Arc::new(StringArray::from(vec!["v1"])),
2316 Arc::new(StringArray::from(vec!["v2"])),
2317 ],
2318 )
2319 .unwrap();
2320
2321 let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2322 let partition_columns = HashSet::new();
2323
2324 let batch3 = RecordBatch::try_new(
2326 schema1.clone(),
2327 vec![
2328 Arc::new(TimestampMillisecondArray::from(vec![2000])),
2329 Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2330 Arc::new(StringArray::from(vec!["v1"])),
2331 ],
2332 )
2333 .unwrap();
2334
2335 let (tag_columns2, indices2) =
2338 columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2339 let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();
2340
2341 let (tag_columns3, indices3) =
2342 columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2343 let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();
2344
2345 let pk2 = modified2
2346 .column(0)
2347 .as_any()
2348 .downcast_ref::<BinaryArray>()
2349 .unwrap();
2350 let pk3 = modified3
2351 .column(0)
2352 .as_any()
2353 .downcast_ref::<BinaryArray>()
2354 .unwrap();
2355
2356 assert_ne!(
2358 pk2.value(0),
2359 pk3.value(0),
2360 "PK should be different because batch2 has tag2!"
2361 );
2362 }
2363
2364 #[test]
2365 fn test_transform_logical_batches_to_physical_success() {
2366 let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2367
2368 let table_batches = vec![TableBatch {
2369 table_name: "t1".to_string(),
2370 table_id: 1,
2371 batches: vec![batch],
2372 row_count: 1,
2373 }];
2374
2375 let name_to_ids = HashMap::from([("tag1".to_string(), 1)]);
2376 let partition_columns = HashSet::new();
2377 let modified =
2378 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2379 .unwrap();
2380
2381 assert_eq!(1, modified.len());
2382 assert_eq!(3, modified[0].num_columns());
2383 assert_eq!("__primary_key", modified[0].schema().field(0).name());
2384 assert_eq!("greptime_timestamp", modified[0].schema().field(1).name());
2385 assert_eq!("greptime_value", modified[0].schema().field(2).name());
2386 }
2387
2388 #[test]
2389 fn test_transform_logical_batches_to_physical_taxonomy_failure() {
2390 let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2391
2392 let table_batches = vec![TableBatch {
2393 table_name: "t1".to_string(),
2394 table_id: 1,
2395 batches: vec![batch],
2396 row_count: 1,
2397 }];
2398
2399 let name_to_ids = HashMap::new();
2401 let partition_columns = HashSet::new();
2402 let err =
2403 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2404 .unwrap_err();
2405
2406 assert!(
2407 err.to_string()
2408 .contains("not found in physical table column IDs")
2409 );
2410 }
2411
2412 #[test]
2413 fn test_transform_logical_batches_to_physical_multiple_batches() {
2414 let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2415 let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2416
2417 let table_batches = vec![
2418 TableBatch {
2419 table_name: "t1".to_string(),
2420 table_id: 1,
2421 batches: vec![batch1],
2422 row_count: 1,
2423 },
2424 TableBatch {
2425 table_name: "t2".to_string(),
2426 table_id: 2,
2427 batches: vec![batch2],
2428 row_count: 1,
2429 },
2430 ];
2431
2432 let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2433 let partition_columns = HashSet::new();
2434 let modified =
2435 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2436 .unwrap();
2437
2438 assert_eq!(2, modified.len());
2439 }
2440
2441 #[test]
2442 fn test_transform_logical_batches_to_physical_mixed_success_failure() {
2443 let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2444 let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2445
2446 let table_batches = vec![
2447 TableBatch {
2448 table_name: "t1".to_string(),
2449 table_id: 1,
2450 batches: vec![batch1],
2451 row_count: 1,
2452 },
2453 TableBatch {
2454 table_name: "t2".to_string(),
2455 table_id: 2,
2456 batches: vec![batch2],
2457 row_count: 1,
2458 },
2459 ];
2460
2461 let name_to_ids = HashMap::from([("tag2".to_string(), 2)]);
2463 let partition_columns = HashSet::new();
2464 let err =
2465 transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2466 .unwrap_err();
2467
2468 assert!(err.to_string().contains("tag1"));
2469 }
2470
2471 #[tokio::test]
2472 async fn test_flush_batch_physical_uses_mockable_trait_dependencies() {
2473 let table_batches = vec![TableBatch {
2474 table_name: "t1".to_string(),
2475 table_id: 11,
2476 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2477 row_count: 1,
2478 }];
2479 let partition_calls = Arc::new(AtomicUsize::new(0));
2480 let leader_calls = Arc::new(AtomicUsize::new(0));
2481 let node = MockFlushNodeRequester::default();
2482 let ctx = session::context::QueryContext::arc();
2483
2484 flush_batch_physical(
2485 &table_batches,
2486 "phy",
2487 &ctx,
2488 &MockFlushPartitionProvider {
2489 partition_rule_calls: partition_calls.clone(),
2490 region_leader_calls: leader_calls.clone(),
2491 },
2492 &node,
2493 &MockFlushCatalogProvider {
2494 table: Some(mock_physical_table_metadata(1024)),
2495 },
2496 )
2497 .await
2498 .unwrap();
2499
2500 assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2501 assert_eq!(1, leader_calls.load(Ordering::SeqCst));
2502 assert_eq!(1, node.writes.load(Ordering::SeqCst));
2503 }
2504
2505 #[derive(Default)]
2506 struct AffectedRowsFlushNodeRequester {
2507 affected_rows: usize,
2508 }
2509
2510 #[async_trait]
2511 impl PhysicalFlushNodeRequester for AffectedRowsFlushNodeRequester {
2512 async fn handle(
2513 &self,
2514 _peer: &Peer,
2515 _request: RegionRequest,
2516 ) -> error::Result<RegionResponse> {
2517 Ok(RegionResponse::new(self.affected_rows))
2518 }
2519 }
2520
2521 #[tokio::test]
2522 async fn test_flush_batch_physical_returns_actual_affected_rows() {
2523 let table_batches = vec![TableBatch {
2524 table_name: "t1".to_string(),
2525 table_id: 11,
2526 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2527 row_count: 1,
2528 }];
2529 let ctx = session::context::QueryContext::arc();
2530
2531 let affected_rows = flush_batch_physical(
2532 &table_batches,
2533 "phy",
2534 &ctx,
2535 &MockFlushPartitionProvider {
2536 partition_rule_calls: Arc::new(AtomicUsize::new(0)),
2537 region_leader_calls: Arc::new(AtomicUsize::new(0)),
2538 },
2539 &AffectedRowsFlushNodeRequester { affected_rows: 7 },
2540 &MockFlushCatalogProvider {
2541 table: Some(mock_physical_table_metadata(1024)),
2542 },
2543 )
2544 .await
2545 .unwrap();
2546
2547 assert_eq!(7, affected_rows);
2548 }
2549
2550 #[tokio::test]
2551 async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() {
2552 let table_batches = vec![TableBatch {
2553 table_name: "t1".to_string(),
2554 table_id: 11,
2555 batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2556 row_count: 1,
2557 }];
2558 let partition_calls = Arc::new(AtomicUsize::new(0));
2559 let leader_calls = Arc::new(AtomicUsize::new(0));
2560 let node = MockFlushNodeRequester::default();
2561 let ctx = session::context::QueryContext::arc();
2562
2563 let err = flush_batch_physical(
2564 &table_batches,
2565 "missing_phy",
2566 &ctx,
2567 &MockFlushPartitionProvider {
2568 partition_rule_calls: partition_calls.clone(),
2569 region_leader_calls: leader_calls.clone(),
2570 },
2571 &node,
2572 &MockFlushCatalogProvider { table: None },
2573 )
2574 .await
2575 .unwrap_err();
2576
2577 assert!(
2578 err.to_string()
2579 .contains("Physical table 'missing_phy' not found")
2580 );
2581 assert_eq!(0, partition_calls.load(Ordering::SeqCst));
2582 assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2583 assert_eq!(0, node.writes.load(Ordering::SeqCst));
2584 }
2585
2586 #[tokio::test]
2587 async fn test_flush_batch_physical_aborts_immediately_on_transform_error() {
2588 let table_batches = vec![
2589 TableBatch {
2590 table_name: "broken".to_string(),
2591 table_id: 11,
2592 batches: vec![mock_tag_batch("unknown_tag", "host-1", 1000, 1.0)],
2593 row_count: 1,
2594 },
2595 TableBatch {
2596 table_name: "healthy".to_string(),
2597 table_id: 12,
2598 batches: vec![mock_tag_batch("tag1", "host-2", 2000, 2.0)],
2599 row_count: 1,
2600 },
2601 ];
2602 let partition_calls = Arc::new(AtomicUsize::new(0));
2603 let leader_calls = Arc::new(AtomicUsize::new(0));
2604 let node = MockFlushNodeRequester::default();
2605 let ctx = session::context::QueryContext::arc();
2606
2607 let err = flush_batch_physical(
2608 &table_batches,
2609 "phy",
2610 &ctx,
2611 &MockFlushPartitionProvider {
2612 partition_rule_calls: partition_calls.clone(),
2613 region_leader_calls: leader_calls.clone(),
2614 },
2615 &node,
2616 &MockFlushCatalogProvider {
2617 table: Some(mock_physical_table_metadata(1024)),
2618 },
2619 )
2620 .await
2621 .unwrap_err();
2622
2623 assert!(err.to_string().contains("unknown_tag"));
2624 assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2625 assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2626 assert_eq!(0, node.writes.load(Ordering::SeqCst));
2627 }
2628
2629 #[test]
2630 fn test_plan_region_batches_splits_and_strips_partition_columns() {
2631 let combined_batch = RecordBatch::try_new(
2632 Arc::new(ArrowSchema::new(vec![
2633 Field::new("__primary_key", ArrowDataType::Binary, false),
2634 Field::new(
2635 "greptime_timestamp",
2636 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2637 false,
2638 ),
2639 Field::new("greptime_value", ArrowDataType::Float64, true),
2640 Field::new("host", ArrowDataType::Utf8, true),
2641 ])),
2642 vec![
2643 Arc::new(BinaryArray::from(vec![b"k1".as_slice(), b"k2".as_slice()])),
2644 Arc::new(TimestampMillisecondArray::from(vec![1000_i64, 2000_i64])),
2645 Arc::new(arrow::array::Float64Array::from(vec![1.0_f64, 2.0_f64])),
2646 Arc::new(StringArray::from(vec!["node-1", "node-2"])),
2647 ],
2648 )
2649 .unwrap();
2650 let mut planned_batches = plan_region_batches(
2651 combined_batch,
2652 1024,
2653 &TwoRegionPartitionRule {
2654 partition_columns: vec!["host".to_string()],
2655 },
2656 &["host".to_string()],
2657 )
2658 .unwrap();
2659 planned_batches.sort_by_key(|planned| planned.region_id.region_number());
2660
2661 assert_eq!(2, planned_batches.len());
2662 assert_eq!(RegionId::new(1024, 1), planned_batches[0].region_id);
2663 assert_eq!(1, planned_batches[0].num_rows());
2664 assert_eq!(3, planned_batches[0].batch.num_columns());
2665 assert_eq!(RegionId::new(1024, 2), planned_batches[1].region_id);
2666 assert_eq!(1, planned_batches[1].num_rows());
2667 assert_eq!(3, planned_batches[1].batch.num_columns());
2668 }
2669
2670 #[test]
2671 fn test_encode_region_write_requests_builds_bulk_insert_requests() {
2672 let planned_batch = PlannedRegionBatch {
2673 region_id: RegionId::new(1024, 1),
2674 batch: RecordBatch::try_new(
2675 Arc::new(ArrowSchema::new(vec![
2676 Field::new("__primary_key", ArrowDataType::Binary, false),
2677 Field::new(
2678 "greptime_timestamp",
2679 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2680 false,
2681 ),
2682 Field::new("greptime_value", ArrowDataType::Float64, true),
2683 ])),
2684 vec![
2685 Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2686 Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2687 Arc::new(arrow::array::Float64Array::from(vec![1.0_f64])),
2688 ],
2689 )
2690 .unwrap(),
2691 };
2692 let resolved_batch = ResolvedRegionBatch {
2693 planned: planned_batch,
2694 datanode: Peer {
2695 id: 1,
2696 addr: "node-1".to_string(),
2697 },
2698 };
2699 let writes = encode_region_write_requests(vec![resolved_batch]).unwrap();
2700
2701 assert_eq!(1, writes.len());
2702 assert_eq!(1, writes[0].datanode.id);
2703 let Some(region_request::Body::BulkInsert(request)) = &writes[0].request.body else {
2704 panic!("expected bulk insert request");
2705 };
2706 assert_eq!(RegionId::new(1024, 1).as_u64(), request.region_id);
2707 }
2708}