Skip to main content

servers/
pending_rows_batcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::Peer;
20use api::v1::region::{
21    BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
22};
23use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
24use arrow::compute::{concat_batches, filter_record_batch};
25use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use bytes::Bytes;
29use catalog::CatalogManagerRef;
30use common_grpc::flight::{FlightEncoder, FlightMessage};
31use common_meta::node_manager::NodeManagerRef;
32use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, warn};
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
38use partition::manager::PartitionRuleManagerRef;
39use partition::partition::PartitionRuleRef;
40use session::context::QueryContextRef;
41use smallvec::SmallVec;
42use snafu::{OptionExt, ResultExt, ensure};
43use store_api::storage::{RegionId, TableId};
44use table::metadata::{TableInfo, TableInfoRef};
45use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};
46
47use crate::error;
48use crate::error::{Error, Result};
49use crate::metrics::{
50    FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
51    PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
52    PENDING_WORKERS,
53};
54use crate::prom_row_builder::{
55    build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
56    rows_to_aligned_record_batch,
57};
58
/// Query-context extension key that selects the target physical table.
const PHYSICAL_TABLE_KEY: &str = "physical_table";
/// Whether wait for ingestion result before reply to client.
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";
/// A worker shuts itself down after this many flush intervals without traffic.
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;
/// Number of essential columns in a physical metric region.
/// NOTE(review): presumably timestamp/value/tsid; used by the flush path not
/// visible in this chunk — confirm at the use site.
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;
64
/// DDL hook the batcher uses to prepare logical tables before alignment:
/// batch table creation and batch tag-column addition.
#[async_trait]
pub trait PendingRowsSchemaAlterer: Send + Sync {
    /// Batch-create multiple logical tables that are missing.
    /// Each entry is `(table_name, request_schema)`.
    async fn create_tables_if_missing_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[ColumnSchema])],
        with_metric_engine: bool,
        ctx: QueryContextRef,
    ) -> Result<()>;

    /// Batch-alter multiple logical tables to add missing tag columns.
    /// Each entry is `(table_name, missing_column_names)`.
    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> Result<()>;
}

/// Shared, dynamically-dispatched [`PendingRowsSchemaAlterer`].
pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
90
/// Metadata of a resolved physical table used by the flush path.
#[derive(Clone)]
pub struct PhysicalTableMetadata {
    /// Full table info of the physical table.
    pub table_info: TableInfoRef,
    /// Mapping from column name to column id
    pub col_name_to_ids: Option<HashMap<String, u32>>,
}
97
/// Catalog lookup abstraction used by the physical flush path.
#[async_trait]
pub trait PhysicalFlushCatalogProvider: Send + Sync {
    /// Resolves `catalog.schema.table_name` to its metadata, or `None` when
    /// the table does not exist.
    async fn physical_table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: &session::context::QueryContext,
    ) -> catalog::error::Result<Option<PhysicalTableMetadata>>;
}
108
/// Partition-related lookups needed by the physical flush path.
#[async_trait]
pub trait PhysicalFlushPartitionProvider: Send + Sync {
    /// Returns the partition rule for the given table.
    async fn find_table_partition_rule(
        &self,
        table_info: &TableInfo,
    ) -> partition::error::Result<PartitionRuleRef>;

    /// Returns the leader peer currently serving `region_id`.
    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer>;
}
118
/// Sends region requests to datanodes during a physical flush.
#[async_trait]
pub trait PhysicalFlushNodeRequester: Send + Sync {
    /// Dispatches `request` to the datanode at `peer` and returns its response.
    async fn handle(
        &self,
        peer: &Peer,
        request: RegionRequest,
    ) -> Result<api::region::RegionResponse>;
}
127
/// Adapts a [`CatalogManagerRef`] to [`PhysicalFlushCatalogProvider`].
#[derive(Clone)]
struct CatalogManagerPhysicalFlushAdapter {
    catalog_manager: CatalogManagerRef,
}
132
133#[async_trait]
134impl PhysicalFlushCatalogProvider for CatalogManagerPhysicalFlushAdapter {
135    async fn physical_table(
136        &self,
137        catalog: &str,
138        schema: &str,
139        table_name: &str,
140        query_ctx: &session::context::QueryContext,
141    ) -> catalog::error::Result<Option<PhysicalTableMetadata>> {
142        self.catalog_manager
143            .table(catalog, schema, table_name, Some(query_ctx))
144            .await
145            .map(|table| {
146                table.map(|table| {
147                    let table_info = table.table_info();
148                    let name_to_ids = table_info.name_to_ids();
149                    PhysicalTableMetadata {
150                        table_info,
151                        col_name_to_ids: name_to_ids,
152                    }
153                })
154            })
155    }
156}
157
/// Adapts a [`PartitionRuleManagerRef`] to [`PhysicalFlushPartitionProvider`].
#[derive(Clone)]
struct PartitionManagerPhysicalFlushAdapter {
    partition_manager: PartitionRuleManagerRef,
}
162
163#[async_trait]
164impl PhysicalFlushPartitionProvider for PartitionManagerPhysicalFlushAdapter {
165    async fn find_table_partition_rule(
166        &self,
167        table_info: &TableInfo,
168    ) -> partition::error::Result<PartitionRuleRef> {
169        self.partition_manager
170            .find_table_partition_rule(table_info)
171            .await
172            .map(|(rule, _)| rule)
173    }
174
175    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
176        let peer = self.partition_manager.find_region_leader(region_id).await?;
177        Ok(peer)
178    }
179}
180
/// Adapts a [`NodeManagerRef`] to [`PhysicalFlushNodeRequester`].
#[derive(Clone)]
struct NodeManagerPhysicalFlushAdapter {
    node_manager: NodeManagerRef,
}
185
186#[async_trait]
187impl PhysicalFlushNodeRequester for NodeManagerPhysicalFlushAdapter {
188    async fn handle(
189        &self,
190        peer: &Peer,
191        request: RegionRequest,
192    ) -> error::Result<api::region::RegionResponse> {
193        let datanode = self.node_manager.datanode(peer).await;
194        datanode
195            .handle(request)
196            .await
197            .context(error::CommonMetaSnafu)
198    }
199}
200
/// Key identifying one batching worker: submissions sharing the same
/// catalog/schema/physical-table triple are accumulated together.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    catalog: String,
    schema: String,
    physical_table: String,
}
207
/// Accumulated record batches for a single logical table.
#[derive(Debug)]
pub struct TableBatch {
    // Logical table name.
    pub table_name: String,
    // Logical table id.
    pub table_id: TableId,
    // Aligned record batches pending flush.
    pub batches: Vec<RecordBatch>,
    // Total rows across `batches`.
    pub row_count: usize,
}
215
/// Intermediate planning state for resolving and preparing logical tables
/// before row-to-batch alignment.
struct TableResolutionPlan {
    /// Resolved table schema and table id by logical table name.
    /// Entries are refreshed after each DDL step so alignment sees the
    /// post-create/post-alter schema.
    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
    /// Missing tables that need to be created before alignment.
    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
    /// Existing tables that need tag-column schema evolution.
    tables_to_alter: Vec<(String, Vec<String>)>,
}
226
/// In-memory batch a worker accumulates between flushes.
struct PendingBatch {
    // Per-table accumulated batches, keyed by logical table name.
    tables: HashMap<String, TableBatch>,
    // When accumulation started.
    created_at: Instant,
    // Rows across all tables in this batch.
    total_row_count: usize,
    // Context of the first submission; assumed consistent for the whole batch
    // (see `batch_key_from_ctx`).
    ctx: QueryContextRef,
    // Clients waiting for this batch's flush result.
    waiters: Vec<FlushWaiter>,
}
234
/// A client waiting for the flush outcome of the batch it contributed to.
struct FlushWaiter {
    // Receives `Ok(())` or the shared flush error once the batch settles.
    response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
    // Held until the result is delivered, bounding in-flight submissions.
    _permit: OwnedSemaphorePermit,
}
239
/// A batch drained from a worker and handed to a flush task.
struct FlushBatch {
    // Batches to flush, one entry per logical table.
    table_batches: Vec<TableBatch>,
    // Total rows in `table_batches`.
    total_row_count: usize,
    // Query context shared by the whole batch.
    ctx: QueryContextRef,
    // Clients to notify with the flush result.
    waiters: Vec<FlushWaiter>,
}
246
/// Handle to a spawned batching worker task.
#[derive(Clone)]
struct PendingWorker {
    // Command channel into the worker; a closed channel means it has exited.
    tx: mpsc::Sender<WorkerCommand>,
}
251
/// Commands accepted by a batching worker.
enum WorkerCommand {
    /// Adds aligned batches to the worker's pending batch.
    Submit {
        /// `(table_name, table_id, aligned_batch)` tuples.
        table_batches: Vec<(String, u32, RecordBatch)>,
        /// Total rows across `table_batches`.
        total_rows: usize,
        ctx: QueryContextRef,
        /// Notified with the flush result; the receiver may be dropped when
        /// the caller does not wait (async mode).
        response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
        /// In-flight permit held until the result is sent.
        _permit: OwnedSemaphorePermit,
    },
}
261
262// Batch key is derived from QueryContext; it assumes catalog/schema/physical_table fully
263// define the write target and must remain consistent across the batch.
264fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
265    let physical_table = ctx
266        .extension(PHYSICAL_TABLE_KEY)
267        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
268        .to_string();
269    BatchKey {
270        catalog: ctx.current_catalog().to_string(),
271        schema: ctx.current_schema(),
272        physical_table,
273    }
274}
275
/// Prometheus remote write pending rows batcher.
///
/// Groups incoming row inserts by `(catalog, schema, physical_table)` into
/// per-key workers that accumulate rows and flush them as larger batches.
pub struct PendingRowsBatcher {
    // One worker per batch key; dead entries are replaced lazily.
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    // Interval at which workers flush their pending batch.
    flush_interval: Duration,
    // Row-count threshold that triggers an immediate flush.
    max_batch_rows: usize,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    // Caps concurrently running flush tasks.
    flush_semaphore: Arc<Semaphore>,
    // Caps submissions whose flush result is still outstanding.
    inflight_semaphore: Arc<Semaphore>,
    // Capacity of each worker's command channel.
    worker_channel_capacity: usize,
    prom_store_with_metric_engine: bool,
    schema_alterer: PendingRowsSchemaAltererRef,
    // Whether `submit` waits for the flush result before replying.
    pending_rows_batch_sync: bool,
    // Broadcast used to stop all workers when the batcher is dropped.
    shutdown: broadcast::Sender<()>,
}
292
293impl PendingRowsBatcher {
294    #[allow(clippy::too_many_arguments)]
295    pub fn try_new(
296        partition_manager: PartitionRuleManagerRef,
297        node_manager: NodeManagerRef,
298        catalog_manager: CatalogManagerRef,
299        prom_store_with_metric_engine: bool,
300        schema_alterer: PendingRowsSchemaAltererRef,
301        flush_interval: Duration,
302        max_batch_rows: usize,
303        max_concurrent_flushes: usize,
304        worker_channel_capacity: usize,
305        max_inflight_requests: usize,
306    ) -> Option<Arc<Self>> {
307        // Disable the batcher if flush is disabled or configuration is invalid.
308        // Zero values for these knobs either cause panics (e.g., zero-capacity channels)
309        // or deadlocks (e.g., semaphores with no permits).
310        if flush_interval.is_zero()
311            || max_batch_rows == 0
312            || max_concurrent_flushes == 0
313            || worker_channel_capacity == 0
314            || max_inflight_requests == 0
315        {
316            return None;
317        }
318
319        let (shutdown, _) = broadcast::channel(1);
320        let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
321            .ok()
322            .as_deref()
323            .and_then(|v| v.parse::<bool>().ok())
324            .unwrap_or(true);
325        let workers = Arc::new(DashMap::new());
326        PENDING_WORKERS.set(workers.len() as i64);
327
328        Some(Arc::new(Self {
329            workers,
330            flush_interval,
331            max_batch_rows,
332            partition_manager,
333            node_manager,
334            catalog_manager,
335            prom_store_with_metric_engine,
336            schema_alterer,
337            flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
338            inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
339            worker_channel_capacity,
340            pending_rows_batch_sync,
341            shutdown,
342        }))
343    }
344
    /// Submits proto row inserts to the batching worker for this request's
    /// `(catalog, schema, physical_table)` key and returns the accepted row
    /// count.
    ///
    /// In sync mode (`pending_rows_batch_sync`) the call resolves only once
    /// the batch containing these rows has been flushed; otherwise it returns
    /// as soon as the worker has queued the command.
    pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
        let (table_batches, total_rows) = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_build_and_align"])
                .start_timer();
            self.build_and_align_table_batches(requests, &ctx).await?
        };
        if total_rows == 0 {
            return Ok(0);
        }

        // Backpressure: this permit travels inside the worker command and is
        // held until the flush result is delivered.
        let permit = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_acquire_inflight_permit"])
                .start_timer();
            self.inflight_semaphore
                .clone()
                .acquire_owned()
                .await
                .map_err(|_| error::BatcherChannelClosedSnafu.build())?
        };

        let (response_tx, response_rx) = oneshot::channel();

        let batch_key = batch_key_from_ctx(&ctx);
        let mut cmd = Some(WorkerCommand::Submit {
            table_batches,
            total_rows,
            ctx,
            response_tx,
            _permit: permit,
        });

        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_send_to_worker"])
                .start_timer();

            // Two attempts: the first send can race with a worker that just
            // exited; on failure the stale entry is removed so the retry spawns
            // a fresh worker.
            for _ in 0..2 {
                let worker = self.get_or_spawn_worker(batch_key.clone());
                let Some(worker_cmd) = cmd.take() else {
                    break;
                };

                match worker.tx.send(worker_cmd).await {
                    Ok(()) => break,
                    Err(err) => {
                        // Recover the command from the send error for retry.
                        cmd = Some(err.0);
                        remove_worker_if_same_channel(
                            self.workers.as_ref(),
                            &batch_key,
                            &worker.tx,
                        );
                    }
                }
            }

            // Both attempts failed; the command was never accepted.
            if cmd.is_some() {
                return Err(Error::BatcherChannelClosed);
            }
        }

        if self.pending_rows_batch_sync {
            let result = {
                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                    .with_label_values(&["submit_wait_flush_result"])
                    .start_timer();
                response_rx
                    .await
                    .map_err(|_| error::BatcherChannelClosedSnafu.build())?
            };
            result
                .context(error::SubmitBatchSnafu)
                .map(|()| total_rows as u64)
        } else {
            // Fire-and-forget: the receiver is dropped and we ack immediately.
            Ok(total_rows as u64)
        }
    }
