1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::Peer;
20use api::v1::region::{
21 BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
22};
23use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
24use arrow::compute::{concat_batches, filter_record_batch};
25use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use bytes::Bytes;
29use catalog::CatalogManagerRef;
30use common_grpc::flight::{FlightEncoder, FlightMessage};
31use common_meta::node_manager::NodeManagerRef;
32use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, error, warn};
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
38use partition::manager::PartitionRuleManagerRef;
39use session::context::QueryContextRef;
40use smallvec::SmallVec;
41use snafu::{OptionExt, ensure};
42use store_api::storage::{RegionId, TableId};
43use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};
44
45use crate::error;
46use crate::error::{Error, Result};
47use crate::metrics::{
48 FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
49 PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
50 PENDING_WORKERS,
51};
52use crate::prom_row_builder::{
53 build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
54 rows_to_aligned_record_batch,
55};
56
/// Query-context extension key carrying an override for the metric engine
/// physical table name.
const PHYSICAL_TABLE_KEY: &str = "physical_table";
/// Env var toggling whether `submit` waits for the flush result (parsed as
/// bool; defaults to true when unset or unparsable).
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";
/// A worker's idle timeout is `flush_interval * this multiplier`.
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;
/// Number of leading columns kept when projecting a physical batch down to
/// its essential columns (see `strip_partition_columns_from_batch`).
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;
62
/// Hook for the schema DDL (CREATE/ALTER) that must happen before pending
/// rows can be aligned to their target table schemas.
#[async_trait]
pub trait PendingRowsSchemaAlterer: Send + Sync {
    /// Creates every `(table name, column schemas)` entry in `tables` that
    /// does not already exist under `catalog`.`schema`.
    async fn create_tables_if_missing_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[ColumnSchema])],
        with_metric_engine: bool,
        ctx: QueryContextRef,
    ) -> Result<()>;

    /// Adds the listed missing tag columns (`(table name, column names)`)
    /// to each existing table.
    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> Result<()>;
}

/// Shared handle to a [`PendingRowsSchemaAlterer`].
pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
88
/// Routing key for a pending-rows worker: one worker exists per
/// (catalog, schema, physical table) triple.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    catalog: String,
    schema: String,
    physical_table: String,
}
95
/// Accumulated, schema-aligned record batches for one logical table.
#[derive(Debug)]
struct TableBatch {
    table_name: String,
    table_id: TableId,
    // Aligned batches awaiting flush.
    batches: Vec<RecordBatch>,
    // Total rows across `batches`.
    row_count: usize,
}
103
/// Outcome of resolving all tables referenced by one submission.
struct TableResolutionPlan {
    // table name -> (arrow schema, table id) for every resolved table.
    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
    // Tables that do not exist yet, with the column schemas to create them.
    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
    // Tables missing tag columns, with the column names to add.
    tables_to_alter: Vec<(String, Vec<String>)>,
}
114
/// A worker's in-memory accumulation of rows not yet flushed.
struct PendingBatch {
    tables: HashMap<String, TableBatch>,
    // When the first rows of the current batch arrived; `None` while empty.
    created_at: Option<Instant>,
    total_row_count: usize,
    // Context of the first submission in the batch; reused for the flush.
    ctx: Option<QueryContextRef>,
    // Submitters waiting (in sync mode) for this batch's flush result.
    waiters: Vec<FlushWaiter>,
}
122
/// A submitter waiting on the flush outcome. Dropping `_permit` releases
/// the submitter's slot in the inflight semaphore.
struct FlushWaiter {
    response_tx: oneshot::Sender<Result<()>>,
    _permit: OwnedSemaphorePermit,
}
127
/// A drained batch handed to the flush path.
struct FlushBatch {
    table_batches: Vec<TableBatch>,
    total_row_count: usize,
    ctx: QueryContextRef,
    waiters: Vec<FlushWaiter>,
}
134
/// Handle to a per-key worker task; cloning shares the command channel.
#[derive(Clone)]
struct PendingWorker {
    tx: mpsc::Sender<WorkerCommand>,
}
139
/// Commands accepted by a pending-rows worker.
enum WorkerCommand {
    /// Append schema-aligned batches to the worker's pending buffer.
    Submit {
        // (table name, table id, aligned record batch) per table.
        table_batches: Vec<(String, u32, RecordBatch)>,
        total_rows: usize,
        ctx: QueryContextRef,
        // Receives the flush result for this submission.
        response_tx: oneshot::Sender<Result<()>>,
        // Inflight-semaphore permit, released when the flush completes.
        _permit: OwnedSemaphorePermit,
    },
}
149
150fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
153 let physical_table = ctx
154 .extension(PHYSICAL_TABLE_KEY)
155 .unwrap_or(GREPTIME_PHYSICAL_TABLE)
156 .to_string();
157 BatchKey {
158 catalog: ctx.current_catalog().to_string(),
159 schema: ctx.current_schema(),
160 physical_table,
161 }
162}
163
/// Batches row inserts per (catalog, schema, physical table) and flushes
/// them to datanode regions as bulk inserts.
pub struct PendingRowsBatcher {
    // Live workers keyed by batch key.
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    flush_interval: Duration,
    // Row threshold that triggers an early (size-based) flush.
    max_batch_rows: usize,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    // Caps the number of concurrently running flush tasks.
    flush_semaphore: Arc<Semaphore>,
    // Caps the number of in-flight `submit` calls.
    inflight_semaphore: Arc<Semaphore>,
    worker_channel_capacity: usize,
    prom_store_with_metric_engine: bool,
    schema_alterer: PendingRowsSchemaAltererRef,
    // When true, `submit` waits for the flush result before returning.
    pending_rows_batch_sync: bool,
    // Broadcast telling all workers to flush and exit when dropped.
    shutdown: broadcast::Sender<()>,
}
180
181impl PendingRowsBatcher {
    /// Builds a batcher wrapped in `Arc`.
    ///
    /// Returns `None` when any tuning parameter is zero, which callers
    /// treat as "pending-rows batching disabled".
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        catalog_manager: CatalogManagerRef,
        prom_store_with_metric_engine: bool,
        schema_alterer: PendingRowsSchemaAltererRef,
        flush_interval: Duration,
        max_batch_rows: usize,
        max_concurrent_flushes: usize,
        worker_channel_capacity: usize,
        max_inflight_requests: usize,
    ) -> Option<Arc<Self>> {
        // A zero for any knob would make the batching loop degenerate, so
        // treat it as a request to disable batching.
        if flush_interval.is_zero()
            || max_batch_rows == 0
            || max_concurrent_flushes == 0
            || worker_channel_capacity == 0
            || max_inflight_requests == 0
        {
            return None;
        }

        let (shutdown, _) = broadcast::channel(1);
        // Whether `submit` blocks until the flush completes; defaults to
        // true when the env var is unset or not a valid bool.
        let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
            .ok()
            .as_deref()
            .and_then(|v| v.parse::<bool>().ok())
            .unwrap_or(true);
        let workers = Arc::new(DashMap::new());
        PENDING_WORKERS.set(workers.len() as i64);

        Some(Arc::new(Self {
            workers,
            flush_interval,
            max_batch_rows,
            partition_manager,
            node_manager,
            catalog_manager,
            prom_store_with_metric_engine,
            schema_alterer,
            flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
            inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
            worker_channel_capacity,
            pending_rows_batch_sync,
            shutdown,
        }))
    }