423
    /// Converts proto `RowInsertRequests` directly into aligned `RecordBatch`es
    /// in a single pass, handling table creation, schema alteration, column
    /// renaming, reordering, and null-filling without building intermediate
    /// RecordBatches.
    ///
    /// Returns `(table_name, table_id, aligned_batch)` tuples plus the total
    /// row count; `(empty, 0)` when every request is empty.
    async fn build_and_align_table_batches(
        &self,
        requests: RowInsertRequests,
        ctx: &QueryContextRef,
    ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
        let catalog = ctx.current_catalog().to_string();
        let schema = ctx.current_schema();

        let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
        if total_rows == 0 {
            return Ok((Vec::new(), 0));
        }

        // Classify tables into resolved / to-create / to-alter.
        let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
        let mut plan = self
            .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
            .await?;

        // DDL phase: create missing tables, then add missing tag columns,
        // refreshing the schema map after each step.
        self.create_missing_tables_and_refresh_schemas(
            &catalog,
            &schema,
            ctx,
            &table_rows,
            &mut plan,
        )
        .await?;

        self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
            .await?;

        // Alignment phase: rows -> RecordBatch against the final schemas.
        let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;

        Ok((aligned_batches, total_rows))
    }
462
463    /// Extracts non-empty `(table_name, rows)` pairs and computes total row
464    /// count across the retained entries.
465    fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
466        let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
467        let mut total_rows = 0;
468
469        for request in requests.inserts {
470            let Some(rows) = request.rows else {
471                continue;
472            };
473            if rows.rows.is_empty() {
474                continue;
475            }
476
477            total_rows += rows.rows.len();
478            table_rows.push((request.table_name, rows));
479        }
480
481        (table_rows, total_rows)
482    }
483
484    /// Returns unique `(table_name, proto_schema)` pairs while keeping the
485    /// first-seen schema for duplicate table names.
486    fn collect_unique_table_schemas(
487        table_rows: &[(String, Rows)],
488    ) -> Result<Vec<(&str, &[ColumnSchema])>> {
489        let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
490        let mut seen = HashSet::new();
491
492        for (table_name, rows) in table_rows {
493            if seen.insert(table_name.as_str()) {
494                unique_tables.push((table_name.as_str(), &rows.schema));
495            } else {
496                // table_rows should group rows by table name.
497                return error::InvalidPromRemoteRequestSnafu {
498                    msg: format!(
499                        "Found duplicated table name in RowInsertRequest: {}",
500                        table_name
501                    ),
502                }
503                .fail();
504            }
505        }
506
507        Ok(unique_tables)
508    }
509
    /// Resolves table metadata and classifies each table into existing,
    /// to-create, and to-alter groups used by subsequent DDL steps.
    async fn plan_table_resolution(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        unique_tables: &[(&str, &[ColumnSchema])],
    ) -> Result<TableResolutionPlan> {
        let mut plan = TableResolutionPlan {
            region_schemas: HashMap::with_capacity(unique_tables.len()),
            tables_to_create: Vec::new(),
            tables_to_alter: Vec::new(),
        };

        // Resolve all tables concurrently through the catalog manager; results
        // come back in the same order as `unique_tables`.
        let resolved_tables = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table"])
                .start_timer();
            futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
            let table = table_result?;

            if let Some(table) = table {
                // Existing table: record its schema and queue an alter if the
                // incoming rows carry columns the region schema lacks.
                let table_info = table.table_info();
                let table_id = table_info.ident.table_id;
                let region_schema = table_info.meta.schema.arrow_schema().clone();

                let missing_columns = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_identify_missing_columns"])
                        .start_timer();
                    identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
                };
                if !missing_columns.is_empty() {
                    plan.tables_to_alter
                        .push(((*table_name).to_string(), missing_columns));
                }
                plan.region_schemas
                    .insert((*table_name).to_string(), (region_schema, table_id));
            } else {
                // Missing table: derive a create-table schema from the proto rows.
                let request_schema = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_build_create_table_schema"])
                        .start_timer();
                    build_prom_create_table_schema_from_proto(rows_schema)?
                };
                plan.tables_to_create
                    .push(((*table_name).to_string(), request_schema));
            }
        }

        Ok(plan)
    }
570
    /// Batch-creates missing tables, refreshes their schema metadata, and
    /// enqueues follow-up alters for extra tag columns discovered in later rows.
    async fn create_missing_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        table_rows: &[(String, Rows)],
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_create.is_empty() {
            return Ok(());
        }

        // Borrowed views over the plan entries, shaped for the alterer API.
        let create_refs: Vec<(&str, &[ColumnSchema])> = plan
            .tables_to_create
            .iter()
            .map(|(name, schema)| (name.as_str(), schema.as_slice()))
            .collect();

        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_create_tables"])
                .start_timer();
            self.schema_alterer
                .create_tables_if_missing_batch(
                    catalog,
                    schema,
                    &create_refs,
                    self.prom_store_with_metric_engine,
                    ctx.clone(),
                )
                .await?;
        }

        // Re-resolve the created tables concurrently to pick up their region
        // schemas and table ids.
        let created_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_create"])
                .start_timer();
            futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_create.iter().zip(created_table_results)
        {
            // A table still absent after the create attempt is unexpected;
            // surface it rather than aligning against a stale schema.
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch create attempt: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (region_schema, table_id));
        }

        // Re-check row schemas against the freshly created tables; they may
        // still lack some tag columns, which are queued as alters.
        Self::enqueue_alter_for_new_tables(table_rows, plan)?;

        Ok(())
    }
637
638    /// For newly created tables, re-checks all row schemas and appends alter
639    /// operations when additional tag columns are still missing.
640    fn enqueue_alter_for_new_tables(
641        table_rows: &[(String, Rows)],
642        plan: &mut TableResolutionPlan,
643    ) -> Result<()> {
644        let created_tables: HashSet<&str> = plan
645            .tables_to_create
646            .iter()
647            .map(|(table_name, _)| table_name.as_str())
648            .collect();
649
650        for (table_name, rows) in table_rows {
651            if !created_tables.contains(table_name.as_str()) {
652                continue;
653            }
654
655            let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
656                continue;
657            };
658
659            let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
660            if missing_columns.is_empty()
661                || plan
662                    .tables_to_alter
663                    .iter()
664                    .any(|(existing_name, _)| existing_name == table_name)
665            {
666                continue;
667            }
668
669            plan.tables_to_alter
670                .push((table_name.clone(), missing_columns));
671        }
672
673        Ok(())
674    }
675
    /// Batch-alters tables that have missing tag columns and refreshes the
    /// in-memory schema map used for row alignment.
    async fn alter_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_alter.is_empty() {
            return Ok(());
        }

        // Borrowed views over the plan entries, shaped for the alterer API.
        let alter_refs: Vec<(&str, &[String])> = plan
            .tables_to_alter
            .iter()
            .map(|(name, cols)| (name.as_str(), cols.as_slice()))
            .collect();
        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_add_missing_columns"])
                .start_timer();
            self.schema_alterer
                .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
                .await?;
        }

        // Re-resolve the altered tables concurrently so alignment uses the
        // post-alter schemas.
        let altered_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_schema_alter"])
                .start_timer();
            futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_alter.iter().zip(altered_table_results)
        {
            // A table missing after a successful alter is unexpected.
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch schema alter: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (refreshed_region_schema, table_id));
        }

        Ok(())
    }
732
733    /// Converts proto rows to `RecordBatch` values aligned to resolved region
734    /// schemas and returns `(table_name, table_id, batch)` tuples.
735    fn build_aligned_batches(
736        table_rows: &[(String, Rows)],
737        region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
738    ) -> Result<Vec<(String, u32, RecordBatch)>> {
739        let mut aligned_batches = Vec::with_capacity(table_rows.len());
740        for (table_name, rows) in table_rows {
741            let (region_schema, table_id) =
742                region_schemas.get(table_name).cloned().with_context(|| {
743                    error::UnexpectedResultSnafu {
744                        reason: format!("Region schema not resolved for table: {}", table_name),
745                    }
746                })?;
747
748            let record_batch = {
749                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
750                    .with_label_values(&["align_rows_to_record_batch"])
751                    .start_timer();
752                rows_to_aligned_record_batch(rows, region_schema.as_ref())?
753            };
754            aligned_batches.push((table_name.clone(), table_id, record_batch));
755        }
756
757        Ok(aligned_batches)
758    }
759
    /// Returns a live worker for `key`, spawning one when no entry exists or
    /// the existing worker's channel has closed (the worker exited).
    fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
        // Fast path: plain read when a live worker is already registered.
        if let Some(worker) = self.workers.get(&key)
            && !worker.tx.is_closed()
        {
            return worker.clone();
        }

        // Slow path: take the entry and re-check under it, since another task
        // may have replaced the worker between the read above and here.
        let entry = self.workers.entry(key.clone());
        match entry {
            Entry::Occupied(mut worker) => {
                if worker.get().tx.is_closed() {
                    // Replace the dead worker in place.
                    let new_worker = self.spawn_worker(key);
                    worker.insert(new_worker.clone());
                    PENDING_WORKERS.set(self.workers.len() as i64);
                    new_worker
                } else {
                    worker.get().clone()
                }
            }
            Entry::Vacant(vacant) => {
                let worker = self.spawn_worker(key);

                vacant.insert(worker.clone());
                PENDING_WORKERS.set(self.workers.len() as i64);
                worker
            }
        }
    }
788
789    fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
790        let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
791        let worker = PendingWorker { tx: tx.clone() };
792        let worker_idle_timeout = self
793            .flush_interval
794            .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
795            .unwrap_or(self.flush_interval);
796
797        start_worker(
798            key,
799            worker.tx.clone(),
800            self.workers.clone(),
801            rx,
802            self.shutdown.clone(),
803            self.partition_manager.clone(),
804            self.node_manager.clone(),
805            self.catalog_manager.clone(),
806            self.flush_interval,
807            worker_idle_timeout,
808            self.max_batch_rows,
809            self.flush_semaphore.clone(),
810        );
811
812        worker
813    }
814}
815
816impl Drop for PendingRowsBatcher {
817    fn drop(&mut self) {
818        let _ = self.shutdown.send(());
819    }
820}
821
822impl PendingBatch {
823    fn new(ctx: QueryContextRef) -> Self {
824        Self {
825            tables: HashMap::new(),
826            created_at: Instant::now(),
827            total_row_count: 0,
828            ctx,
829            waiters: Vec::new(),
830        }
831    }
832}
833
/// Spawns the background task that owns the pending batch for one `BatchKey`.
///
/// The task multiplexes four event sources via `select!`:
/// - `rx`: `Submit` commands merge table batches into the pending batch and
///   register their waiters; once the batch reaches `max_batch_rows` it is
///   drained and flushed on a separate task. Channel closure (`None`) flushes
///   inline and exits.
/// - the idle timer: after `worker_idle_timeout` of inactivity the worker
///   exits, but only when no rows are buffered and no commands are queued.
/// - `interval`: periodically flushes a batch older than `flush_interval`.
/// - `shutdown`: flushes any remaining rows inline and exits.
///
/// On exit, the worker deregisters itself from `workers` — but only if the
/// map entry still refers to this task's channel (a replacement worker may
/// already have been installed under the same key).
#[allow(clippy::too_many_arguments)]
fn start_worker(
    key: BatchKey,
    worker_tx: mpsc::Sender<WorkerCommand>,
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    mut rx: mpsc::Receiver<WorkerCommand>,
    shutdown: broadcast::Sender<()>,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    flush_interval: Duration,
    worker_idle_timeout: Duration,
    max_batch_rows: usize,
    flush_semaphore: Arc<Semaphore>,
) {
    tokio::spawn(async move {
        let mut batch = None;
        let mut interval = tokio::time::interval(flush_interval);
        let mut shutdown_rx = shutdown.subscribe();
        let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
        let idle_timer = tokio::time::sleep_until(idle_deadline);
        tokio::pin!(idle_timer);

        loop {
            tokio::select! {
                cmd = rx.recv() => {
                    match cmd {
                        Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
                            // Any submission counts as activity: push the idle deadline out.
                            idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);

                            // Lazily create the batch; the first submitter's context is
                            // reused for the whole batch.
                            let pending_batch = batch.get_or_insert_with(||{
                                PENDING_BATCHES.inc();
                                PendingBatch::new(ctx)
                            });

                            pending_batch.waiters.push(FlushWaiter { response_tx, _permit });

                            // Merge the per-table record batches, accumulating row counts.
                            for (table_name, table_id, record_batch) in table_batches {
                                let entry = pending_batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
                                    table_name,
                                    table_id,
                                    batches: Vec::new(),
                                    row_count: 0,
                                });
                                entry.row_count += record_batch.num_rows();
                                entry.batches.push(record_batch);
                            }

                            pending_batch.total_row_count += total_rows;
                            PENDING_ROWS.add(total_rows as i64);

                            // Size-triggered flush: runs on a separate task after a
                            // flush-semaphore permit is acquired.
                            if pending_batch.total_row_count >= max_batch_rows
                                && let Some(flush) = drain_batch(&mut batch) {
                                    spawn_flush(
                                        flush,
                                        partition_manager.clone(),
                                        node_manager.clone(),
                                        catalog_manager.clone(),
                                        flush_semaphore.clone(),
                                    ).await;
                            }
                        }
                        None => {
                            // All senders dropped: flush whatever is left inline, then stop.
                            if let Some(flush) = drain_batch(&mut batch) {
                                flush_batch(
                                    flush,
                                    partition_manager.clone(),
                                    node_manager.clone(),
                                    catalog_manager.clone(),
                                ).await;
                            }
                            break;
                        }
                    }
                }
                _ = &mut idle_timer => {
                    // Keep the worker alive while rows are buffered or commands are
                    // still queued; otherwise close it.
                    if !should_close_worker_on_idle_timeout(
                        batch.as_ref().map_or(0, |batch| batch.total_row_count),
                        rx.len(),
                    ) {
                        idle_timer
                            .as_mut()
                            .reset(tokio::time::Instant::now() + worker_idle_timeout);
                        continue;
                    }

                    debug!(
                        "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
                        key.catalog,
                        key.schema,
                        key.physical_table
                    );
                    break;
                }
                _ = interval.tick() => {
                    // Time-triggered flush for batches older than `flush_interval`.
                    if batch
                        .as_ref()
                        .is_some_and(|batch| batch.created_at.elapsed() >= flush_interval)
                        && let Some(flush) = drain_batch(&mut batch) {
                            spawn_flush(
                                flush,
                                partition_manager.clone(),
                                node_manager.clone(),
                                catalog_manager.clone(),
                                flush_semaphore.clone(),
                            ).await;
                    }
                }
                _ = shutdown_rx.recv() => {
                    // Graceful shutdown: flush remaining rows inline before exiting.
                    if let Some(flush) = drain_batch(&mut batch) {
                        flush_batch(
                            flush,
                            partition_manager.clone(),
                            node_manager.clone(),
                            catalog_manager.clone(),
                        ).await;
                    }
                    break;
                }
            }
        }

        // Deregister only if the map still points at this worker's channel.
        remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
    });
}
959
960fn remove_worker_if_same_channel(
961    workers: &DashMap<BatchKey, PendingWorker>,
962    key: &BatchKey,
963    worker_tx: &mpsc::Sender<WorkerCommand>,
964) -> bool {
965    if let Some(worker) = workers.get(key)
966        && worker.tx.same_channel(worker_tx)
967    {
968        drop(worker);
969        workers.remove(key);
970        PENDING_WORKERS.set(workers.len() as i64);
971        return true;
972    }
973
974    false
975}
976
/// A worker may close on idle timeout only when it buffers no rows and has
/// no commands waiting in its queue.
fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    let no_buffered_rows = total_row_count == 0;
    let no_queued_commands = queued_requests == 0;
    no_buffered_rows && no_queued_commands
}
980
981fn drain_batch(batch: &mut Option<PendingBatch>) -> Option<FlushBatch> {
982    let batch = batch.take()?;
983    let total_row_count = batch.total_row_count;
984
985    if total_row_count == 0 {
986        return None;
987    }
988
989    let table_batches = batch.tables.into_values().collect();
990    let waiters = batch.waiters;
991
992    PENDING_ROWS.sub(total_row_count as i64);
993    PENDING_BATCHES.dec();
994
995    Some(FlushBatch {
996        table_batches,
997        total_row_count,
998        ctx: batch.ctx,
999        waiters,
1000    })
1001}
1002
1003async fn spawn_flush(
1004    flush: FlushBatch,
1005    partition_manager: PartitionRuleManagerRef,
1006    node_manager: NodeManagerRef,
1007    catalog_manager: CatalogManagerRef,
1008    semaphore: Arc<Semaphore>,
1009) {
1010    match semaphore.acquire_owned().await {
1011        Ok(permit) => {
1012            tokio::spawn(async move {
1013                let _permit = permit;
1014                flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1015            });
1016        }
1017        Err(err) => {
1018            warn!(err; "Flush semaphore closed, flushing inline");
1019            flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1020        }
1021    }
1022}
1023
/// A fully-encoded region write: the target datanode plus the request to send.
struct FlushRegionWrite {
    // Peer that hosts the target region's leader.
    datanode: Peer,
    // Bulk-insert region request carrying the Arrow IPC payload.
    request: RegionRequest,
}
1028
/// A record batch routed to one region, before its leader datanode is known.
struct PlannedRegionBatch {
    region_id: RegionId,
    batch: RecordBatch,
}
1033
#[cfg(test)]
impl PlannedRegionBatch {
    /// Row count of the planned batch; test-only helper.
    fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}