232
233 pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
234 let (table_batches, total_rows) = {
235 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
236 .with_label_values(&["submit_build_and_align"])
237 .start_timer();
238 self.build_and_align_table_batches(requests, &ctx).await?
239 };
240 if total_rows == 0 {
241 return Ok(0);
242 }
243
244 let permit = {
245 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
246 .with_label_values(&["submit_acquire_inflight_permit"])
247 .start_timer();
248 self.inflight_semaphore
249 .clone()
250 .acquire_owned()
251 .await
252 .map_err(|_| error::BatcherChannelClosedSnafu.build())?
253 };
254
255 let (response_tx, response_rx) = oneshot::channel();
256
257 let batch_key = batch_key_from_ctx(&ctx);
258 let mut cmd = Some(WorkerCommand::Submit {
259 table_batches,
260 total_rows,
261 ctx,
262 response_tx,
263 _permit: permit,
264 });
265
266 {
267 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
268 .with_label_values(&["submit_send_to_worker"])
269 .start_timer();
270
271 for _ in 0..2 {
272 let worker = self.get_or_spawn_worker(batch_key.clone());
273 let Some(worker_cmd) = cmd.take() else {
274 break;
275 };
276
277 match worker.tx.send(worker_cmd).await {
278 Ok(()) => break,
279 Err(err) => {
280 cmd = Some(err.0);
281 remove_worker_if_same_channel(
282 self.workers.as_ref(),
283 &batch_key,
284 &worker.tx,
285 );
286 }
287 }
288 }
289
290 if cmd.is_some() {
291 return Err(Error::BatcherChannelClosed);
292 }
293 }
294
295 if self.pending_rows_batch_sync {
296 let result = {
297 let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
298 .with_label_values(&["submit_wait_flush_result"])
299 .start_timer();
300 response_rx
301 .await
302 .map_err(|_| error::BatcherChannelClosedSnafu.build())?
303 };
304 result.map(|()| total_rows as u64)
305 } else {
306 Ok(total_rows as u64)
307 }
308 }
309
    /// Converts row inserts into per-table record batches aligned to each
    /// table's current schema, performing any needed CREATE/ALTER first.
    ///
    /// Returns `(table name, table id, aligned batch)` triples plus the
    /// total row count (0 when every request was empty).
    async fn build_and_align_table_batches(
        &self,
        requests: RowInsertRequests,
        ctx: &QueryContextRef,
    ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
        let catalog = ctx.current_catalog().to_string();
        let schema = ctx.current_schema();

        let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
        if total_rows == 0 {
            return Ok((Vec::new(), 0));
        }

        let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
        // Decide which tables already exist and which need DDL.
        let mut plan = self
            .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
            .await?;

        self.create_missing_tables_and_refresh_schemas(
            &catalog,
            &schema,
            ctx,
            &table_rows,
            &mut plan,
        )
        .await?;

        self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
            .await?;

        let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;

        Ok((aligned_batches, total_rows))
    }
348
349 fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
352 let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
353 let mut total_rows = 0;
354
355 for request in requests.inserts {
356 let Some(rows) = request.rows else {
357 continue;
358 };
359 if rows.rows.is_empty() {
360 continue;
361 }
362
363 total_rows += rows.rows.len();
364 table_rows.push((request.table_name, rows));
365 }
366
367 (table_rows, total_rows)
368 }
369
370 fn collect_unique_table_schemas(
373 table_rows: &[(String, Rows)],
374 ) -> Result<Vec<(&str, &[ColumnSchema])>> {
375 let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
376 let mut seen = HashSet::new();
377
378 for (table_name, rows) in table_rows {
379 if seen.insert(table_name.as_str()) {
380 unique_tables.push((table_name.as_str(), &rows.schema));
381 } else {
382 return error::InvalidPromRemoteRequestSnafu {
384 msg: format!(
385 "Found duplicated table name in RowInsertRequest: {}",
386 table_name
387 ),
388 }
389 .fail();
390 }
391 }
392
393 Ok(unique_tables)
394 }
395
    /// Resolves every unique table against the catalog.
    ///
    /// Existing tables are recorded in `region_schemas` (and queued for
    /// ALTER if the request carries columns the table lacks); unknown
    /// tables are queued for creation with a schema derived from the
    /// request.
    async fn plan_table_resolution(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        unique_tables: &[(&str, &[ColumnSchema])],
    ) -> Result<TableResolutionPlan> {
        let mut plan = TableResolutionPlan {
            region_schemas: HashMap::with_capacity(unique_tables.len()),
            tables_to_create: Vec::new(),
            tables_to_alter: Vec::new(),
        };

        // Resolve all tables concurrently; join_all preserves input order,
        // which the zip below relies on.
        let resolved_tables = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table"])
                .start_timer();
            futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
            let table = table_result?;

            if let Some(table) = table {
                let table_info = table.table_info();
                let table_id = table_info.ident.table_id;
                let region_schema = table_info.meta.schema.arrow_schema().clone();

                // Columns present in the request but absent from the table
                // schema must be added before alignment.
                let missing_columns = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_identify_missing_columns"])
                        .start_timer();
                    identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
                };
                if !missing_columns.is_empty() {
                    plan.tables_to_alter
                        .push(((*table_name).to_string(), missing_columns));
                }
                plan.region_schemas
                    .insert((*table_name).to_string(), (region_schema, table_id));
            } else {
                // Table does not exist: derive a CREATE schema from the
                // request's column schemas.
                let request_schema = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_build_create_table_schema"])
                        .start_timer();
                    build_prom_create_table_schema_from_proto(rows_schema)?
                };
                plan.tables_to_create
                    .push(((*table_name).to_string(), request_schema));
            }
        }

        Ok(plan)
    }
456
    /// Runs the batched CREATE for every planned table, re-resolves each
    /// created table to record its actual schema and id, then re-checks
    /// the new tables for columns the create may not have covered.
    async fn create_missing_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        table_rows: &[(String, Rows)],
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_create.is_empty() {
            return Ok(());
        }

        let create_refs: Vec<(&str, &[ColumnSchema])> = plan
            .tables_to_create
            .iter()
            .map(|(name, schema)| (name.as_str(), schema.as_slice()))
            .collect();

        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_create_tables"])
                .start_timer();
            self.schema_alterer
                .create_tables_if_missing_batch(
                    catalog,
                    schema,
                    &create_refs,
                    self.prom_store_with_metric_engine,
                    ctx.clone(),
                )
                .await?;
        }

        // Re-resolve each created table; the schema actually installed may
        // differ from the one we requested.
        let created_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_create"])
                .start_timer();
            futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_create.iter().zip(created_table_results)
        {
            // A table we just created must now be resolvable.
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch create attempt: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (region_schema, table_id));
        }

        Self::enqueue_alter_for_new_tables(table_rows, plan)?;

        Ok(())
    }