1040
/// A planned region batch paired with the datanode resolved as its leader.
struct ResolvedRegionBatch {
    planned: PlannedRegionBatch,
    datanode: Peer,
}
1045
/// Region writes fan out concurrently only when there is more than one of
/// them; zero or one write is handled sequentially to avoid fan-out overhead.
fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    !matches!(region_write_count, 0 | 1)
}
1049
/// Classifies columns in a logical-table batch for sparse primary-key conversion.
///
/// Classification is driven purely by the Arrow data type:
/// - every `Utf8` column is a tag,
/// - the single `Timestamp(Millisecond)` column is the timestamp,
/// - the single `Float64` column is the value;
/// any other type is rejected.
///
/// Returns:
/// - `Vec<TagColumnInfo>`: all Utf8 tag columns sorted by tag name, used for
///   TSID and sparse primary-key encoding.
/// - `SmallVec<[usize; 3]>`: indices of columns copied into the physical batch
///   after `__primary_key`, ordered as `[greptime_timestamp, greptime_value,
///   partition_tag_columns...]`.
///
/// # Errors
/// Returns `InvalidPromRemoteRequest` when a tag column is missing from the
/// physical column-ID map, when the timestamp or value column is missing or
/// duplicated, or when an unexpected data type is encountered.
fn columns_taxonomy(
    batch_schema: &Arc<ArrowSchema>,
    table_name: &str,
    name_to_ids: &HashMap<String, u32>,
    partition_columns: &HashSet<&str>,
) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
    let mut tag_columns = Vec::new();
    let mut essential_column_indices =
        SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
    // Placeholder for greptime_timestamp and greptime_value
    // (overwritten with the real indices once both are located below).
    essential_column_indices.push(0);
    essential_column_indices.push(0);

    let mut timestamp_index = None;
    let mut value_index = None;

    for (index, field) in batch_schema.fields().iter().enumerate() {
        match field.data_type() {
            ArrowDataType::Utf8 => {
                // Every string column is a tag and must have a physical column ID.
                let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Column '{}' from logical table '{}' not found in physical table column IDs",
                            field.name(),
                            table_name
                        ),
                    }
                })?;
                tag_columns.push(TagColumnInfo {
                    name: field.name().clone(),
                    index,
                    column_id,
                });

                // Partition tags are also carried into the physical batch for routing.
                if partition_columns.contains(field.name().as_str()) {
                    essential_column_indices.push(index);
                }
            }
            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
                // `replace` returning Some means a timestamp column was already seen.
                ensure!(
                    timestamp_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated timestamp column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            ArrowDataType::Float64 => {
                // Same dedup trick for the single value column.
                ensure!(
                    value_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated value column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            datatype => {
                return error::InvalidPromRemoteRequestSnafu {
                    msg: format!(
                        "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
                        table_name
                    ),
                }
                .fail();
            }
        }
    }

    let timestamp_index =
        timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
            msg: format!(
                "Missing essential column '{}' in logical table '{}' batch schema",
                greptime_timestamp(),
                table_name
            ),
        })?;
    let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
        msg: format!(
            "Missing essential column '{}' in logical table '{}' batch schema",
            greptime_value(),
            table_name
        ),
    })?;

    // Tags are sorted by name so primary-key encoding is deterministic.
    tag_columns.sort_by(|a, b| a.name.cmp(&b.name));

    // Fill in the placeholders reserved at the top.
    essential_column_indices[0] = timestamp_index;
    essential_column_indices[1] = value_index;

    Ok((tag_columns, essential_column_indices))
}
1153
1154fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
1155    ensure!(
1156        batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1157        error::InternalSnafu {
1158            err_msg: format!(
1159                "Expected at least {} columns in physical batch, got {}",
1160                PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1161                batch.num_columns()
1162            ),
1163        }
1164    );
1165    let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
1166    batch.project(&essential_indices).context(error::ArrowSnafu)
1167}
1168
1169async fn flush_region_writes_concurrently(
1170    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1171    writes: Vec<FlushRegionWrite>,
1172) -> Result<()> {
1173    if !should_dispatch_concurrently(writes.len()) {
1174        for write in writes {
1175            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1176                .with_label_values(&["flush_write_region"])
1177                .start_timer();
1178            node_manager.handle(&write.datanode, write.request).await?;
1179        }
1180        return Ok(());
1181    }
1182
1183    let write_futures = writes.into_iter().map(|write| async move {
1184        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1185            .with_label_values(&["flush_write_region"])
1186            .start_timer();
1187
1188        node_manager.handle(&write.datanode, write.request).await?;
1189        Ok::<_, Error>(())
1190    });
1191
1192    // todo(hl): should be bounded.
1193    futures::future::try_join_all(write_futures).await?;
1194    Ok(())
1195}
1196
1197async fn flush_batch(
1198    flush: FlushBatch,
1199    partition_manager: PartitionRuleManagerRef,
1200    node_manager: NodeManagerRef,
1201    catalog_manager: CatalogManagerRef,
1202) {
1203    let FlushBatch {
1204        table_batches,
1205        total_row_count,
1206        ctx,
1207        waiters,
1208    } = flush;
1209    let start = Instant::now();
1210
1211    // Physical-table-level flush: transform all logical table batches
1212    // into physical format and write them together.
1213    let physical_table_name = ctx
1214        .extension(PHYSICAL_TABLE_KEY)
1215        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
1216        .to_string();
1217    let partition_provider = PartitionManagerPhysicalFlushAdapter { partition_manager };
1218    let node_requester = NodeManagerPhysicalFlushAdapter { node_manager };
1219    let catalog_provider = CatalogManagerPhysicalFlushAdapter { catalog_manager };
1220    let result = flush_batch_physical(
1221        &table_batches,
1222        &physical_table_name,
1223        &ctx,
1224        &partition_provider,
1225        &node_requester,
1226        &catalog_provider,
1227    )
1228    .await;
1229
1230    let elapsed = start.elapsed().as_secs_f64();
1231    FLUSH_ELAPSED.observe(elapsed);
1232
1233    if result.is_err() {
1234        FLUSH_FAILURES.inc();
1235        FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64);
1236    } else {
1237        FLUSH_TOTAL.inc();
1238        FLUSH_ROWS.observe(total_row_count as f64);
1239    }
1240
1241    debug!(
1242        "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
1243        total_row_count, elapsed
1244    );
1245
1246    notify_waiters(waiters, result);
1247}
1248
/// Flushes a batch of logical table rows by transforming them into the physical table format
/// and writing them to the appropriate datanode regions.
///
/// This function performs the end-to-end physical flush pipeline:
/// 1. Resolves the physical table metadata and column ID mapping.
/// 2. Fetches the physical table's partition rule.
/// 3. Transforms each logical table batch into the physical (sparse primary key) format.
/// 4. Concatenates all transformed batches into a single combined batch.
/// 5. Splits the combined batch by partition rule and sends region write requests
///    concurrently to the target datanodes.
///
/// Each stage's duration is recorded under a distinct label of
/// `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED`.
///
/// # Errors
/// Fails when the physical table cannot be found, when it lacks a column-ID
/// map, or when any downstream stage (partition-rule lookup, batch transform,
/// concatenation, region planning, leader resolution, encoding, or the region
/// writes themselves) returns an error.
pub async fn flush_batch_physical(
    table_batches: &[TableBatch],
    physical_table_name: &str,
    ctx: &QueryContextRef,
    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
    catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized),
) -> Result<()> {
    // 1. Resolve the physical table and get column ID mapping
    let physical_table = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_resolve_table"])
            .start_timer();
        catalog_manager
            .physical_table(
                ctx.current_catalog(),
                &ctx.current_schema(),
                physical_table_name,
                ctx.as_ref(),
            )
            .await?
            .with_context(|| error::InternalSnafu {
                err_msg: format!(
                    "Physical table '{}' not found during pending flush",
                    physical_table_name
                ),
            })?
    };

    let physical_table_info = physical_table.table_info;
    let name_to_ids = physical_table
        .col_name_to_ids
        .with_context(|| error::InternalSnafu {
            err_msg: format!(
                "Physical table '{}' has no column IDs for pending flush",
                physical_table_name
            ),
        })?;

    // 2. Get the physical table's partition rule (one lookup instead of N)
    let partition_rule = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_fetch_partition_rule"])
            .start_timer();
        partition_manager
            .find_table_partition_rule(physical_table_info.as_ref())
            .await?
    };
    let partition_columns = partition_rule.partition_columns();
    let partition_columns_set: HashSet<&str> =
        partition_columns.iter().map(String::as_str).collect();

    // 3. Transform each logical table batch into physical format
    let modified_batches =
        transform_logical_batches_to_physical(table_batches, &name_to_ids, &partition_columns_set)?;

    // 4. Concatenate all modified batches (all share the same physical schema)
    let combined_batch = concat_modified_batches(&modified_batches)?;

    // 5. Split by physical partition rule and send to regions
    let physical_table_id = physical_table_info.table_id();
    let planned_batches = plan_region_batches(
        combined_batch,
        physical_table_id,
        partition_rule.as_ref(),
        partition_columns,
    )?;

    let resolved_batches = resolve_region_targets(planned_batches, partition_manager).await?;
    let region_writes = encode_region_write_requests(resolved_batches)?;
    flush_region_writes_concurrently(node_manager, region_writes).await
}
1331
/// Transforms logical table batches into physical format (sparse primary key encoding).
///
/// It identifies tag columns and essential columns (timestamp, value) for each logical batch
/// and applies sparse primary key modification. The cumulative time spent in
/// classification and modification is recorded once per flush, not per batch.
///
/// # Errors
/// Propagates classification/modification errors, and returns an internal
/// error when no batch at all could be transformed.
fn transform_logical_batches_to_physical(
    table_batches: &[TableBatch],
    name_to_ids: &HashMap<String, u32>,
    partition_columns_set: &HashSet<&str>,
) -> Result<Vec<RecordBatch>> {
    let mut modified_batches: Vec<RecordBatch> =
        Vec::with_capacity(table_batches.iter().map(|b| b.batches.len()).sum());

    let mut modify_elapsed = Duration::ZERO;
    let mut columns_taxonomy_elapsed = Duration::ZERO;

    for table_batch in table_batches {
        let table_id = table_batch.table_id;

        for batch in &table_batch.batches {
            let batch_schema = batch.schema();
            let start = Instant::now();
            let (tag_columns, essential_col_indices) = columns_taxonomy(
                &batch_schema,
                &table_batch.table_name,
                name_to_ids,
                partition_columns_set,
            )?;

            columns_taxonomy_elapsed += start.elapsed();
            // NOTE(review): on success `columns_taxonomy` always returns at
            // least the two timestamp/value entries in `essential_col_indices`,
            // so this skip condition looks unreachable — confirm whether `||`
            // was intended here.
            if tag_columns.is_empty() && essential_col_indices.is_empty() {
                continue;
            }

            let modified = {
                let start = Instant::now();
                let batch = modify_batch_sparse(
                    batch.clone(),
                    table_id,
                    &tag_columns,
                    &essential_col_indices,
                )?;
                modify_elapsed += start.elapsed();
                batch
            };

            modified_batches.push(modified);
        }
    }

    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_modify_batch"])
        .observe(modify_elapsed.as_secs_f64());
    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_columns_taxonomy"])
        .observe(columns_taxonomy_elapsed.as_secs_f64());

    ensure!(
        !modified_batches.is_empty(),
        error::InternalSnafu {
            err_msg: "No batches can be transformed during pending flush",
        }
    );
    Ok(modified_batches)
}
1396
1397/// Concatenates all modified batches into a single large batch.
1398///
1399/// All modified batches share the same physical schema.
1400fn concat_modified_batches(modified_batches: &[RecordBatch]) -> Result<RecordBatch> {
1401    let combined_schema = modified_batches[0].schema();
1402    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1403        .with_label_values(&["flush_physical_concat_all"])
1404        .start_timer();
1405    concat_batches(&combined_schema, modified_batches).context(error::ArrowSnafu)
1406}
1407
1408fn split_combined_batch_by_region(
1409    combined_batch: &RecordBatch,
1410    partition_rule: &dyn partition::partition::PartitionRule,
1411) -> Result<HashMap<u32, partition::partition::RegionMask>> {
1412    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1413        .with_label_values(&["flush_physical_split_record_batch"])
1414        .start_timer();
1415    let map = partition_rule.split_record_batch(combined_batch)?;
1416    Ok(map)
1417}
1418
1419fn prepare_physical_region_routing_batch(
1420    combined_batch: RecordBatch,
1421    partition_columns: &[String],
1422) -> Result<RecordBatch> {
1423    if partition_columns.is_empty() {
1424        return Ok(combined_batch);
1425    }
1426    strip_partition_columns_from_batch(combined_batch)
1427}
1428
1429fn plan_region_batch(
1430    stripped_batch: &RecordBatch,
1431    physical_table_id: TableId,
1432    region_number: u32,
1433    mask: &partition::partition::RegionMask,
1434) -> Result<Option<PlannedRegionBatch>> {
1435    if mask.select_none() {
1436        return Ok(None);
1437    }
1438
1439    let region_batch = if mask.select_all() {
1440        stripped_batch.clone()
1441    } else {
1442        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1443            .with_label_values(&["flush_physical_filter_record_batch"])
1444            .start_timer();
1445        filter_record_batch(stripped_batch, mask.array()).context(error::ArrowSnafu)?
1446    };
1447
1448    let row_count = region_batch.num_rows();
1449    if row_count == 0 {
1450        return Ok(None);
1451    }
1452
1453    Ok(Some(PlannedRegionBatch {
1454        region_id: RegionId::new(physical_table_id, region_number),
1455        batch: region_batch,
1456    }))
1457}
1458
1459fn plan_region_batches(
1460    combined_batch: RecordBatch,
1461    physical_table_id: TableId,
1462    partition_rule: &dyn partition::partition::PartitionRule,
1463    partition_columns: &[String],
1464) -> Result<Vec<PlannedRegionBatch>> {
1465    let region_masks = split_combined_batch_by_region(&combined_batch, partition_rule)?;
1466    let stripped_batch = prepare_physical_region_routing_batch(combined_batch, partition_columns)?;
1467
1468    let mut planned_batches = Vec::new();
1469    for (region_number, mask) in region_masks {
1470        if let Some(planned_batch) =
1471            plan_region_batch(&stripped_batch, physical_table_id, region_number, &mask)?
1472        {
1473            planned_batches.push(planned_batch);
1474        }
1475    }
1476
1477    Ok(planned_batches)
1478}
1479
1480async fn resolve_region_targets(
1481    planned_batches: Vec<PlannedRegionBatch>,
1482    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1483) -> Result<Vec<ResolvedRegionBatch>> {
1484    let mut resolved_batches = Vec::with_capacity(planned_batches.len());
1485    for planned in planned_batches {
1486        let datanode = {
1487            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1488                .with_label_values(&["flush_physical_resolve_region_leader"])
1489                .start_timer();
1490            partition_manager
1491                .find_region_leader(planned.region_id)
1492                .await?
1493        };
1494
1495        resolved_batches.push(ResolvedRegionBatch { planned, datanode });
1496    }
1497
1498    Ok(resolved_batches)
1499}
1500
1501fn encode_region_write_requests(
1502    resolved_batches: Vec<ResolvedRegionBatch>,
1503) -> Result<Vec<FlushRegionWrite>> {
1504    let mut region_writes = Vec::with_capacity(resolved_batches.len());
1505    for resolved in resolved_batches {
1506        let region_id = resolved.planned.region_id;
1507        let (schema_bytes, data_header, payload) = {
1508            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1509                .with_label_values(&["flush_physical_encode_ipc"])
1510                .start_timer();
1511            record_batch_to_ipc(resolved.planned.batch)?
1512        };
1513
1514        let request = RegionRequest {
1515            header: Some(RegionRequestHeader {
1516                tracing_context: TracingContext::from_current_span().to_w3c(),
1517                ..Default::default()
1518            }),
1519            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
1520                region_id: region_id.as_u64(),
1521                partition_expr_version: None,
1522                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
1523                    schema: schema_bytes,
1524                    data_header,
1525                    payload,
1526                })),
1527            })),
1528        };
1529
1530        region_writes.push(FlushRegionWrite {
1531            datanode: resolved.datanode,
1532            request,
1533        });
1534    }
1535
1536    Ok(region_writes)
1537}
1538
1539fn notify_waiters(waiters: Vec<FlushWaiter>, result: Result<()>) {
1540    let shared_result = result.map_err(Arc::new);
1541    for waiter in waiters {
1542        let _ = waiter.response_tx.send(match &shared_result {
1543            Ok(()) => Ok(()),
1544            Err(error) => Err(Arc::clone(error)),
1545        });
1546        // waiter._permit is dropped here, releasing the inflight semaphore slot
1547    }
1548}
1549
1550fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
1551    let mut encoder = FlightEncoder::default();
1552    let schema = encoder.encode_schema(record_batch.schema().as_ref());
1553    let mut iter = encoder
1554        .encode(FlightMessage::RecordBatch(record_batch))
1555        .into_iter();
1556    let Some(flight_data) = iter.next() else {
1557        return Err(Error::Internal {
1558            err_msg: "Failed to encode empty flight data".to_string(),
1559        });
1560    };
1561    if iter.next().is_some() {
1562        return Err(Error::NotSupported {
1563            feat: "bulk insert RecordBatch with dictionary arrays".to_string(),
1564        });
1565    }
1566
1567    Ok((
1568        schema.data_header,
1569        flight_data.data_header,
1570        flight_data.data_body,
1571    ))
1572}
1573
1574#[cfg(test)]
1575mod tests {
1576    use std::collections::{HashMap, HashSet};
1577    use std::sync::Arc;
1578    use std::sync::atomic::{AtomicUsize, Ordering};
1579    use std::time::{Duration, Instant};
1580
1581    use api::region::RegionResponse;
1582    use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
1583    use api::v1::meta::Peer;
1584    use api::v1::region::{InsertRequests, RegionRequest, region_request};
1585    use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
1586    use arrow::array::{BinaryArray, BooleanArray, StringArray, TimestampMillisecondArray};
1587    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1588    use arrow::record_batch::RecordBatch;
1589    use async_trait::async_trait;
1590    use catalog::error::Result as CatalogResult;
1591    use common_meta::error::Result as MetaResult;
1592    use common_meta::node_manager::{
1593        Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
1594    };
1595    use common_query::request::QueryRequest;
1596    use common_recordbatch::SendableRecordBatchStream;
1597    use dashmap::DashMap;
1598    use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema};
1599    use partition::error::Result as PartitionResult;
1600    use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask};
1601    use smallvec::SmallVec;
1602    use snafu::ResultExt;
1603    use store_api::storage::RegionId;
1604    use table::metadata::TableId;
1605    use table::test_util::table_info::test_table_info;
1606    use tokio::sync::{Semaphore, mpsc, oneshot};
1607    use tokio::time::sleep;
1608
1609    use super::{
1610        BatchKey, Error, FlushRegionWrite, FlushWaiter, PendingBatch, PendingRowsBatcher,
1611        PendingWorker, PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester,
1612        PhysicalFlushPartitionProvider, PhysicalTableMetadata, PlannedRegionBatch,
1613        ResolvedRegionBatch, TableBatch, WorkerCommand, columns_taxonomy, drain_batch,
1614        encode_region_write_requests, flush_batch_physical, flush_region_writes_concurrently,
1615        plan_region_batches, remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
1616        should_dispatch_concurrently, strip_partition_columns_from_batch,
1617        transform_logical_batches_to_physical,
1618    };
1619    use crate::error;
1620
1621    fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
1622        Rows {
1623            schema: vec![ColumnSchema {
1624                column_name: schema_name.to_string(),
1625                ..Default::default()
1626            }],
1627            rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
1628        }
1629    }
1630
1631    fn mock_tag_batch(tag_name: &str, tag_value: &str, ts: i64, val: f64) -> RecordBatch {
1632        let schema = Arc::new(ArrowSchema::new(vec![
1633            Field::new(
1634                "greptime_timestamp",
1635                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1636                false,
1637            ),
1638            Field::new("greptime_value", ArrowDataType::Float64, true),
1639            Field::new(tag_name, ArrowDataType::Utf8, true),
1640        ]));
1641
1642        RecordBatch::try_new(
1643            schema,
1644            vec![
1645                Arc::new(TimestampMillisecondArray::from(vec![ts])),
1646                Arc::new(arrow::array::Float64Array::from(vec![val])),
1647                Arc::new(StringArray::from(vec![tag_value])),
1648            ],
1649        )
1650        .unwrap()
1651    }
1652
    /// Builds `PhysicalTableMetadata` for a metric-engine style physical
    /// table: `__primary_key` (binary), `greptime_timestamp` (ms), a
    /// float64 value column, and one string tag column "tag1" whose
    /// column id (3) is registered in `col_name_to_ids`.
    fn mock_physical_table_metadata(table_id: TableId) -> PhysicalTableMetadata {
        let schema = Arc::new(
            DtSchema::try_new(vec![
                DtColumnSchema::new(
                    "__primary_key",
                    datatypes::prelude::ConcreteDataType::binary_datatype(),
                    false,
                ),
                DtColumnSchema::new(
                    "greptime_timestamp",
                    datatypes::prelude::ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                DtColumnSchema::new(
                    "greptime_value",
                    datatypes::prelude::ConcreteDataType::float64_datatype(),
                    true,
                ),
                DtColumnSchema::new(
                    "tag1",
                    datatypes::prelude::ConcreteDataType::string_datatype(),
                    true,
                ),
            ])
            .unwrap(),
        );
        let mut table_info = test_table_info(table_id, "phy", "public", "greptime", schema);
        // Column ids must line up with the four columns declared above.
        table_info.meta.column_ids = vec![0, 1, 2, 3];

        PhysicalTableMetadata {
            table_info: Arc::new(table_info),
            col_name_to_ids: Some(HashMap::from([("tag1".to_string(), 3)])),
        }
    }
1687
    /// Catalog stub whose lookup always returns the preconfigured table
    /// metadata (or `None` to simulate a missing physical table).
    struct MockFlushCatalogProvider {
        table: Option<PhysicalTableMetadata>,
    }
1691
    #[async_trait]
    impl PhysicalFlushCatalogProvider for MockFlushCatalogProvider {
        // Ignores all lookup arguments and hands back a clone of the
        // canned metadata.
        async fn physical_table(
            &self,
            _catalog: &str,
            _schema: &str,
            _table_name: &str,
            _query_ctx: &session::context::QueryContext,
        ) -> CatalogResult<Option<PhysicalTableMetadata>> {
            Ok(self.table.clone())
        }
    }
1704
    /// Partition rule stub that maps every row of a batch to region number 1.
    struct SingleRegionPartitionRule;
1706
    impl PartitionRule for SingleRegionPartitionRule {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        // No partition columns: the rule does not depend on any column value.
        fn partition_columns(&self) -> &[String] {
            &[]
        }

        // Per-value routing is never exercised by these tests.
        fn find_region(
            &self,
            _values: &[datatypes::prelude::Value],
        ) -> partition::error::Result<store_api::storage::RegionNumber> {
            unimplemented!()
        }

        // Routes the whole batch to region 1 with an all-true mask.
        fn split_record_batch(
            &self,
            record_batch: &RecordBatch,
        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
        {
            Ok(HashMap::from([(
                1,
                RegionMask::new(
                    arrow::array::BooleanArray::from(vec![true; record_batch.num_rows()]),
                    record_batch.num_rows(),
                ),
            )]))
        }
    }
1737
    /// Partition rule stub that splits a two-row batch between regions 1
    /// and 2 and reports region 3 as empty.
    struct TwoRegionPartitionRule {
        partition_columns: Vec<String>,
    }
1741
    impl PartitionRule for TwoRegionPartitionRule {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn partition_columns(&self) -> &[String] {
            &self.partition_columns
        }

        // Per-value routing is never exercised by these tests.
        fn find_region(
            &self,
            _values: &[datatypes::prelude::Value],
        ) -> partition::error::Result<store_api::storage::RegionNumber> {
            unimplemented!()
        }

        // NOTE(review): masks assume a two-row input batch; region 3 gets an
        // all-false mask (0 selected rows), presumably to exercise the
        // empty-region skip path in the caller — confirm against usage.
        fn split_record_batch(
            &self,
            _record_batch: &RecordBatch,
        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
        {
            Ok(HashMap::from([
                (1, RegionMask::new(BooleanArray::from(vec![true, false]), 1)),
                (2, RegionMask::new(BooleanArray::from(vec![false, true]), 1)),
                (
                    3,
                    RegionMask::new(BooleanArray::from(vec![false, false]), 0),
                ),
            ]))
        }
    }
1773
    /// Partition-provider stub that counts how often the partition rule
    /// and the region leader are requested.
    struct MockFlushPartitionProvider {
        partition_rule_calls: Arc<AtomicUsize>,
        region_leader_calls: Arc<AtomicUsize>,
    }
1778
    #[async_trait]
    impl PhysicalFlushPartitionProvider for MockFlushPartitionProvider {
        // Counts the call and always yields the single-region rule.
        async fn find_table_partition_rule(
            &self,
            _table_info: &table::metadata::TableInfo,
        ) -> PartitionResult<PartitionRuleRef> {
            self.partition_rule_calls.fetch_add(1, Ordering::SeqCst);
            Ok(Arc::new(SingleRegionPartitionRule))
        }

        // Counts the call and always resolves to peer 1 ("node-1").
        async fn find_region_leader(&self, _region_id: RegionId) -> error::Result<Peer> {
            self.region_leader_calls.fetch_add(1, Ordering::SeqCst);
            Ok(Peer {
                id: 1,
                addr: "node-1".to_string(),
            })
        }
    }
1797
    /// Node-requester stub that only counts how many region writes it
    /// receives.
    #[derive(Default)]
    struct MockFlushNodeRequester {
        writes: Arc<AtomicUsize>,
    }
1802
    #[async_trait]
    impl PhysicalFlushNodeRequester for MockFlushNodeRequester {
        // Records the write and reports success with zero affected rows.
        async fn handle(
            &self,
            _peer: &Peer,
            _request: RegionRequest,
        ) -> error::Result<RegionResponse> {
            self.writes.fetch_add(1, Ordering::SeqCst);
            Ok(RegionResponse::new(0))
        }
    }
1814
1815    #[test]
1816    fn test_collect_non_empty_table_rows_filters_empty_payloads() {
1817        let requests = RowInsertRequests {
1818            inserts: vec![
1819                RowInsertRequest {
1820                    table_name: "cpu".to_string(),
1821                    rows: Some(mock_rows(2, "host")),
1822                },
1823                RowInsertRequest {
1824                    table_name: "mem".to_string(),
1825                    rows: Some(mock_rows(0, "host")),
1826                },
1827                RowInsertRequest {
1828                    table_name: "disk".to_string(),
1829                    rows: None,
1830                },
1831            ],
1832        };
1833
1834        let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);
1835
1836        assert_eq!(2, total_rows);
1837        assert_eq!(1, table_rows.len());
1838        assert_eq!("cpu", table_rows[0].0);
1839        assert_eq!(2, table_rows[0].1.rows.len());
1840    }
1841
    // Verifies that `drain_batch` moves the pending batch out of its
    // `Option` slot (leaving `None` behind) while preserving the row count,
    // table batches, and query context.
    #[test]
    fn test_drain_batch_takes_initialized_pending_batch_from_option() {
        let ctx = session::context::QueryContext::arc();
        let (response_tx, _response_rx) = oneshot::channel();
        let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();
        let mut batch = Some(PendingBatch {
            tables: HashMap::from([(
                "cpu".to_string(),
                TableBatch {
                    table_name: "cpu".to_string(),
                    table_id: 42,
                    batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
                    row_count: 1,
                },
            )]),
            created_at: Instant::now(),
            total_row_count: 1,
            ctx: ctx.clone(),
            waiters: vec![FlushWaiter {
                response_tx,
                _permit: permit,
            }],
        });

        let flush = drain_batch(&mut batch).unwrap();

        assert!(batch.is_none());
        assert_eq!(1, flush.total_row_count);
        assert_eq!(1, flush.table_batches.len());
        assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog());
    }
1873
    /// Datanode stub that sleeps for `delay` per request and tracks the
    /// current and maximum number of concurrent in-flight requests.
    #[derive(Clone)]
    struct ConcurrentMockDatanode {
        delay: Duration,
        inflight: Arc<AtomicUsize>,
        max_inflight: Arc<AtomicUsize>,
    }
1880
    #[async_trait]
    impl Datanode for ConcurrentMockDatanode {
        async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
            // `now` is the number of in-flight requests including this one.
            let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
            // Lock-free fetch-max: raise `max_inflight` to `now` unless a
            // concurrent task already published an equal or larger value.
            loop {
                let max = self.max_inflight.load(Ordering::SeqCst);
                if now <= max {
                    break;
                }
                if self
                    .max_inflight
                    .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }

            // Keep this request "running" long enough for overlap with
            // other requests to be observable.
            sleep(self.delay).await;
            self.inflight.fetch_sub(1, Ordering::SeqCst);
            Ok(RegionResponse::new(0))
        }

        // Query path is never exercised by these tests.
        async fn handle_query(
            &self,
            _request: QueryRequest,
        ) -> MetaResult<SendableRecordBatchStream> {
            unimplemented!()
        }
    }