523
    /// For tables created in this submission, compares the request schema
    /// against the freshly resolved table schema and queues any
    /// still-missing tag columns for the ALTER phase.
    ///
    /// NOTE(review): this presumably covers the case where the batched
    /// create raced with another creator that installed a narrower
    /// schema — confirm against `create_tables_if_missing_batch` semantics.
    fn enqueue_alter_for_new_tables(
        table_rows: &[(String, Rows)],
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        let created_tables: HashSet<&str> = plan
            .tables_to_create
            .iter()
            .map(|(table_name, _)| table_name.as_str())
            .collect();

        for (table_name, rows) in table_rows {
            // Only re-check tables created in this submission.
            if !created_tables.contains(table_name.as_str()) {
                continue;
            }

            let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
                continue;
            };

            let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
            // Skip when nothing is missing or the table was already queued
            // for ALTER during planning.
            if missing_columns.is_empty()
                || plan
                    .tables_to_alter
                    .iter()
                    .any(|(existing_name, _)| existing_name == table_name)
            {
                continue;
            }

            plan.tables_to_alter
                .push((table_name.clone(), missing_columns));
        }

        Ok(())
    }
561
    /// Runs the batched ALTER adding missing tag columns, then re-resolves
    /// each altered table so `region_schemas` reflects the new schema.
    async fn alter_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_alter.is_empty() {
            return Ok(());
        }

        let alter_refs: Vec<(&str, &[String])> = plan
            .tables_to_alter
            .iter()
            .map(|(name, cols)| (name.as_str(), cols.as_slice()))
            .collect();
        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_add_missing_columns"])
                .start_timer();
            self.schema_alterer
                .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
                .await?;
        }

        // Re-resolve every altered table concurrently; order matches
        // `tables_to_alter` so the zip below pairs them correctly.
        let altered_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_schema_alter"])
                .start_timer();
            futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_alter.iter().zip(altered_table_results)
        {
            // A table we just altered must still be resolvable.
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch schema alter: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (refreshed_region_schema, table_id));
        }

        Ok(())
    }
618
    /// Converts each table's rows into a `RecordBatch` whose columns are
    /// aligned to the table's resolved region schema.
    fn build_aligned_batches(
        table_rows: &[(String, Rows)],
        region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
    ) -> Result<Vec<(String, u32, RecordBatch)>> {
        let mut aligned_batches = Vec::with_capacity(table_rows.len());
        for (table_name, rows) in table_rows {
            // Every table must have been resolved by the planning phase.
            let (region_schema, table_id) =
                region_schemas.get(table_name).cloned().with_context(|| {
                    error::UnexpectedResultSnafu {
                        reason: format!("Region schema not resolved for table: {}", table_name),
                    }
                })?;

            let record_batch = {
                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                    .with_label_values(&["align_rows_to_record_batch"])
                    .start_timer();
                rows_to_aligned_record_batch(rows, region_schema.as_ref())?
            };
            aligned_batches.push((table_name.clone(), table_id, record_batch));
        }

        Ok(aligned_batches)
    }
645
    /// Returns a live worker for `key`, spawning one when none exists or
    /// the registered worker's channel has closed.
    fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
        // Fast path: shared read access only.
        if let Some(worker) = self.workers.get(&key)
            && !worker.tx.is_closed()
        {
            return worker.clone();
        }

        // Slow path: hold the entry (write) lock so exactly one caller
        // replaces a dead worker or installs a new one.
        let entry = self.workers.entry(key.clone());
        match entry {
            Entry::Occupied(mut worker) => {
                if worker.get().tx.is_closed() {
                    let new_worker = self.spawn_worker(key);
                    worker.insert(new_worker.clone());
                    PENDING_WORKERS.set(self.workers.len() as i64);
                    new_worker
                } else {
                    // Another caller installed a live worker in between.
                    worker.get().clone()
                }
            }
            Entry::Vacant(vacant) => {
                let worker = self.spawn_worker(key);

                vacant.insert(worker.clone());
                PENDING_WORKERS.set(self.workers.len() as i64);
                worker
            }
        }
    }
674
    /// Creates the command channel and spawns the background task that
    /// drives a new worker for `key`.
    fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
        let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
        let worker = PendingWorker { tx: tx.clone() };
        // Idle timeout is a multiple of the flush interval; fall back to
        // the interval itself if the multiplication overflows.
        let worker_idle_timeout = self
            .flush_interval
            .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
            .unwrap_or(self.flush_interval);

        start_worker(
            key,
            worker.tx.clone(),
            self.workers.clone(),
            rx,
            self.shutdown.clone(),
            self.partition_manager.clone(),
            self.node_manager.clone(),
            self.catalog_manager.clone(),
            self.flush_interval,
            worker_idle_timeout,
            self.max_batch_rows,
            self.flush_semaphore.clone(),
        );

        worker
    }