1911
    /// Node-manager stub backed by a fixed peer-id → datanode map.
    #[derive(Clone)]
    struct ConcurrentMockNodeManager {
        datanodes: Arc<HashMap<u64, DatanodeRef>>,
    }
1916
    #[async_trait]
    impl DatanodeManager for ConcurrentMockNodeManager {
        // Panics if the peer id is not in the fixture map — tests must
        // register every datanode they dispatch to.
        async fn datanode(&self, node: &Peer) -> DatanodeRef {
            self.datanodes
                .get(&node.id)
                .expect("datanode not found")
                .clone()
        }
    }
1926
    /// Flownode stub whose methods are never expected to be called.
    struct NoopFlownode;
1928
    // All flow paths are unreachable in these tests; any call is a bug.
    #[async_trait]
    impl Flownode for NoopFlownode {
        async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_mark_window_dirty(
            &self,
            _req: DirtyWindowRequests,
        ) -> MetaResult<FlowResponse> {
            unimplemented!()
        }
    }
1946
    #[async_trait]
    impl FlownodeManager for ConcurrentMockNodeManager {
        // Satisfies the trait; the returned flownode is a no-op stub.
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            Arc::new(NoopFlownode)
        }
    }
1953
    #[async_trait]
    impl PhysicalFlushNodeRequester for ConcurrentMockNodeManager {
        // Resolves the peer to its mock datanode and forwards the request,
        // mapping the meta error into the local error type.
        async fn handle(
            &self,
            peer: &Peer,
            request: RegionRequest,
        ) -> error::Result<RegionResponse> {
            let datanode = self.datanode(peer).await;
            datanode
                .handle(request)
                .await
                .context(error::CommonMetaSnafu)
        }
    }
1968
1969    #[test]
1970    fn test_remove_worker_if_same_channel_removes_matching_entry() {
1971        let workers = DashMap::new();
1972        let key = BatchKey {
1973            catalog: "greptime".to_string(),
1974            schema: "public".to_string(),
1975            physical_table: "phy".to_string(),
1976        };
1977
1978        let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
1979        workers.insert(key.clone(), PendingWorker { tx: tx.clone() });
1980
1981        assert!(remove_worker_if_same_channel(&workers, &key, &tx));
1982        assert!(!workers.contains_key(&key));
1983    }
1984
1985    #[test]
1986    fn test_remove_worker_if_same_channel_keeps_newer_entry() {
1987        let workers = DashMap::new();
1988        let key = BatchKey {
1989            catalog: "greptime".to_string(),
1990            schema: "public".to_string(),
1991            physical_table: "phy".to_string(),
1992        };
1993
1994        let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
1995        let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
1996        workers.insert(
1997            key.clone(),
1998            PendingWorker {
1999                tx: fresh_tx.clone(),
2000            },
2001        );
2002
2003        assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
2004        assert!(workers.contains_key(&key));
2005        assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
2006    }
2007
2008    #[test]
2009    fn test_worker_idle_timeout_close_decision() {
2010        assert!(should_close_worker_on_idle_timeout(0, 0));
2011        assert!(!should_close_worker_on_idle_timeout(1, 0));
2012        assert!(!should_close_worker_on_idle_timeout(0, 1));
2013    }
2014
    // Verifies that writes to two different datanodes are dispatched
    // concurrently: each mock datanode sleeps 100ms, so a max observed
    // in-flight count >= 2 proves the requests overlapped in time.
    #[tokio::test]
    async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
        let inflight = Arc::new(AtomicUsize::new(0));
        let max_inflight = Arc::new(AtomicUsize::new(0));
        // Both datanodes share the same counters so overlap is visible
        // regardless of which node a request lands on.
        let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight: inflight.clone(),
            max_inflight: max_inflight.clone(),
        });
        let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight,
            max_inflight: max_inflight.clone(),
        });

        let mut datanodes = HashMap::new();
        datanodes.insert(1, datanode1);
        datanodes.insert(2, datanode2);
        let node_manager = Arc::new(ConcurrentMockNodeManager {
            datanodes: Arc::new(datanodes),
        });

        let writes = vec![
            FlushRegionWrite {
                datanode: Peer {
                    id: 1,
                    addr: "node1".to_string(),
                },
                request: RegionRequest::default(),
            },
            FlushRegionWrite {
                datanode: Peer {
                    id: 2,
                    addr: "node2".to_string(),
                },
                request: RegionRequest::default(),
            },
        ];

        flush_region_writes_concurrently(node_manager.as_ref(), writes)
            .await
            .unwrap();
        assert!(max_inflight.load(Ordering::SeqCst) >= 2);
    }