700}
701
impl Drop for PendingRowsBatcher {
    fn drop(&mut self) {
        // Best-effort broadcast: tells every worker to flush what it holds
        // and exit. Sending fails only when no worker is subscribed.
        let _ = self.shutdown.send(());
    }
}
707
708impl PendingBatch {
709 fn new() -> Self {
710 Self {
711 tables: HashMap::new(),
712 created_at: None,
713 total_row_count: 0,
714 ctx: None,
715 waiters: Vec::new(),
716 }
717 }
718}
719
/// Spawns the background task for one batch key.
///
/// The task accumulates submissions into a `PendingBatch` and flushes when
/// the row threshold is reached, the batch is older than the flush
/// interval, the command channel closes, or a shutdown broadcast arrives.
/// On exit it deregisters itself from the worker map.
#[allow(clippy::too_many_arguments)]
fn start_worker(
    key: BatchKey,
    worker_tx: mpsc::Sender<WorkerCommand>,
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    mut rx: mpsc::Receiver<WorkerCommand>,
    shutdown: broadcast::Sender<()>,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    flush_interval: Duration,
    worker_idle_timeout: Duration,
    max_batch_rows: usize,
    flush_semaphore: Arc<Semaphore>,
) {
    tokio::spawn(async move {
        let mut batch = PendingBatch::new();
        let mut interval = tokio::time::interval(flush_interval);
        let mut shutdown_rx = shutdown.subscribe();
        let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
        let idle_timer = tokio::time::sleep_until(idle_deadline);
        tokio::pin!(idle_timer);

        loop {
            tokio::select! {
                cmd = rx.recv() => {
                    match cmd {
                        Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
                            // Any traffic pushes the idle deadline out.
                            idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);

                            // First rows of a fresh batch: start its age
                            // clock and adopt the submitter's context.
                            if batch.total_row_count == 0 {
                                batch.created_at = Some(Instant::now());
                                batch.ctx = Some(ctx);
                                PENDING_BATCHES.inc();
                            }

                            batch.waiters.push(FlushWaiter { response_tx, _permit });

                            // Merge the aligned batches per table name.
                            for (table_name, table_id, record_batch) in table_batches {
                                let entry = batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
                                    table_name,
                                    table_id,
                                    batches: Vec::new(),
                                    row_count: 0,
                                });
                                entry.row_count += record_batch.num_rows();
                                entry.batches.push(record_batch);
                            }

                            batch.total_row_count += total_rows;
                            PENDING_ROWS.add(total_rows as i64);

                            // Size-triggered flush on a separate task.
                            if batch.total_row_count >= max_batch_rows
                                && let Some(flush) = drain_batch(&mut batch) {
                                spawn_flush(
                                    flush,
                                    partition_manager.clone(),
                                    node_manager.clone(),
                                    catalog_manager.clone(),
                                    flush_semaphore.clone(),
                                ).await;
                            }
                        }
                        None => {
                            // Channel closed: flush anything pending
                            // inline, then exit the task.
                            if let Some(flush) = drain_batch(&mut batch) {
                                flush_batch(
                                    flush,
                                    partition_manager.clone(),
                                    node_manager.clone(),
                                    catalog_manager.clone(),
                                ).await;
                            }
                            break;
                        }
                    }
                }
                _ = &mut idle_timer => {
                    // Stay alive while rows are buffered or commands are
                    // still queued in the channel.
                    if !should_close_worker_on_idle_timeout(batch.total_row_count, rx.len()) {
                        idle_timer
                            .as_mut()
                            .reset(tokio::time::Instant::now() + worker_idle_timeout);
                        continue;
                    }

                    debug!(
                        "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
                        key.catalog,
                        key.schema,
                        key.physical_table
                    );
                    break;
                }
                _ = interval.tick() => {
                    // Age-triggered flush once the batch is old enough.
                    if let Some(created_at) = batch.created_at
                        && batch.total_row_count > 0
                        && created_at.elapsed() >= flush_interval
                        && let Some(flush) = drain_batch(&mut batch) {
                        spawn_flush(
                            flush,
                            partition_manager.clone(),
                            node_manager.clone(),
                            catalog_manager.clone(),
                            flush_semaphore.clone(),
                        ).await;
                    }
                }
                _ = shutdown_rx.recv() => {
                    // Batcher dropped: flush inline and exit.
                    if let Some(flush) = drain_batch(&mut batch) {
                        flush_batch(
                            flush,
                            partition_manager.clone(),
                            node_manager.clone(),
                            catalog_manager.clone(),
                        ).await;
                    }
                    break;
                }
            }
        }

        // Deregister, unless a replacement worker already took the slot.
        remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
    });
}
843
/// Removes the worker registered under `key`, but only when it still owns
/// the given channel — this protects a newer replacement worker from being
/// evicted by a stale caller. Returns whether a removal happened.
fn remove_worker_if_same_channel(
    workers: &DashMap<BatchKey, PendingWorker>,
    key: &BatchKey,
    worker_tx: &mpsc::Sender<WorkerCommand>,
) -> bool {
    if let Some(worker) = workers.get(key)
        && worker.tx.same_channel(worker_tx)
    {
        // Release the read guard before `remove`, which needs write access
        // to the same DashMap shard.
        drop(worker);
        workers.remove(key);
        PENDING_WORKERS.set(workers.len() as i64);
        return true;
    }

    false
}
860
/// A worker may shut down on idle timeout only when it holds no buffered
/// rows and no commands are waiting in its channel.
fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    let no_buffered_rows = total_row_count == 0;
    let no_queued_commands = queued_requests == 0;
    no_buffered_rows && no_queued_commands
}
864
/// Takes the accumulated contents of `batch` for flushing, resetting it to
/// empty and updating the pending-rows/batches gauges.
///
/// Returns `None` when nothing is pending. If rows exist but the context
/// was lost (should not happen), waiters are failed via
/// `flush_with_error` and `None` is returned.
fn drain_batch(batch: &mut PendingBatch) -> Option<FlushBatch> {
    if batch.total_row_count == 0 {
        return None;
    }

    let ctx = match batch.ctx.take() {
        Some(ctx) => ctx,
        None => {
            flush_with_error(batch, "Pending batch missing context");
            return None;
        }
    };

    let total_row_count = batch.total_row_count;
    // `mem::take` moves the maps/vecs out, leaving fresh empty ones.
    let table_batches = std::mem::take(&mut batch.tables).into_values().collect();
    let waiters = std::mem::take(&mut batch.waiters);
    batch.total_row_count = 0;
    batch.created_at = None;

    PENDING_ROWS.sub(total_row_count as i64);
    PENDING_BATCHES.dec();

    Some(FlushBatch {
        table_batches,
        total_row_count,
        ctx,
        waiters,
    })
}
894
/// Runs `flush_batch` on a spawned task, bounded by `semaphore`.
///
/// If the semaphore is closed, the batch is not dropped — the flush runs
/// inline on the caller instead.
async fn spawn_flush(
    flush: FlushBatch,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    semaphore: Arc<Semaphore>,
) {
    match semaphore.acquire_owned().await {
        Ok(permit) => {
            tokio::spawn(async move {
                // Hold the permit for the lifetime of the flush task.
                let _permit = permit;
                flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
            });
        }
        Err(err) => {
            warn!(err; "Flush semaphore closed, flushing inline");
            flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
        }
    }
}
915
/// One prepared bulk-insert write targeting a single region.
struct FlushRegionWrite {
    region_id: RegionId,
    // Rows carried by `request`; used for success/failure metrics.
    row_count: usize,
    // Datanode hosting the region.
    datanode: Peer,
    request: RegionRequest,
}
922
/// Per-region outcome of a flush write.
enum FlushWriteResult {
    Success { row_count: usize },
    Failed { row_count: usize, message: String },
}
927
/// Concurrent dispatch only pays off with more than one region write.
fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    !matches!(region_write_count, 0 | 1)
}
931
/// Classifies the columns of a logical table's batch schema.
///
/// Classification is purely by Arrow data type: every `Utf8` column is a
/// tag (its id looked up in the physical table's `name_to_ids`), exactly
/// one `Timestamp(Millisecond)` column is the timestamp, exactly one
/// `Float64` column is the value; any other type is rejected.
///
/// Returns the tag columns sorted by name, plus the "essential" indices
/// laid out as `[timestamp, value, partition columns...]`.
fn columns_taxonomy(
    batch_schema: &Arc<ArrowSchema>,
    table_name: &str,
    name_to_ids: &HashMap<String, u32>,
    partition_columns: &HashSet<&str>,
) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
    let mut tag_columns = Vec::new();
    let mut essential_column_indices =
        SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
    // Slots 0 and 1 are placeholders for the timestamp and value indices,
    // overwritten at the end once both have been found.
    essential_column_indices.push(0);
    essential_column_indices.push(0);

    let mut timestamp_index = None;
    let mut value_index = None;

    for (index, field) in batch_schema.fields().iter().enumerate() {
        match field.data_type() {
            ArrowDataType::Utf8 => {
                let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Column '{}' from logical table '{}' not found in physical table column IDs",
                            field.name(),
                            table_name
                        ),
                    }
                })?;
                tag_columns.push(TagColumnInfo {
                    name: field.name().clone(),
                    index,
                    column_id,
                });

                // Tag columns that are also partition columns stay in the
                // essential set.
                if partition_columns.contains(field.name().as_str()) {
                    essential_column_indices.push(index);
                }
            }
            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
                // `replace` returning `Some` means a second timestamp
                // column was seen.
                ensure!(
                    timestamp_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated timestamp column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            ArrowDataType::Float64 => {
                ensure!(
                    value_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated value column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            datatype => {
                return error::InvalidPromRemoteRequestSnafu {
                    msg: format!(
                        "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
                        table_name
                    ),
                }
                .fail();
            }
        }
    }

    let timestamp_index =
        timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
            msg: format!(
                "Missing essential column '{}' in logical table '{}' batch schema",
                greptime_timestamp(),
                table_name
            ),
        })?;
    let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
        msg: format!(
            "Missing essential column '{}' in logical table '{}' batch schema",
            greptime_value(),
            table_name
        ),
    })?;

    // Deterministic tag ordering by column name.
    tag_columns.sort_by(|a, b| a.name.cmp(&b.name));

    // Fill in the placeholder slots reserved above.
    essential_column_indices[0] = timestamp_index;
    essential_column_indices[1] = value_index;

    Ok((tag_columns, essential_column_indices))
}
1035
1036fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
1037 ensure!(
1038 batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1039 error::InternalSnafu {
1040 err_msg: format!(
1041 "Expected at least {} columns in physical batch, got {}",
1042 PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1043 batch.num_columns()
1044 ),
1045 }
1046 );
1047 let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
1048 batch
1049 .project(&essential_indices)
1050 .map_err(|err| Error::Internal {
1051 err_msg: format!("Failed to project essential columns from RecordBatch: {err}"),
1052 })
1053}
1054
/// Issues the prepared region writes and collects one result per write.
///
/// With zero or one write the request is issued inline; otherwise all
/// writes are dispatched concurrently with `join_all`. Failures never
/// abort the whole flush — each is captured as `FlushWriteResult::Failed`.
async fn flush_region_writes_concurrently(
    node_manager: NodeManagerRef,
    writes: Vec<FlushRegionWrite>,
) -> Vec<FlushWriteResult> {
    if !should_dispatch_concurrently(writes.len()) {
        // Sequential path: zero or one write.
        let mut results = Vec::with_capacity(writes.len());
        for write in writes {
            let datanode = node_manager.datanode(&write.datanode).await;
            // Timer covers only the region write, not the datanode lookup.
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_write_region"])
                .start_timer();
            match datanode.handle(write.request).await {
                Ok(_) => results.push(FlushWriteResult::Success {
                    row_count: write.row_count,
                }),
                Err(err) => results.push(FlushWriteResult::Failed {
                    row_count: write.row_count,
                    message: format!(
                        "Bulk insert flush failed for region {}: {:?}",
                        write.region_id, err
                    ),
                }),
            }
        }
        return results;
    }

    let write_futures = writes.into_iter().map(|write| {
        let node_manager = node_manager.clone();
        async move {
            let datanode = node_manager.datanode(&write.datanode).await;
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_write_region"])
                .start_timer();

            match datanode.handle(write.request).await {
                Ok(_) => FlushWriteResult::Success {
                    row_count: write.row_count,
                },
                Err(err) => FlushWriteResult::Failed {
                    row_count: write.row_count,
                    message: format!(
                        "Bulk insert flush failed for region {}: {:?}",
                        write.region_id, err
                    ),
                },
            }
        }
    });

    // join_all returns results in input order.
    futures::future::join_all(write_futures).await
}
1108
/// Flushes one drained batch to its physical table, records the flush
/// latency, and delivers the first error (if any) to every waiter.
async fn flush_batch(
    flush: FlushBatch,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
) {
    let FlushBatch {
        table_batches,
        total_row_count,
        ctx,
        waiters,
    } = flush;
    let start = Instant::now();
    let mut first_error: Option<String> = None;

    // Same fallback rule as `batch_key_from_ctx`: default physical table
    // when the context carries no override.
    let physical_table_name = ctx
        .extension(PHYSICAL_TABLE_KEY)
        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
        .to_string();
    flush_batch_physical(
        &table_batches,
        total_row_count,
        &physical_table_name,
        &ctx,
        &partition_manager,
        &node_manager,
        &catalog_manager,
        &mut first_error,
    )
    .await;

    let elapsed = start.elapsed().as_secs_f64();
    FLUSH_ELAPSED.observe(elapsed);
    debug!(
        "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
        total_row_count, elapsed
    );

    notify_waiters(waiters, &first_error);
}
1151
/// Flushes the accumulated logical-table batches of one physical (metric
/// engine) table to its datanode regions via bulk insert.
///
/// Pipeline (each stage is timed under `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED`):
/// 1. resolve the physical table from the catalog and its partition rule;
/// 2. per logical batch: classify columns (`columns_taxonomy`) and rewrite
///    the batch into the physical sparse layout (`modify_batch_sparse`);
/// 3. concat all rewritten batches, split the result into per-region masks,
///    and strip partition columns when any exist;
/// 4. encode one Arrow IPC bulk-insert request per non-empty region and
///    dispatch them concurrently, recording success/failure metrics.
///
/// This function never returns an error. Every failure is logged, counted in
/// the flush failure/dropped-rows metrics, and the FIRST failure message is
/// stored into `first_error` so the caller can propagate it to all waiters.
#[allow(clippy::too_many_arguments)]
async fn flush_batch_physical(
    table_batches: &[TableBatch],
    total_row_count: usize,
    physical_table_name: &str,
    ctx: &QueryContextRef,
    partition_manager: &PartitionRuleManagerRef,
    node_manager: &NodeManagerRef,
    catalog_manager: &CatalogManagerRef,
    first_error: &mut Option<String>,
) {
    // Records a failure affecting `$row_count` rows: only the first message
    // is kept in `first_error`, but metrics/logging fire for every call.
    macro_rules! record_failure {
        ($row_count:expr, $msg:expr) => {{
            let msg = $msg;
            if first_error.is_none() {
                *first_error = Some(msg.clone());
            }
            mark_flush_failure($row_count, &msg);
        }};
    }

    // Stage 1a: look up the physical table; a missing table fails the whole
    // flush (all `total_row_count` rows are dropped).
    let physical_table = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_resolve_table"])
            .start_timer();
        match catalog_manager
            .table(
                ctx.current_catalog(),
                &ctx.current_schema(),
                physical_table_name,
                Some(ctx.as_ref()),
            )
            .await
        {
            Ok(Some(table)) => table,
            Ok(None) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Physical table '{}' not found during pending flush",
                        physical_table_name
                    )
                );
                return;
            }
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to resolve physical table '{}' for pending flush: {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };

    // Column-name -> column-id mapping is required by `columns_taxonomy`.
    let physical_table_info = physical_table.table_info();
    let name_to_ids = match physical_table_info.name_to_ids() {
        Some(ids) => ids,
        None => {
            record_failure!(
                total_row_count,
                format!(
                    "Physical table '{}' has no column IDs for pending flush",
                    physical_table_name
                )
            );
            return;
        }
    };

    // Stage 1b: fetch the partition rule used to split rows across regions.
    let partition_rule = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_fetch_partition_rule"])
            .start_timer();
        match partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
        {
            Ok(rule) => rule,
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to fetch partition rule for physical table '{}': {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };
    let partition_columns = partition_rule.0.partition_columns();
    let partition_columns_set: HashSet<&str> =
        partition_columns.iter().map(String::as_str).collect();

    let mut modified_batches: Vec<RecordBatch> = Vec::with_capacity(table_batches.len());
    let mut modified_row_count: usize = 0;

    // Taxonomy/modify timings are accumulated across all batches and
    // observed once after the loop to avoid per-batch observe overhead.
    let mut modify_elapsed = Duration::ZERO;
    let mut columns_taxonomy_elapsed = Duration::ZERO;

    // Stage 2: rewrite each logical batch into the physical sparse layout.
    // A failure skips the REST of that logical table (`continue 'next_table`)
    // and records its whole `row_count` as dropped.
    'next_table: for table_batch in table_batches {
        let table_id = table_batch.table_id;

        for batch in &table_batch.batches {
            let batch_schema = batch.schema();
            let start = Instant::now();
            let (tag_columns, essential_col_indices) = match columns_taxonomy(
                &batch_schema,
                &table_batch.table_name,
                &name_to_ids,
                &partition_columns_set,
            ) {
                Ok(columns) => columns,
                Err(err) => {
                    warn!(
                        "Failed to resolve columns for logical table '{}': {:?}",
                        table_batch.table_name, err
                    );
                    record_failure!(table_batch.row_count, err.to_string());
                    continue 'next_table;
                }
            };

            columns_taxonomy_elapsed += start.elapsed();
            // Nothing classified for this batch: skip it silently.
            // NOTE(review): these rows are neither written nor counted as
            // dropped — presumably a degenerate/empty batch; confirm.
            if tag_columns.is_empty() && essential_col_indices.is_empty() {
                continue;
            }

            let modified = {
                let start = Instant::now();
                match modify_batch_sparse(
                    batch.clone(),
                    table_id,
                    &tag_columns,
                    &essential_col_indices,
                ) {
                    Ok(batch) => {
                        modify_elapsed += start.elapsed();
                        batch
                    }
                    Err(err) => {
                        record_failure!(
                            table_batch.row_count,
                            format!(
                                "Failed to modify batch for logical table '{}': {:?}",
                                table_batch.table_name, err
                            )
                        );
                        continue 'next_table;
                    }
                }
            };

            modified_row_count += modified.num_rows();
            modified_batches.push(modified);
        }
    }

    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_modify_batch"])
        .observe(modify_elapsed.as_secs_f64());
    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_columns_taxonomy"])
        .observe(columns_taxonomy_elapsed.as_secs_f64());

    if modified_batches.is_empty() {
        // Only record a new failure if none of the per-table errors above
        // fired already — avoids double-counting the same dropped rows.
        if first_error.is_none() {
            record_failure!(
                total_row_count,
                format!(
                    "No batches can be transformed for physical table '{}' during pending flush",
                    physical_table_name
                )
            );
        }
        return;
    }

    // Stage 3a: concat all modified batches; they share the first batch's
    // schema after `modify_batch_sparse` normalization.
    let combined_batch = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_concat_all"])
            .start_timer();
        let combined_schema = modified_batches[0].schema();
        match concat_batches(&combined_schema, &modified_batches) {
            Ok(batch) => batch,
            Err(err) => {
                record_failure!(
                    modified_row_count,
                    format!("Failed to concat modified batches: {:?}", err)
                );
                return;
            }
        }
    };

    // Stage 3b: split the combined batch into per-region boolean masks.
    // Must happen BEFORE stripping, since the rule reads partition columns.
    let physical_table_id = physical_table_info.table_id();
    let region_masks = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_split_record_batch"])
            .start_timer();
        match partition_rule.0.split_record_batch(&combined_batch) {
            Ok(masks) => masks,
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to split combined batch for physical table '{}': {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };

    // Stage 3c: drop partition columns from the payload sent to datanodes
    // (they are already encoded in the primary key / region assignment).
    let stripped_batch = if partition_columns.is_empty() {
        combined_batch
    } else {
        match strip_partition_columns_from_batch(combined_batch) {
            Ok(batch) => batch,
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to strip partition columns for physical table '{}': {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };

    // Stage 4: build one bulk-insert request per region with matching rows.
    // Per-region failures only skip that region, not the whole flush.
    let mut region_writes = Vec::new();
    for (region_number, mask) in region_masks {
        if mask.select_none() {
            continue;
        }

        // `select_all` avoids the filter kernel; RecordBatch clone is cheap
        // (Arc'd columns).
        let region_batch = if mask.select_all() {
            stripped_batch.clone()
        } else {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_filter_record_batch"])
                .start_timer();
            match filter_record_batch(&stripped_batch, mask.array()) {
                Ok(batch) => batch,
                Err(err) => {
                    record_failure!(
                        total_row_count,
                        format!(
                            "Failed to filter combined batch for physical table '{}': {:?}",
                            physical_table_name, err
                        )
                    );
                    continue;
                }
            }
        };

        let row_count = region_batch.num_rows();
        if row_count == 0 {
            continue;
        }

        let region_id = RegionId::new(physical_table_id, region_number);
        let datanode = {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_resolve_region_leader"])
                .start_timer();
            match partition_manager.find_region_leader(region_id).await {
                Ok(peer) => peer,
                Err(err) => {
                    record_failure!(
                        row_count,
                        format!(
                            "Failed to resolve region leader for physical region {}: {:?}",
                            region_id, err
                        )
                    );
                    continue;
                }
            }
        };

        // Encode the region batch to Arrow IPC (schema + header + payload).
        let (schema_bytes, data_header, payload) = {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_encode_ipc"])
                .start_timer();
            match record_batch_to_ipc(region_batch) {
                Ok(encoded) => encoded,
                Err(err) => {
                    record_failure!(
                        row_count,
                        format!(
                            "Failed to encode Arrow IPC for physical region {}: {:?}",
                            region_id, err
                        )
                    );
                    continue;
                }
            }
        };

        let request = RegionRequest {
            header: Some(RegionRequestHeader {
                tracing_context: TracingContext::from_current_span().to_w3c(),
                ..Default::default()
            }),
            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
                region_id: region_id.as_u64(),
                partition_expr_version: None,
                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header,
                    payload,
                })),
            })),
        };

        region_writes.push(FlushRegionWrite {
            region_id,
            row_count,
            datanode,
            request,
        });
    }

    // Dispatch all region writes (concurrently when beneficial) and record
    // per-write success/failure metrics.
    for result in flush_region_writes_concurrently(node_manager.clone(), region_writes).await {
        match result {
            FlushWriteResult::Success { row_count } => {
                FLUSH_TOTAL.inc();
                FLUSH_ROWS.observe(row_count as f64);
            }
            FlushWriteResult::Failed { row_count, message } => {
                record_failure!(row_count, message);
            }
        }
    }
}
1515
1516fn notify_waiters(waiters: Vec<FlushWaiter>, first_error: &Option<String>) {
1517 for waiter in waiters {
1518 let result = match first_error {
1519 Some(err_msg) => Err(Error::Internal {
1520 err_msg: err_msg.clone(),
1521 }),
1522 None => Ok(()),
1523 };
1524 let _ = waiter.response_tx.send(result);
1525 }
1527}
1528
1529fn mark_flush_failure(row_count: usize, message: &str) {
1530 error!("Pending rows batch flush failed, message: {}", message);
1531 FLUSH_FAILURES.inc();
1532 FLUSH_DROPPED_ROWS.inc_by(row_count as u64);
1533}
1534
1535fn flush_with_error(batch: &mut PendingBatch, message: &str) {
1536 if batch.total_row_count == 0 {
1537 return;
1538 }
1539
1540 let row_count = batch.total_row_count;
1541 let waiters = std::mem::take(&mut batch.waiters);
1542 batch.tables.clear();
1543 batch.total_row_count = 0;
1544 batch.created_at = None;
1545 batch.ctx = None;
1546
1547 PENDING_ROWS.sub(row_count as i64);
1548 PENDING_BATCHES.dec();
1549
1550 let err_msg = Some(message.to_string());
1551 notify_waiters(waiters, &err_msg);
1552 mark_flush_failure(row_count, message);
1553}
1554
1555fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
1556 let mut encoder = FlightEncoder::default();
1557 let schema = encoder.encode_schema(record_batch.schema().as_ref());
1558 let mut iter = encoder
1559 .encode(FlightMessage::RecordBatch(record_batch))
1560 .into_iter();
1561 let Some(flight_data) = iter.next() else {
1562 return Err(Error::Internal {
1563 err_msg: "Failed to encode empty flight data".to_string(),
1564 });
1565 };
1566 if iter.next().is_some() {
1567 return Err(Error::NotSupported {
1568 feat: "bulk insert RecordBatch with dictionary arrays".to_string(),
1569 });
1570 }
1571
1572 Ok((
1573 schema.data_header,
1574 flight_data.data_header,
1575 flight_data.data_body,
1576 ))
1577}
1578
// Unit tests for the pending-rows batcher: request collection, worker
// lifecycle decisions, concurrent region dispatch, partition-column
// stripping, and column taxonomy / sparse batch modification.
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use api::region::RegionResponse;
    use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
    use api::v1::meta::Peer;
    use api::v1::region::{InsertRequests, RegionRequest};
    use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
    use arrow::array::{BinaryArray, StringArray, TimestampMillisecondArray};
    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
    use arrow::record_batch::RecordBatch;
    use async_trait::async_trait;
    use common_meta::error::Result as MetaResult;
    use common_meta::node_manager::{
        Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
    };
    use common_query::request::QueryRequest;
    use common_recordbatch::SendableRecordBatchStream;
    use dashmap::DashMap;
    use smallvec::SmallVec;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc;
    use tokio::time::sleep;

    use super::{
        BatchKey, Error, FlushRegionWrite, FlushWriteResult, PendingRowsBatcher, PendingWorker,
        WorkerCommand, columns_taxonomy, flush_region_writes_concurrently,
        remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
        should_dispatch_concurrently, strip_partition_columns_from_batch,
    };

    // Builds a `Rows` payload with `row_count` empty rows and a single
    // column named `schema_name`.
    fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
        Rows {
            schema: vec![ColumnSchema {
                column_name: schema_name.to_string(),
                ..Default::default()
            }],
            rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
        }
    }

    // Empty (`rows: Some` with 0 rows) and absent (`rows: None`) payloads
    // must be filtered out; only non-empty tables survive collection.
    #[test]
    fn test_collect_non_empty_table_rows_filters_empty_payloads() {
        let requests = RowInsertRequests {
            inserts: vec![
                RowInsertRequest {
                    table_name: "cpu".to_string(),
                    rows: Some(mock_rows(2, "host")),
                },
                RowInsertRequest {
                    table_name: "mem".to_string(),
                    rows: Some(mock_rows(0, "host")),
                },
                RowInsertRequest {
                    table_name: "disk".to_string(),
                    rows: None,
                },
            ],
        };

        let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);

        assert_eq!(2, total_rows);
        assert_eq!(1, table_rows.len());
        assert_eq!("cpu", table_rows[0].0);
        assert_eq!(2, table_rows[0].1.rows.len());
    }

    // Mock datanode that sleeps for `delay` per request while tracking the
    // current and maximum number of in-flight requests, so tests can prove
    // that writes were dispatched concurrently.
    #[derive(Clone)]
    struct ConcurrentMockDatanode {
        delay: Duration,
        inflight: Arc<AtomicUsize>,
        max_inflight: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl Datanode for ConcurrentMockDatanode {
        async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
            let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
            // CAS loop: raise `max_inflight` to `now` if it is larger.
            loop {
                let max = self.max_inflight.load(Ordering::SeqCst);
                if now <= max {
                    break;
                }
                if self
                    .max_inflight
                    .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }

            sleep(self.delay).await;
            self.inflight.fetch_sub(1, Ordering::SeqCst);
            Ok(RegionResponse::new(0))
        }

        async fn handle_query(
            &self,
            _request: QueryRequest,
        ) -> MetaResult<SendableRecordBatchStream> {
            unimplemented!()
        }
    }

    // Node manager backed by a fixed id -> datanode map; panics on unknown
    // peers so tests fail loudly on routing mistakes.
    #[derive(Clone)]
    struct ConcurrentMockNodeManager {
        datanodes: Arc<HashMap<u64, DatanodeRef>>,
    }

    #[async_trait]
    impl DatanodeManager for ConcurrentMockNodeManager {
        async fn datanode(&self, node: &Peer) -> DatanodeRef {
            self.datanodes
                .get(&node.id)
                .expect("datanode not found")
                .clone()
        }
    }

    // Flownode stub — these tests never exercise the flow path.
    struct NoopFlownode;

    #[async_trait]
    impl Flownode for NoopFlownode {
        async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_mark_window_dirty(
            &self,
            _req: DirtyWindowRequests,
        ) -> MetaResult<FlowResponse> {
            unimplemented!()
        }
    }

    #[async_trait]
    impl FlownodeManager for ConcurrentMockNodeManager {
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            Arc::new(NoopFlownode)
        }
    }

    // A worker entry is removed only when the stored sender is the same
    // channel as the one presented by the closing worker.
    #[test]
    fn test_remove_worker_if_same_channel_removes_matching_entry() {
        let workers = DashMap::new();
        let key = BatchKey {
            catalog: "greptime".to_string(),
            schema: "public".to_string(),
            physical_table: "phy".to_string(),
        };

        let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
        workers.insert(key.clone(), PendingWorker { tx: tx.clone() });

        assert!(remove_worker_if_same_channel(&workers, &key, &tx));
        assert!(!workers.contains_key(&key));
    }

    // A stale worker must NOT evict a fresher entry that replaced it.
    #[test]
    fn test_remove_worker_if_same_channel_keeps_newer_entry() {
        let workers = DashMap::new();
        let key = BatchKey {
            catalog: "greptime".to_string(),
            schema: "public".to_string(),
            physical_table: "phy".to_string(),
        };

        let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
        let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
        workers.insert(
            key.clone(),
            PendingWorker {
                tx: fresh_tx.clone(),
            },
        );

        assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
        assert!(workers.contains_key(&key));
        assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
    }

    // A worker closes on idle timeout only when it has neither pending rows
    // nor pending waiters.
    #[test]
    fn test_worker_idle_timeout_close_decision() {
        assert!(should_close_worker_on_idle_timeout(0, 0));
        assert!(!should_close_worker_on_idle_timeout(1, 0));
        assert!(!should_close_worker_on_idle_timeout(0, 1));
    }

    // Writes to two different datanodes should overlap in time: with a
    // 100ms delay each, `max_inflight >= 2` proves concurrent dispatch.
    #[tokio::test]
    async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
        let inflight = Arc::new(AtomicUsize::new(0));
        let max_inflight = Arc::new(AtomicUsize::new(0));
        let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight: inflight.clone(),
            max_inflight: max_inflight.clone(),
        });
        let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight,
            max_inflight: max_inflight.clone(),
        });

        let mut datanodes = HashMap::new();
        datanodes.insert(1, datanode1);
        datanodes.insert(2, datanode2);
        let node_manager = Arc::new(ConcurrentMockNodeManager {
            datanodes: Arc::new(datanodes),
        });

        let writes = vec![
            FlushRegionWrite {
                region_id: RegionId::new(1024, 1),
                row_count: 10,
                datanode: Peer {
                    id: 1,
                    addr: "node1".to_string(),
                },
                request: RegionRequest::default(),
            },
            FlushRegionWrite {
                region_id: RegionId::new(1024, 2),
                row_count: 12,
                datanode: Peer {
                    id: 2,
                    addr: "node2".to_string(),
                },
                request: RegionRequest::default(),
            },
        ];

        let results = flush_region_writes_concurrently(node_manager, writes).await;
        assert_eq!(2, results.len());
        assert!(
            results
                .iter()
                .all(|result| matches!(result, FlushWriteResult::Success { .. }))
        );
        assert!(max_inflight.load(Ordering::SeqCst) >= 2);
    }

    // Concurrent dispatch only pays off with at least two region writes.
    #[test]
    fn test_should_dispatch_concurrently_by_region_count() {
        assert!(!should_dispatch_concurrently(0));
        assert!(!should_dispatch_concurrently(1));
        assert!(should_dispatch_concurrently(2));
    }

    // Stripping keeps only the essential physical columns (__primary_key,
    // timestamp, value) and drops the tag column `host`.
    #[test]
    fn test_strip_partition_columns_from_batch_removes_partition_tags() {
        let batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
                Arc::new(StringArray::from(vec!["node-1"])),
            ],
        )
        .unwrap();

        let stripped = strip_partition_columns_from_batch(batch).unwrap();

        assert_eq!(3, stripped.num_columns());
        assert_eq!("__primary_key", stripped.schema().field(0).name());
        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
        assert_eq!("greptime_value", stripped.schema().field(2).name());
    }

    // NOTE(review): this test body is byte-identical to the test above —
    // likely a copy-paste duplicate; it does not appear to cover the
    // "without lookup" scenario its name suggests. Worth revisiting.
    #[test]
    fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
        let batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
                Arc::new(StringArray::from(vec!["node-1"])),
            ],
        )
        .unwrap();

        let stripped = strip_partition_columns_from_batch(batch).unwrap();

        assert_eq!(3, stripped.num_columns());
        assert_eq!("__primary_key", stripped.schema().field(0).name());
        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
        assert_eq!("greptime_value", stripped.schema().field(2).name());
    }

    // A tag that is also a partition column stays in the non-tag index
    // list so the partition rule can still read it.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let (tag_columns, non_tag_indices) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        assert_eq!(2, tag_columns.len());
        assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
    }

    // Essential columns (timestamp, value) are ordered first in the index
    // list regardless of their position in the input schema.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host", "region"]);

        let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
    }

    // Only Timestamp/Float64/Utf8 columns are accepted; Boolean is invalid.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("invalid", ArrowDataType::Boolean, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

    // The timestamp column must use an Arrow Timestamp type, not raw Int64.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("greptime_timestamp", ArrowDataType::Int64, false),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

    // Exactly one timestamp column is allowed per logical schema.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "ts1",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts2",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

    // Exactly one Float64 value column is allowed per logical schema.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value1", ArrowDataType::Float64, true),
            Field::new("value2", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }

    // Two batches of the same table with different tag sets must produce
    // different primary keys after sparse modification, since the tag set
    // participates in PK encoding.
    #[test]
    fn test_modify_batch_sparse_with_taxonomy_per_batch() {
        use arrow::array::BinaryArray;
        use metric_engine::batch_modifier::modify_batch_sparse;

        let schema1 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
        ]));

        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
            Field::new("tag2", ArrowDataType::Utf8, true),
        ]));
        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
                Arc::new(StringArray::from(vec!["v2"])),
            ],
        )
        .unwrap();

        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();

        let batch3 = RecordBatch::try_new(
            schema1.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
            ],
        )
        .unwrap();

        let (tag_columns2, indices2) =
            columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();

        let (tag_columns3, indices3) =
            columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();

        // Column 0 is the encoded __primary_key after modification.
        let pk2 = modified2
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let pk3 = modified3
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        assert_ne!(
            pk2.value(0),
            pk3.value(0),
            "PK should be different because batch2 has tag2!"
        );
    }
}