2059
2060    #[test]
2061    fn test_should_dispatch_concurrently_by_region_count() {
2062        assert!(!should_dispatch_concurrently(0));
2063        assert!(!should_dispatch_concurrently(1));
2064        assert!(should_dispatch_concurrently(2));
2065    }
2066
    // Verifies that stripping drops the extra tag column ("host") and
    // keeps only the three essential physical columns, in order.
    #[test]
    fn test_strip_partition_columns_from_batch_removes_partition_tags() {
        let batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
                Arc::new(StringArray::from(vec!["node-1"])),
            ],
        )
        .unwrap();

        let stripped = strip_partition_columns_from_batch(batch).unwrap();

        assert_eq!(3, stripped.num_columns());
        assert_eq!("__primary_key", stripped.schema().field(0).name());
        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
        assert_eq!("greptime_value", stripped.schema().field(2).name());
    }
2096
    // NOTE(review): this test body is byte-identical to
    // test_strip_partition_columns_from_batch_removes_partition_tags
    // above; despite its name it does not exercise a distinct
    // "without lookup" path. TODO: differentiate the fixture (or remove
    // the duplicate) so each test covers a separate code path.
    #[test]
    fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
        let batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
                Arc::new(StringArray::from(vec!["node-1"])),
            ],
        )
        .unwrap();

        let stripped = strip_partition_columns_from_batch(batch).unwrap();

        assert_eq!(3, stripped.num_columns());
        assert_eq!("__primary_key", stripped.schema().field(0).name());
        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
        assert_eq!("greptime_value", stripped.schema().field(2).name());
    }
2126
    // Verifies that a tag that is also a partition column ("host") stays
    // among the non-tag indices (so it survives into the batch) while
    // both tags are still reported as tag columns.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let (tag_columns, non_tag_indices) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        assert_eq!(2, tag_columns.len());
        assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
    }
2149
    // Verifies ordering of the returned indices: the essential
    // timestamp/value columns come first (here at schema positions 2 and
    // 1), followed by the partition tag columns in schema order.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host", "region"]);

        let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
    }
2171
    // A Boolean column is neither a valid tag, timestamp, nor value type,
    // so taxonomy resolution must fail with InvalidPromRemoteRequest.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("invalid", ArrowDataType::Boolean, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }
2194
    // The timestamp column must use an Arrow Timestamp type; a raw Int64
    // column named "greptime_timestamp" is rejected.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("greptime_timestamp", ArrowDataType::Int64, false),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }
2212
    // Exactly one timestamp-typed column is allowed; two timestamp
    // columns make the schema ambiguous and must be rejected.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "ts1",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts2",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }
2239
    // Exactly one Float64 value column is allowed; two make the schema
    // ambiguous and must be rejected.
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value1", ArrowDataType::Float64, true),
            Field::new("value2", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }
2262
    // Verifies that resolving the column taxonomy PER BATCH makes
    // `modify_batch_sparse` encode different primary keys for batches
    // with different tag sets, even when timestamp and value payloads
    // are identical.
    #[test]
    fn test_modify_batch_sparse_with_taxonomy_per_batch() {
        use arrow::array::BinaryArray;
        use metric_engine::batch_modifier::modify_batch_sparse;

        let schema1 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
        ]));

        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
            Field::new("tag2", ArrowDataType::Utf8, true),
        ]));
        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
                Arc::new(StringArray::from(vec!["v2"])),
            ],
        )
        .unwrap();

        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();

        // A batch that only has tag1, same values as batch2 for ts and val.
        let batch3 = RecordBatch::try_new(
            schema1.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
            ],
        )
        .unwrap();

        // Simulate the new loop logic in flush_batch_physical:
        // Resolve taxonomy FOR EACH BATCH.
        let (tag_columns2, indices2) =
            columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();

        let (tag_columns3, indices3) =
            columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();

        // Column 0 of the modified batch is the encoded primary key.
        let pk2 = modified2
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let pk3 = modified3
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        // Now they SHOULD be different because tag2 is included in pk2 but not in pk3.
        assert_ne!(
            pk2.value(0),
            pk3.value(0),
            "PK should be different because batch2 has tag2!"
        );
    }
2341
    // Happy path: a logical batch with one known tag transforms into a
    // physical batch of exactly the three essential columns, with the
    // encoded primary key in column 0.
    #[test]
    fn test_transform_logical_batches_to_physical_success() {
        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);

        let table_batches = vec![TableBatch {
            table_name: "t1".to_string(),
            table_id: 1,
            batches: vec![batch],
            row_count: 1,
        }];

        let name_to_ids = HashMap::from([("tag1".to_string(), 1)]);
        let partition_columns = HashSet::new();
        let modified =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap();

        assert_eq!(1, modified.len());
        assert_eq!(3, modified[0].num_columns());
        assert_eq!("__primary_key", modified[0].schema().field(0).name());
        assert_eq!("greptime_timestamp", modified[0].schema().field(1).name());
        assert_eq!("greptime_value", modified[0].schema().field(2).name());
    }
2365
    // A tag absent from the physical column-id map must surface as a
    // taxonomy resolution error rather than being silently dropped.
    #[test]
    fn test_transform_logical_batches_to_physical_taxonomy_failure() {
        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);

        let table_batches = vec![TableBatch {
            table_name: "t1".to_string(),
            table_id: 1,
            batches: vec![batch],
            row_count: 1,
        }];

        // tag1 is missing from name_to_ids, causing columns_taxonomy to fail.
        let name_to_ids = HashMap::new();
        let partition_columns = HashSet::new();
        let err =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap_err();

        assert!(
            err.to_string()
                .contains("not found in physical table column IDs")
        );
    }
2389
    // Two logical tables, both with resolvable tags, transform into one
    // physical batch each.
    #[test]
    fn test_transform_logical_batches_to_physical_multiple_batches() {
        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);

        let table_batches = vec![
            TableBatch {
                table_name: "t1".to_string(),
                table_id: 1,
                batches: vec![batch1],
                row_count: 1,
            },
            TableBatch {
                table_name: "t2".to_string(),
                table_id: 2,
                batches: vec![batch2],
                row_count: 1,
            },
        ];

        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();
        let modified =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap();

        assert_eq!(2, modified.len());
    }
2418
    // One failing batch aborts the whole transformation; the error must
    // name the offending column ("tag1"), not the table that would have
    // succeeded.
    #[test]
    fn test_transform_logical_batches_to_physical_mixed_success_failure() {
        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);

        let table_batches = vec![
            TableBatch {
                table_name: "t1".to_string(),
                table_id: 1,
                batches: vec![batch1],
                row_count: 1,
            },
            TableBatch {
                table_name: "t2".to_string(),
                table_id: 2,
                batches: vec![batch2],
                row_count: 1,
            },
        ];

        // tag1 is missing from name_to_ids, causing batch1 to fail.
        let name_to_ids = HashMap::from([("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();
        let err =
            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
                .unwrap_err();

        assert!(err.to_string().contains("tag1"));
    }
2448
    // End-to-end through the mock seams: with one table batch and a
    // single-region partition rule, flush_batch_physical must consult the
    // partition rule once, resolve one region leader, and issue exactly
    // one region write.
    #[tokio::test]
    async fn test_flush_batch_physical_uses_mockable_trait_dependencies() {
        let table_batches = vec![TableBatch {
            table_name: "t1".to_string(),
            table_id: 11,
            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
            row_count: 1,
        }];
        let partition_calls = Arc::new(AtomicUsize::new(0));
        let leader_calls = Arc::new(AtomicUsize::new(0));
        let node = MockFlushNodeRequester::default();
        let ctx = session::context::QueryContext::arc();

        flush_batch_physical(
            &table_batches,
            "phy",
            &ctx,
            &MockFlushPartitionProvider {
                partition_rule_calls: partition_calls.clone(),
                region_leader_calls: leader_calls.clone(),
            },
            &node,
            &MockFlushCatalogProvider {
                table: Some(mock_physical_table_metadata(1024)),
            },
        )
        .await
        .unwrap();

        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
        assert_eq!(1, leader_calls.load(Ordering::SeqCst));
        assert_eq!(1, node.writes.load(Ordering::SeqCst));
    }
2482
2483    #[tokio::test]
2484    async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() {
2485        let table_batches = vec![TableBatch {
2486            table_name: "t1".to_string(),
2487            table_id: 11,
2488            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2489            row_count: 1,
2490        }];
2491        let partition_calls = Arc::new(AtomicUsize::new(0));
2492        let leader_calls = Arc::new(AtomicUsize::new(0));
2493        let node = MockFlushNodeRequester::default();
2494        let ctx = session::context::QueryContext::arc();
2495
2496        let err = flush_batch_physical(
2497            &table_batches,
2498            "missing_phy",
2499            &ctx,
2500            &MockFlushPartitionProvider {
2501                partition_rule_calls: partition_calls.clone(),
2502                region_leader_calls: leader_calls.clone(),
2503            },
2504            &node,
2505            &MockFlushCatalogProvider { table: None },
2506        )
2507        .await
2508        .unwrap_err();
2509
2510        assert!(
2511            err.to_string()
2512                .contains("Physical table 'missing_phy' not found")
2513        );
2514        assert_eq!(0, partition_calls.load(Ordering::SeqCst));
2515        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2516        assert_eq!(0, node.writes.load(Ordering::SeqCst));
2517    }
2518
2519    #[tokio::test]
2520    async fn test_flush_batch_physical_aborts_immediately_on_transform_error() {
2521        let table_batches = vec![
2522            TableBatch {
2523                table_name: "broken".to_string(),
2524                table_id: 11,
2525                batches: vec![mock_tag_batch("unknown_tag", "host-1", 1000, 1.0)],
2526                row_count: 1,
2527            },
2528            TableBatch {
2529                table_name: "healthy".to_string(),
2530                table_id: 12,
2531                batches: vec![mock_tag_batch("tag1", "host-2", 2000, 2.0)],
2532                row_count: 1,
2533            },
2534        ];
2535        let partition_calls = Arc::new(AtomicUsize::new(0));
2536        let leader_calls = Arc::new(AtomicUsize::new(0));
2537        let node = MockFlushNodeRequester::default();
2538        let ctx = session::context::QueryContext::arc();
2539
2540        let err = flush_batch_physical(
2541            &table_batches,
2542            "phy",
2543            &ctx,
2544            &MockFlushPartitionProvider {
2545                partition_rule_calls: partition_calls.clone(),
2546                region_leader_calls: leader_calls.clone(),
2547            },
2548            &node,
2549            &MockFlushCatalogProvider {
2550                table: Some(mock_physical_table_metadata(1024)),
2551            },
2552        )
2553        .await
2554        .unwrap_err();
2555
2556        assert!(err.to_string().contains("unknown_tag"));
2557        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2558        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2559        assert_eq!(0, node.writes.load(Ordering::SeqCst));
2560    }
2561
2562    #[test]
2563    fn test_plan_region_batches_splits_and_strips_partition_columns() {
2564        let combined_batch = RecordBatch::try_new(
2565            Arc::new(ArrowSchema::new(vec![
2566                Field::new("__primary_key", ArrowDataType::Binary, false),
2567                Field::new(
2568                    "greptime_timestamp",
2569                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2570                    false,
2571                ),
2572                Field::new("greptime_value", ArrowDataType::Float64, true),
2573                Field::new("host", ArrowDataType::Utf8, true),
2574            ])),
2575            vec![
2576                Arc::new(BinaryArray::from(vec![b"k1".as_slice(), b"k2".as_slice()])),
2577                Arc::new(TimestampMillisecondArray::from(vec![1000_i64, 2000_i64])),
2578                Arc::new(arrow::array::Float64Array::from(vec![1.0_f64, 2.0_f64])),
2579                Arc::new(StringArray::from(vec!["node-1", "node-2"])),
2580            ],
2581        )
2582        .unwrap();
2583        let mut planned_batches = plan_region_batches(
2584            combined_batch,
2585            1024,
2586            &TwoRegionPartitionRule {
2587                partition_columns: vec!["host".to_string()],
2588            },
2589            &["host".to_string()],
2590        )
2591        .unwrap();
2592        planned_batches.sort_by_key(|planned| planned.region_id.region_number());
2593
2594        assert_eq!(2, planned_batches.len());
2595        assert_eq!(RegionId::new(1024, 1), planned_batches[0].region_id);
2596        assert_eq!(1, planned_batches[0].num_rows());
2597        assert_eq!(3, planned_batches[0].batch.num_columns());
2598        assert_eq!(RegionId::new(1024, 2), planned_batches[1].region_id);
2599        assert_eq!(1, planned_batches[1].num_rows());
2600        assert_eq!(3, planned_batches[1].batch.num_columns());
2601    }
2602
2603    #[test]
2604    fn test_encode_region_write_requests_builds_bulk_insert_requests() {
2605        let planned_batch = PlannedRegionBatch {
2606            region_id: RegionId::new(1024, 1),
2607            batch: RecordBatch::try_new(
2608                Arc::new(ArrowSchema::new(vec![
2609                    Field::new("__primary_key", ArrowDataType::Binary, false),
2610                    Field::new(
2611                        "greptime_timestamp",
2612                        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2613                        false,
2614                    ),
2615                    Field::new("greptime_value", ArrowDataType::Float64, true),
2616                ])),
2617                vec![
2618                    Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2619                    Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2620                    Arc::new(arrow::array::Float64Array::from(vec![1.0_f64])),
2621                ],
2622            )
2623            .unwrap(),
2624        };
2625        let resolved_batch = ResolvedRegionBatch {
2626            planned: planned_batch,
2627            datanode: Peer {
2628                id: 1,
2629                addr: "node-1".to_string(),
2630            },
2631        };
2632        let writes = encode_region_write_requests(vec![resolved_batch]).unwrap();
2633
2634        assert_eq!(1, writes.len());
2635        assert_eq!(1, writes[0].datanode.id);
2636        let Some(region_request::Body::BulkInsert(request)) = &writes[0].request.body else {
2637            panic!("expected bulk insert request");
2638        };
2639        assert_eq!(RegionId::new(1024, 1).as_u64(), request.region_id);
2640    }
2641}