Skip to main content

servers/
pending_rows_batcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::Peer;
20use api::v1::region::{
21    BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
22};
23use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
24use arrow::compute::{concat_batches, filter_record_batch};
25use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use bytes::Bytes;
29use catalog::CatalogManagerRef;
30use common_grpc::flight::{FlightEncoder, FlightMessage};
31use common_meta::node_manager::NodeManagerRef;
32use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, warn};
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
38use partition::manager::PartitionRuleManagerRef;
39use partition::partition::PartitionRuleRef;
40use session::context::QueryContextRef;
41use smallvec::SmallVec;
42use snafu::{OptionExt, ResultExt, ensure};
43use store_api::storage::{RegionId, TableId};
44use table::metadata::{TableInfo, TableInfoRef};
45use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};
46
47use crate::error;
48use crate::error::{Error, Result};
49use crate::metrics::{
50    FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
51    PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
52    PENDING_WORKERS,
53};
54use crate::prom_row_builder::{
55    build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
56    rows_to_aligned_record_batch,
57};
58
/// Query-context extension key naming the physical table that rows are batched into.
/// When the extension is absent, `batch_key_from_ctx` falls back to `GREPTIME_PHYSICAL_TABLE`.
const PHYSICAL_TABLE_KEY: &str = "physical_table";
/// Environment variable controlling whether `PendingRowsBatcher::submit` waits for the
/// flush result before replying to the client. Defaults to waiting when unset.
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";
/// A worker's idle timeout is `flush_interval` multiplied by this factor.
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;
/// NOTE(review): not referenced in the visible part of this file; presumably the number of
/// columns every physical region must always carry — confirm against its use site.
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;
64
/// DDL hooks the batcher calls before it can align incoming rows: creating logical
/// tables that do not exist yet and adding tag columns that existing tables lack.
#[async_trait]
pub trait PendingRowsSchemaAlterer: Send + Sync {
    /// Batch-create multiple logical tables that are missing.
    ///
    /// Each entry is `(table_name, request_schema)`. The batcher passes its
    /// `prom_store_with_metric_engine` setting as `with_metric_engine`.
    async fn create_tables_if_missing_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[ColumnSchema])],
        with_metric_engine: bool,
        ctx: QueryContextRef,
    ) -> Result<()>;

    /// Batch-alter multiple logical tables to add missing tag columns.
    ///
    /// Each entry is `(table_name, missing_column_names)`.
    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> Result<()>;
}
88
/// Shared, dynamically dispatched [`PendingRowsSchemaAlterer`].
pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
90
/// Metadata of a physical table as returned by [`PhysicalFlushCatalogProvider`].
#[derive(Clone)]
pub struct PhysicalTableMetadata {
    /// Table info of the physical table.
    pub table_info: TableInfoRef,
    /// Mapping from column name to column id, as produced by `TableInfo::name_to_ids`.
    /// NOTE(review): `None` semantics are defined by `name_to_ids` — confirm there.
    pub col_name_to_ids: Option<HashMap<String, u32>>,
}
97
/// Catalog lookup used when flushing batched rows to a physical table.
#[async_trait]
pub trait PhysicalFlushCatalogProvider: Send + Sync {
    /// Looks up `catalog.schema.table_name` and returns its metadata.
    ///
    /// Returns `Ok(None)` when the catalog has no such table (presumably "not found";
    /// the production adapter forwards the catalog manager's `Option` unchanged).
    async fn physical_table(
        &self,
        catalog: &str,
        schema: &str,
        table_name: &str,
        query_ctx: &session::context::QueryContext,
    ) -> catalog::error::Result<Option<PhysicalTableMetadata>>;
}
108
/// Partition and region-routing lookups used when flushing to a physical table.
#[async_trait]
pub trait PhysicalFlushPartitionProvider: Send + Sync {
    /// Returns the partition rule of the table described by `table_info`.
    async fn find_table_partition_rule(
        &self,
        table_info: &TableInfo,
    ) -> partition::error::Result<PartitionRuleRef>;

    /// Returns the peer currently leading `region_id`.
    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer>;
}
118
/// Sends region requests produced by a flush to a datanode.
#[async_trait]
pub trait PhysicalFlushNodeRequester: Send + Sync {
    /// Sends `request` to `peer` and returns its region response.
    async fn handle(
        &self,
        peer: &Peer,
        request: RegionRequest,
    ) -> Result<api::region::RegionResponse>;
}
127
/// Adapts a [`CatalogManagerRef`] to [`PhysicalFlushCatalogProvider`].
#[derive(Clone)]
struct CatalogManagerPhysicalFlushAdapter {
    catalog_manager: CatalogManagerRef,
}
132
133#[async_trait]
134impl PhysicalFlushCatalogProvider for CatalogManagerPhysicalFlushAdapter {
135    async fn physical_table(
136        &self,
137        catalog: &str,
138        schema: &str,
139        table_name: &str,
140        query_ctx: &session::context::QueryContext,
141    ) -> catalog::error::Result<Option<PhysicalTableMetadata>> {
142        self.catalog_manager
143            .table(catalog, schema, table_name, Some(query_ctx))
144            .await
145            .map(|table| {
146                table.map(|table| {
147                    let table_info = table.table_info();
148                    let name_to_ids = table_info.name_to_ids();
149                    PhysicalTableMetadata {
150                        table_info,
151                        col_name_to_ids: name_to_ids,
152                    }
153                })
154            })
155    }
156}
157
/// Adapts a [`PartitionRuleManagerRef`] to [`PhysicalFlushPartitionProvider`].
#[derive(Clone)]
struct PartitionManagerPhysicalFlushAdapter {
    partition_manager: PartitionRuleManagerRef,
}
162
163#[async_trait]
164impl PhysicalFlushPartitionProvider for PartitionManagerPhysicalFlushAdapter {
165    async fn find_table_partition_rule(
166        &self,
167        table_info: &TableInfo,
168    ) -> partition::error::Result<PartitionRuleRef> {
169        self.partition_manager
170            .find_table_partition_rule(table_info)
171            .await
172            .map(|(rule, _)| rule)
173    }
174
175    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
176        let peer = self.partition_manager.find_region_leader(region_id).await?;
177        Ok(peer)
178    }
179}
180
/// Adapts a [`NodeManagerRef`] to [`PhysicalFlushNodeRequester`].
#[derive(Clone)]
struct NodeManagerPhysicalFlushAdapter {
    node_manager: NodeManagerRef,
}
185
186#[async_trait]
187impl PhysicalFlushNodeRequester for NodeManagerPhysicalFlushAdapter {
188    async fn handle(
189        &self,
190        peer: &Peer,
191        request: RegionRequest,
192    ) -> error::Result<api::region::RegionResponse> {
193        let datanode = self.node_manager.datanode(peer).await;
194        datanode
195            .handle(request)
196            .await
197            .context(error::CommonMetaSnafu)
198    }
199}
200
/// Identifies one batching worker: rows sharing the same catalog, schema and
/// physical table are accumulated by the same worker.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    catalog: String,
    schema: String,
    physical_table: String,
}
207
/// Record batches accumulated for a single logical table inside a pending batch.
#[derive(Debug)]
pub struct TableBatch {
    /// Logical table name.
    pub table_name: String,
    /// Id of the logical table.
    pub table_id: TableId,
    /// Aligned record batches appended by successive submissions.
    pub batches: Vec<RecordBatch>,
    /// Sum of `num_rows()` over `batches`.
    pub row_count: usize,
}
215
/// Intermediate planning state for resolving and preparing logical tables
/// before row-to-batch alignment.
struct TableResolutionPlan {
    /// Resolved `(arrow schema, table id)` by logical table name.
    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
    /// Missing tables, with their create-request schema, that must be created before alignment.
    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
    /// Tables, with the names of their missing columns, that need tag-column schema evolution.
    tables_to_alter: Vec<(String, Vec<String>)>,
}
226
/// Rows buffered by a worker that have not been flushed yet.
struct PendingBatch {
    /// Per-logical-table accumulated batches, keyed by table name.
    tables: HashMap<String, TableBatch>,
    /// When this batch was opened.
    created_at: Instant,
    /// Total rows across all submissions merged into this batch.
    total_row_count: usize,
    /// `ctx.get_db_string()` of the submission that opened this batch.
    db_string: String,
    /// Query context of the submission that opened this batch.
    ctx: QueryContextRef,
    /// One waiter per merged submission, notified with the flush result.
    waiters: Vec<FlushWaiter>,
}
235
/// A submitter waiting for the outcome of the flush that contains its rows.
struct FlushWaiter {
    /// Channel on which the flush result is reported back to `submit`.
    response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
    /// Inflight-request permit acquired in `submit`; released when the waiter is dropped.
    _permit: OwnedSemaphorePermit,
}
240
/// A batch taken out of a worker and ready to be written.
/// NOTE(review): presumably built by `drain_batch` from a `PendingBatch` — confirm there.
struct FlushBatch {
    /// Per-table batches to write.
    table_batches: Vec<TableBatch>,
    /// Total rows across `table_batches`.
    total_row_count: usize,
    /// Database string carried over from the pending batch.
    db_string: String,
    /// Query context carried over from the pending batch.
    ctx: QueryContextRef,
    /// Submitters to notify once the flush completes.
    waiters: Vec<FlushWaiter>,
}
248
/// Handle to a spawned batching worker task.
#[derive(Clone)]
struct PendingWorker {
    /// Bounded command channel feeding the worker.
    tx: mpsc::Sender<WorkerCommand>,
}
253
/// Messages accepted by a batching worker task.
enum WorkerCommand {
    /// Merge already-aligned record batches into the worker's pending batch.
    Submit {
        /// `(table_name, table_id, batch)` entries produced by `submit`.
        table_batches: Vec<(String, u32, RecordBatch)>,
        /// Total number of rows carried by this submission.
        total_rows: usize,
        /// Query context of the submitting request.
        ctx: QueryContextRef,
        /// Channel on which the flush result is reported back.
        response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
        /// Inflight-request permit; moved into the resulting `FlushWaiter`.
        _permit: OwnedSemaphorePermit,
    },
}
263
264// Batch key is derived from QueryContext; it assumes catalog/schema/physical_table fully
265// define the write target and must remain consistent across the batch.
266fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
267    let physical_table = ctx
268        .extension(PHYSICAL_TABLE_KEY)
269        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
270        .to_string();
271    BatchKey {
272        catalog: ctx.current_catalog().to_string(),
273        schema: ctx.current_schema(),
274        physical_table,
275    }
276}
277
/// Prometheus remote write pending rows batcher.
///
/// Incoming rows are aligned to their logical tables' schemas and routed to one worker
/// task per [`BatchKey`], which accumulates them and flushes when `max_batch_rows` is
/// reached (and presumably on each `flush_interval` tick).
pub struct PendingRowsBatcher {
    /// Live worker handles, one per batch key.
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    /// Period of each worker's flush ticker; also the base of the worker idle timeout.
    flush_interval: Duration,
    /// Row count at which a worker flushes its pending batch.
    max_batch_rows: usize,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    /// Bounds concurrent flushes (`max_concurrent_flushes` permits).
    flush_semaphore: Arc<Semaphore>,
    /// Bounds submissions whose rows are accepted but not yet flushed.
    inflight_semaphore: Arc<Semaphore>,
    /// Capacity of each worker's command channel.
    worker_channel_capacity: usize,
    /// Forwarded as `with_metric_engine` when creating missing tables.
    prom_store_with_metric_engine: bool,
    schema_alterer: PendingRowsSchemaAltererRef,
    /// Whether `submit` waits for the flush result before returning.
    pending_rows_batch_sync: bool,
    /// Signals all workers to stop when the batcher is dropped.
    shutdown: broadcast::Sender<()>,
}
294
295impl PendingRowsBatcher {
296    #[allow(clippy::too_many_arguments)]
297    pub fn try_new(
298        partition_manager: PartitionRuleManagerRef,
299        node_manager: NodeManagerRef,
300        catalog_manager: CatalogManagerRef,
301        prom_store_with_metric_engine: bool,
302        schema_alterer: PendingRowsSchemaAltererRef,
303        flush_interval: Duration,
304        max_batch_rows: usize,
305        max_concurrent_flushes: usize,
306        worker_channel_capacity: usize,
307        max_inflight_requests: usize,
308    ) -> Option<Arc<Self>> {
309        // Disable the batcher if flush is disabled or configuration is invalid.
310        // Zero values for these knobs either cause panics (e.g., zero-capacity channels)
311        // or deadlocks (e.g., semaphores with no permits).
312        if flush_interval.is_zero()
313            || max_batch_rows == 0
314            || max_concurrent_flushes == 0
315            || worker_channel_capacity == 0
316            || max_inflight_requests == 0
317        {
318            return None;
319        }
320
321        let (shutdown, _) = broadcast::channel(1);
322        let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
323            .ok()
324            .as_deref()
325            .and_then(|v| v.parse::<bool>().ok())
326            .unwrap_or(true);
327        let workers = Arc::new(DashMap::new());
328        PENDING_WORKERS.set(workers.len() as i64);
329
330        Some(Arc::new(Self {
331            workers,
332            flush_interval,
333            max_batch_rows,
334            partition_manager,
335            node_manager,
336            catalog_manager,
337            prom_store_with_metric_engine,
338            schema_alterer,
339            flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
340            inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
341            worker_channel_capacity,
342            pending_rows_batch_sync,
343            shutdown,
344        }))
345    }
346
347    pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
348        let (table_batches, total_rows) = {
349            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
350                .with_label_values(&["submit_build_and_align"])
351                .start_timer();
352            self.build_and_align_table_batches(requests, &ctx).await?
353        };
354        if total_rows == 0 {
355            return Ok(0);
356        }
357
358        let permit = {
359            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
360                .with_label_values(&["submit_acquire_inflight_permit"])
361                .start_timer();
362            self.inflight_semaphore
363                .clone()
364                .acquire_owned()
365                .await
366                .map_err(|_| error::BatcherChannelClosedSnafu.build())?
367        };
368
369        let (response_tx, response_rx) = oneshot::channel();
370
371        let batch_key = batch_key_from_ctx(&ctx);
372        let mut cmd = Some(WorkerCommand::Submit {
373            table_batches,
374            total_rows,
375            ctx,
376            response_tx,
377            _permit: permit,
378        });
379
380        {
381            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
382                .with_label_values(&["submit_send_to_worker"])
383                .start_timer();
384
385            for _ in 0..2 {
386                let worker = self.get_or_spawn_worker(batch_key.clone());
387                let Some(worker_cmd) = cmd.take() else {
388                    break;
389                };
390
391                match worker.tx.send(worker_cmd).await {
392                    Ok(()) => break,
393                    Err(err) => {
394                        cmd = Some(err.0);
395                        remove_worker_if_same_channel(
396                            self.workers.as_ref(),
397                            &batch_key,
398                            &worker.tx,
399                        );
400                    }
401                }
402            }
403
404            if cmd.is_some() {
405                return Err(Error::BatcherChannelClosed);
406            }
407        }
408
409        if self.pending_rows_batch_sync {
410            let result = {
411                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
412                    .with_label_values(&["submit_wait_flush_result"])
413                    .start_timer();
414                response_rx
415                    .await
416                    .map_err(|_| error::BatcherChannelClosedSnafu.build())?
417            };
418            result
419                .context(error::SubmitBatchSnafu)
420                .map(|()| total_rows as u64)
421        } else {
422            Ok(total_rows as u64)
423        }
424    }
425
426    /// Converts proto `RowInsertRequests` directly into aligned `RecordBatch`es
427    /// in a single pass, handling table creation, schema alteration, column
428    /// renaming, reordering, and null-filling without building intermediate
429    /// RecordBatches.
430    async fn build_and_align_table_batches(
431        &self,
432        requests: RowInsertRequests,
433        ctx: &QueryContextRef,
434    ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
435        let catalog = ctx.current_catalog().to_string();
436        let schema = ctx.current_schema();
437
438        let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
439        if total_rows == 0 {
440            return Ok((Vec::new(), 0));
441        }
442
443        let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
444        let mut plan = self
445            .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
446            .await?;
447
448        self.create_missing_tables_and_refresh_schemas(
449            &catalog,
450            &schema,
451            ctx,
452            &table_rows,
453            &mut plan,
454        )
455        .await?;
456
457        self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
458            .await?;
459
460        let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;
461
462        Ok((aligned_batches, total_rows))
463    }
464
465    /// Extracts non-empty `(table_name, rows)` pairs and computes total row
466    /// count across the retained entries.
467    fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
468        let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
469        let mut total_rows = 0;
470
471        for request in requests.inserts {
472            let Some(rows) = request.rows else {
473                continue;
474            };
475            if rows.rows.is_empty() {
476                continue;
477            }
478
479            total_rows += rows.rows.len();
480            table_rows.push((request.table_name, rows));
481        }
482
483        (table_rows, total_rows)
484    }
485
486    /// Returns unique `(table_name, proto_schema)` pairs while keeping the
487    /// first-seen schema for duplicate table names.
488    fn collect_unique_table_schemas(
489        table_rows: &[(String, Rows)],
490    ) -> Result<Vec<(&str, &[ColumnSchema])>> {
491        let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
492        let mut seen = HashSet::new();
493
494        for (table_name, rows) in table_rows {
495            if seen.insert(table_name.as_str()) {
496                unique_tables.push((table_name.as_str(), &rows.schema));
497            } else {
498                // table_rows should group rows by table name.
499                return error::InvalidPromRemoteRequestSnafu {
500                    msg: format!(
501                        "Found duplicated table name in RowInsertRequest: {}",
502                        table_name
503                    ),
504                }
505                .fail();
506            }
507        }
508
509        Ok(unique_tables)
510    }
511
512    /// Resolves table metadata and classifies each table into existing,
513    /// to-create, and to-alter groups used by subsequent DDL steps.
514    async fn plan_table_resolution(
515        &self,
516        catalog: &str,
517        schema: &str,
518        ctx: &QueryContextRef,
519        unique_tables: &[(&str, &[ColumnSchema])],
520    ) -> Result<TableResolutionPlan> {
521        let mut plan = TableResolutionPlan {
522            region_schemas: HashMap::with_capacity(unique_tables.len()),
523            tables_to_create: Vec::new(),
524            tables_to_alter: Vec::new(),
525        };
526
527        let resolved_tables = {
528            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
529                .with_label_values(&["align_resolve_table"])
530                .start_timer();
531            futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
532                self.catalog_manager
533                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
534            }))
535            .await
536        };
537
538        for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
539            let table = table_result?;
540
541            if let Some(table) = table {
542                let table_info = table.table_info();
543                let table_id = table_info.ident.table_id;
544                let region_schema = table_info.meta.schema.arrow_schema().clone();
545
546                let missing_columns = {
547                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
548                        .with_label_values(&["align_identify_missing_columns"])
549                        .start_timer();
550                    identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
551                };
552                if !missing_columns.is_empty() {
553                    plan.tables_to_alter
554                        .push(((*table_name).to_string(), missing_columns));
555                }
556                plan.region_schemas
557                    .insert((*table_name).to_string(), (region_schema, table_id));
558            } else {
559                let request_schema = {
560                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
561                        .with_label_values(&["align_build_create_table_schema"])
562                        .start_timer();
563                    build_prom_create_table_schema_from_proto(rows_schema)?
564                };
565                plan.tables_to_create
566                    .push(((*table_name).to_string(), request_schema));
567            }
568        }
569
570        Ok(plan)
571    }
572
573    /// Batch-creates missing tables, refreshes their schema metadata, and
574    /// enqueues follow-up alters for extra tag columns discovered in later rows.
575    async fn create_missing_tables_and_refresh_schemas(
576        &self,
577        catalog: &str,
578        schema: &str,
579        ctx: &QueryContextRef,
580        table_rows: &[(String, Rows)],
581        plan: &mut TableResolutionPlan,
582    ) -> Result<()> {
583        if plan.tables_to_create.is_empty() {
584            return Ok(());
585        }
586
587        let create_refs: Vec<(&str, &[ColumnSchema])> = plan
588            .tables_to_create
589            .iter()
590            .map(|(name, schema)| (name.as_str(), schema.as_slice()))
591            .collect();
592
593        {
594            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
595                .with_label_values(&["align_batch_create_tables"])
596                .start_timer();
597            self.schema_alterer
598                .create_tables_if_missing_batch(
599                    catalog,
600                    schema,
601                    &create_refs,
602                    self.prom_store_with_metric_engine,
603                    ctx.clone(),
604                )
605                .await?;
606        }
607
608        let created_table_results = {
609            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
610                .with_label_values(&["align_resolve_table_after_create"])
611                .start_timer();
612            futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
613                self.catalog_manager
614                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
615            }))
616            .await
617        };
618
619        for ((table_name, _), table_result) in
620            plan.tables_to_create.iter().zip(created_table_results)
621        {
622            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
623                reason: format!(
624                    "Table not found after pending batch create attempt: {}",
625                    table_name
626                ),
627            })?;
628            let table_info = table.table_info();
629            let table_id = table_info.ident.table_id;
630            let region_schema = table_info.meta.schema.arrow_schema().clone();
631            plan.region_schemas
632                .insert(table_name.clone(), (region_schema, table_id));
633        }
634
635        Self::enqueue_alter_for_new_tables(table_rows, plan)?;
636
637        Ok(())
638    }
639
640    /// For newly created tables, re-checks all row schemas and appends alter
641    /// operations when additional tag columns are still missing.
642    fn enqueue_alter_for_new_tables(
643        table_rows: &[(String, Rows)],
644        plan: &mut TableResolutionPlan,
645    ) -> Result<()> {
646        let created_tables: HashSet<&str> = plan
647            .tables_to_create
648            .iter()
649            .map(|(table_name, _)| table_name.as_str())
650            .collect();
651
652        for (table_name, rows) in table_rows {
653            if !created_tables.contains(table_name.as_str()) {
654                continue;
655            }
656
657            let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
658                continue;
659            };
660
661            let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
662            if missing_columns.is_empty()
663                || plan
664                    .tables_to_alter
665                    .iter()
666                    .any(|(existing_name, _)| existing_name == table_name)
667            {
668                continue;
669            }
670
671            plan.tables_to_alter
672                .push((table_name.clone(), missing_columns));
673        }
674
675        Ok(())
676    }
677
678    /// Batch-alters tables that have missing tag columns and refreshes the
679    /// in-memory schema map used for row alignment.
680    async fn alter_tables_and_refresh_schemas(
681        &self,
682        catalog: &str,
683        schema: &str,
684        ctx: &QueryContextRef,
685        plan: &mut TableResolutionPlan,
686    ) -> Result<()> {
687        if plan.tables_to_alter.is_empty() {
688            return Ok(());
689        }
690
691        let alter_refs: Vec<(&str, &[String])> = plan
692            .tables_to_alter
693            .iter()
694            .map(|(name, cols)| (name.as_str(), cols.as_slice()))
695            .collect();
696        {
697            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
698                .with_label_values(&["align_batch_add_missing_columns"])
699                .start_timer();
700            self.schema_alterer
701                .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
702                .await?;
703        }
704
705        let altered_table_results = {
706            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
707                .with_label_values(&["align_resolve_table_after_schema_alter"])
708                .start_timer();
709            futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
710                self.catalog_manager
711                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
712            }))
713            .await
714        };
715
716        for ((table_name, _), table_result) in
717            plan.tables_to_alter.iter().zip(altered_table_results)
718        {
719            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
720                reason: format!(
721                    "Table not found after pending batch schema alter: {}",
722                    table_name
723                ),
724            })?;
725            let table_info = table.table_info();
726            let table_id = table_info.ident.table_id;
727            let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
728            plan.region_schemas
729                .insert(table_name.clone(), (refreshed_region_schema, table_id));
730        }
731
732        Ok(())
733    }
734
735    /// Converts proto rows to `RecordBatch` values aligned to resolved region
736    /// schemas and returns `(table_name, table_id, batch)` tuples.
737    fn build_aligned_batches(
738        table_rows: &[(String, Rows)],
739        region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
740    ) -> Result<Vec<(String, u32, RecordBatch)>> {
741        let mut aligned_batches = Vec::with_capacity(table_rows.len());
742        for (table_name, rows) in table_rows {
743            let (region_schema, table_id) =
744                region_schemas.get(table_name).cloned().with_context(|| {
745                    error::UnexpectedResultSnafu {
746                        reason: format!("Region schema not resolved for table: {}", table_name),
747                    }
748                })?;
749
750            let record_batch = {
751                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
752                    .with_label_values(&["align_rows_to_record_batch"])
753                    .start_timer();
754                rows_to_aligned_record_batch(rows, region_schema.as_ref())?
755            };
756            aligned_batches.push((table_name.clone(), table_id, record_batch));
757        }
758
759        Ok(aligned_batches)
760    }
761
762    fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
763        if let Some(worker) = self.workers.get(&key)
764            && !worker.tx.is_closed()
765        {
766            return worker.clone();
767        }
768
769        let entry = self.workers.entry(key.clone());
770        match entry {
771            Entry::Occupied(mut worker) => {
772                if worker.get().tx.is_closed() {
773                    let new_worker = self.spawn_worker(key);
774                    worker.insert(new_worker.clone());
775                    PENDING_WORKERS.set(self.workers.len() as i64);
776                    new_worker
777                } else {
778                    worker.get().clone()
779                }
780            }
781            Entry::Vacant(vacant) => {
782                let worker = self.spawn_worker(key);
783
784                vacant.insert(worker.clone());
785                PENDING_WORKERS.set(self.workers.len() as i64);
786                worker
787            }
788        }
789    }
790
791    fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
792        let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
793        let worker = PendingWorker { tx: tx.clone() };
794        let worker_idle_timeout = self
795            .flush_interval
796            .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
797            .unwrap_or(self.flush_interval);
798
799        start_worker(
800            key,
801            worker.tx.clone(),
802            self.workers.clone(),
803            rx,
804            self.shutdown.clone(),
805            self.partition_manager.clone(),
806            self.node_manager.clone(),
807            self.catalog_manager.clone(),
808            self.flush_interval,
809            worker_idle_timeout,
810            self.max_batch_rows,
811            self.flush_semaphore.clone(),
812        );
813
814        worker
815    }
816}
817
/// Signals every worker subscribed to `shutdown` when the batcher goes away.
impl Drop for PendingRowsBatcher {
    fn drop(&mut self) {
        // `send` only errors when there are no subscribed receivers; nothing to do then.
        let _ = self.shutdown.send(());
    }
}
823
824impl PendingBatch {
825    fn new(ctx: QueryContextRef) -> Self {
826        let db_string = ctx.get_db_string();
827        Self {
828            tables: HashMap::new(),
829            created_at: Instant::now(),
830            total_row_count: 0,
831            db_string,
832            ctx,
833            waiters: Vec::new(),
834        }
835    }
836}
837
838#[allow(clippy::too_many_arguments)]
839fn start_worker(
840    key: BatchKey,
841    worker_tx: mpsc::Sender<WorkerCommand>,
842    workers: Arc<DashMap<BatchKey, PendingWorker>>,
843    mut rx: mpsc::Receiver<WorkerCommand>,
844    shutdown: broadcast::Sender<()>,
845    partition_manager: PartitionRuleManagerRef,
846    node_manager: NodeManagerRef,
847    catalog_manager: CatalogManagerRef,
848    flush_interval: Duration,
849    worker_idle_timeout: Duration,
850    max_batch_rows: usize,
851    flush_semaphore: Arc<Semaphore>,
852) {
853    tokio::spawn(async move {
854        let mut batch = None;
855        let mut interval = tokio::time::interval(flush_interval);
856        let mut shutdown_rx = shutdown.subscribe();
857        let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
858        let idle_timer = tokio::time::sleep_until(idle_deadline);
859        tokio::pin!(idle_timer);
860
861        loop {
862            tokio::select! {
863                cmd = rx.recv() => {
864                    match cmd {
865                        Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
866                            idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);
867
868                            let pending_batch = batch.get_or_insert_with(||{
869                                PENDING_BATCHES.inc();
870                                PendingBatch::new(ctx)
871                            });
872
873                            pending_batch.waiters.push(FlushWaiter { response_tx, _permit });
874
875                            for (table_name, table_id, record_batch) in table_batches {
876                                let entry = pending_batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
877                                    table_name,
878                                    table_id,
879                                    batches: Vec::new(),
880                                    row_count: 0,
881                                });
882                                entry.row_count += record_batch.num_rows();
883                                entry.batches.push(record_batch);
884                            }
885
886                            pending_batch.total_row_count += total_rows;
887                            PENDING_ROWS.add(total_rows as i64);
888
889                            if pending_batch.total_row_count >= max_batch_rows
890                                && let Some(flush) = drain_batch(&mut batch) {
891                                    spawn_flush(
892                                        flush,
893                                        partition_manager.clone(),
894                                        node_manager.clone(),
895                                        catalog_manager.clone(),
896                                        flush_semaphore.clone(),
897                                    ).await;
898                            }
899                        }
900                        None => {
901                            if let Some(flush) = drain_batch(&mut batch) {
902                                flush_batch(
903                                    flush,
904                                    partition_manager.clone(),
905                                    node_manager.clone(),
906                                    catalog_manager.clone(),
907                                ).await;
908                            }
909                            break;
910                        }
911                    }
912                }
913                _ = &mut idle_timer => {
914                    if !should_close_worker_on_idle_timeout(
915                        batch.as_ref().map_or(0, |batch| batch.total_row_count),
916                        rx.len(),
917                    ) {
918                        idle_timer
919                            .as_mut()
920                            .reset(tokio::time::Instant::now() + worker_idle_timeout);
921                        continue;
922                    }
923
924                    debug!(
925                        "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
926                        key.catalog,
927                        key.schema,
928                        key.physical_table
929                    );
930                    break;
931                }
932                _ = interval.tick() => {
933                    if batch
934                        .as_ref()
935                        .is_some_and(|batch| batch.created_at.elapsed() >= flush_interval)
936                        && let Some(flush) = drain_batch(&mut batch) {
937                            spawn_flush(
938                                flush,
939                                partition_manager.clone(),
940                                node_manager.clone(),
941                                catalog_manager.clone(),
942                                flush_semaphore.clone(),
943                            ).await;
944                    }
945                }
946                _ = shutdown_rx.recv() => {
947                    if let Some(flush) = drain_batch(&mut batch) {
948                        flush_batch(
949                            flush,
950                            partition_manager.clone(),
951                            node_manager.clone(),
952                            catalog_manager.clone(),
953                        ).await;
954                    }
955                    break;
956                }
957            }
958        }
959
960        remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
961    });
962}
963
964fn remove_worker_if_same_channel(
965    workers: &DashMap<BatchKey, PendingWorker>,
966    key: &BatchKey,
967    worker_tx: &mpsc::Sender<WorkerCommand>,
968) -> bool {
969    if let Some(worker) = workers.get(key)
970        && worker.tx.same_channel(worker_tx)
971    {
972        drop(worker);
973        workers.remove(key);
974        PENDING_WORKERS.set(workers.len() as i64);
975        return true;
976    }
977
978    false
979}
980
/// Decides whether an idle worker may exit: only when it buffers no rows and has no
/// commands waiting in its channel.
fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    matches!((total_row_count, queued_requests), (0, 0))
}
984
985fn drain_batch(batch: &mut Option<PendingBatch>) -> Option<FlushBatch> {
986    let batch = batch.take()?;
987    let total_row_count = batch.total_row_count;
988
989    if total_row_count == 0 {
990        return None;
991    }
992
993    let table_batches = batch.tables.into_values().collect();
994    let waiters = batch.waiters;
995
996    PENDING_ROWS.sub(total_row_count as i64);
997    PENDING_BATCHES.dec();
998
999    Some(FlushBatch {
1000        table_batches,
1001        total_row_count,
1002        db_string: batch.db_string,
1003        ctx: batch.ctx,
1004        waiters,
1005    })
1006}
1007
1008async fn spawn_flush(
1009    flush: FlushBatch,
1010    partition_manager: PartitionRuleManagerRef,
1011    node_manager: NodeManagerRef,
1012    catalog_manager: CatalogManagerRef,
1013    semaphore: Arc<Semaphore>,
1014) {
1015    match semaphore.acquire_owned().await {
1016        Ok(permit) => {
1017            tokio::spawn(async move {
1018                let _permit = permit;
1019                flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1020            });
1021        }
1022        Err(err) => {
1023            warn!(err; "Flush semaphore closed, flushing inline");
1024            flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1025        }
1026    }
1027}
1028
1029struct FlushRegionWrite {
1030    datanode: Peer,
1031    request: RegionRequest,
1032}
1033
1034struct PlannedRegionBatch {
1035    region_id: RegionId,
1036    batch: RecordBatch,
1037}
1038
1039#[cfg(test)]
1040impl PlannedRegionBatch {
1041    fn num_rows(&self) -> usize {
1042        self.batch.num_rows()
1043    }
1044}
1045
1046struct ResolvedRegionBatch {
1047    planned: PlannedRegionBatch,
1048    datanode: Peer,
1049}
1050
/// Concurrent dispatch only pays off once there are at least two region writes; zero or one
/// write is sent sequentially.
fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    region_write_count >= 2
}
1054
1055/// Classifies columns in a logical-table batch for sparse primary-key conversion.
1056///
1057/// Returns:
1058/// - `Vec<TagColumnInfo>`: all Utf8 tag columns sorted by tag name, used for
1059///   TSID and sparse primary-key encoding.
1060/// - `SmallVec<[usize; 3]>`: indices of columns copied into the physical batch
1061///   after `__primary_key`, ordered as `[greptime_timestamp, greptime_value,
1062///   partition_tag_columns...]`.
1063fn columns_taxonomy(
1064    batch_schema: &Arc<ArrowSchema>,
1065    table_name: &str,
1066    name_to_ids: &HashMap<String, u32>,
1067    partition_columns: &HashSet<&str>,
1068) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
1069    let mut tag_columns = Vec::new();
1070    let mut essential_column_indices =
1071        SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
1072    // Placeholder for greptime_timestamp and greptime_value
1073    essential_column_indices.push(0);
1074    essential_column_indices.push(0);
1075
1076    let mut timestamp_index = None;
1077    let mut value_index = None;
1078
1079    for (index, field) in batch_schema.fields().iter().enumerate() {
1080        match field.data_type() {
1081            ArrowDataType::Utf8 => {
1082                let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
1083                    error::InvalidPromRemoteRequestSnafu {
1084                        msg: format!(
1085                            "Column '{}' from logical table '{}' not found in physical table column IDs",
1086                            field.name(),
1087                            table_name
1088                        ),
1089                    }
1090                })?;
1091                tag_columns.push(TagColumnInfo {
1092                    name: field.name().clone(),
1093                    index,
1094                    column_id,
1095                });
1096
1097                if partition_columns.contains(field.name().as_str()) {
1098                    essential_column_indices.push(index);
1099                }
1100            }
1101            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
1102                ensure!(
1103                    timestamp_index.replace(index).is_none(),
1104                    error::InvalidPromRemoteRequestSnafu {
1105                        msg: format!(
1106                            "Duplicated timestamp column in logical table '{}' batch schema",
1107                            table_name
1108                        ),
1109                    }
1110                );
1111            }
1112            ArrowDataType::Float64 => {
1113                ensure!(
1114                    value_index.replace(index).is_none(),
1115                    error::InvalidPromRemoteRequestSnafu {
1116                        msg: format!(
1117                            "Duplicated value column in logical table '{}' batch schema",
1118                            table_name
1119                        ),
1120                    }
1121                );
1122            }
1123            datatype => {
1124                return error::InvalidPromRemoteRequestSnafu {
1125                    msg: format!(
1126                        "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
1127                        table_name
1128                    ),
1129                }
1130                .fail();
1131            }
1132        }
1133    }
1134
1135    let timestamp_index =
1136        timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1137            msg: format!(
1138                "Missing essential column '{}' in logical table '{}' batch schema",
1139                greptime_timestamp(),
1140                table_name
1141            ),
1142        })?;
1143    let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1144        msg: format!(
1145            "Missing essential column '{}' in logical table '{}' batch schema",
1146            greptime_value(),
1147            table_name
1148        ),
1149    })?;
1150
1151    tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
1152
1153    essential_column_indices[0] = timestamp_index;
1154    essential_column_indices[1] = value_index;
1155
1156    Ok((tag_columns, essential_column_indices))
1157}
1158
1159fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
1160    ensure!(
1161        batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1162        error::InternalSnafu {
1163            err_msg: format!(
1164                "Expected at least {} columns in physical batch, got {}",
1165                PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1166                batch.num_columns()
1167            ),
1168        }
1169    );
1170    let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
1171    batch.project(&essential_indices).context(error::ArrowSnafu)
1172}
1173
1174async fn flush_region_writes_concurrently(
1175    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1176    writes: Vec<FlushRegionWrite>,
1177) -> Result<usize> {
1178    let mut affected_rows = 0;
1179    if !should_dispatch_concurrently(writes.len()) {
1180        for write in writes {
1181            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1182                .with_label_values(&["flush_write_region"])
1183                .start_timer();
1184            affected_rows += node_manager
1185                .handle(&write.datanode, write.request)
1186                .await?
1187                .affected_rows;
1188        }
1189        return Ok(affected_rows);
1190    }
1191
1192    let write_futures = writes.into_iter().map(|write| async move {
1193        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1194            .with_label_values(&["flush_write_region"])
1195            .start_timer();
1196
1197        let response = node_manager.handle(&write.datanode, write.request).await?;
1198        Ok::<_, Error>(response.affected_rows)
1199    });
1200
1201    // todo(hl): should be bounded.
1202    let affected_rows = futures::future::try_join_all(write_futures)
1203        .await?
1204        .into_iter()
1205        .sum();
1206    Ok(affected_rows)
1207}
1208
1209async fn flush_batch(
1210    flush: FlushBatch,
1211    partition_manager: PartitionRuleManagerRef,
1212    node_manager: NodeManagerRef,
1213    catalog_manager: CatalogManagerRef,
1214) {
1215    let FlushBatch {
1216        table_batches,
1217        total_row_count,
1218        db_string,
1219        ctx,
1220        waiters,
1221    } = flush;
1222    let start = Instant::now();
1223
1224    // Physical-table-level flush: transform all logical table batches
1225    // into physical format and write them together.
1226    let physical_table_name = ctx
1227        .extension(PHYSICAL_TABLE_KEY)
1228        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
1229        .to_string();
1230    let partition_provider = PartitionManagerPhysicalFlushAdapter { partition_manager };
1231    let node_requester = NodeManagerPhysicalFlushAdapter { node_manager };
1232    let catalog_provider = CatalogManagerPhysicalFlushAdapter { catalog_manager };
1233    let result = flush_batch_physical(
1234        &table_batches,
1235        &physical_table_name,
1236        &ctx,
1237        &partition_provider,
1238        &node_requester,
1239        &catalog_provider,
1240    )
1241    .await;
1242
1243    let elapsed = start.elapsed().as_secs_f64();
1244    FLUSH_ELAPSED.observe(elapsed);
1245
1246    if result.is_err() {
1247        FLUSH_FAILURES.inc();
1248        FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64);
1249    } else if let Ok(affected_rows) = &result {
1250        FLUSH_TOTAL.inc();
1251        FLUSH_ROWS.observe(total_row_count as f64);
1252        operator::metrics::DIST_INGEST_ROW_COUNT
1253            .with_label_values(&[db_string.as_str()])
1254            .inc_by(*affected_rows as u64);
1255    }
1256
1257    debug!(
1258        "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
1259        total_row_count, elapsed
1260    );
1261
1262    notify_waiters(waiters, result.map(|_| ()));
1263}
1264
1265/// Flushes a batch of logical table rows by transforming them into the physical table format
1266/// and writing them to the appropriate datanode regions.
1267///
1268/// This function performs the end-to-end physical flush pipeline:
1269/// 1. Resolves the physical table metadata and column ID mapping.
1270/// 2. Fetches the physical table's partition rule.
1271/// 3. Transforms each logical table batch into the physical (sparse primary key) format.
1272/// 4. Concatenates all transformed batches into a single combined batch.
1273/// 5. Splits the combined batch by partition rule and sends region write requests
1274///    concurrently to the target datanodes.
1275pub async fn flush_batch_physical(
1276    table_batches: &[TableBatch],
1277    physical_table_name: &str,
1278    ctx: &QueryContextRef,
1279    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1280    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1281    catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized),
1282) -> Result<usize> {
1283    // 1. Resolve the physical table and get column ID mapping
1284    let physical_table = {
1285        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1286            .with_label_values(&["flush_physical_resolve_table"])
1287            .start_timer();
1288        catalog_manager
1289            .physical_table(
1290                ctx.current_catalog(),
1291                &ctx.current_schema(),
1292                physical_table_name,
1293                ctx.as_ref(),
1294            )
1295            .await?
1296            .with_context(|| error::InternalSnafu {
1297                err_msg: format!(
1298                    "Physical table '{}' not found during pending flush",
1299                    physical_table_name
1300                ),
1301            })?
1302    };
1303
1304    let physical_table_info = physical_table.table_info;
1305    let name_to_ids = physical_table
1306        .col_name_to_ids
1307        .with_context(|| error::InternalSnafu {
1308            err_msg: format!(
1309                "Physical table '{}' has no column IDs for pending flush",
1310                physical_table_name
1311            ),
1312        })?;
1313
1314    // 2. Get the physical table's partition rule (one lookup instead of N)
1315    let partition_rule = {
1316        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1317            .with_label_values(&["flush_physical_fetch_partition_rule"])
1318            .start_timer();
1319        partition_manager
1320            .find_table_partition_rule(physical_table_info.as_ref())
1321            .await?
1322    };
1323    let partition_columns = partition_rule.partition_columns();
1324    let partition_columns_set: HashSet<&str> =
1325        partition_columns.iter().map(String::as_str).collect();
1326
1327    // 3. Transform each logical table batch into physical format
1328    let modified_batches =
1329        transform_logical_batches_to_physical(table_batches, &name_to_ids, &partition_columns_set)?;
1330
1331    // 4. Concatenate all modified batches (all share the same physical schema)
1332    let combined_batch = concat_modified_batches(&modified_batches)?;
1333
1334    // 5. Split by physical partition rule and send to regions
1335    let physical_table_id = physical_table_info.table_id();
1336    let planned_batches = plan_region_batches(
1337        combined_batch,
1338        physical_table_id,
1339        partition_rule.as_ref(),
1340        partition_columns,
1341    )?;
1342
1343    let resolved_batches = resolve_region_targets(planned_batches, partition_manager).await?;
1344    let region_writes = encode_region_write_requests(resolved_batches)?;
1345    flush_region_writes_concurrently(node_manager, region_writes).await
1346}
1347
1348/// Transforms logical table batches into physical format (sparse primary key encoding).
1349///
1350/// It identifies tag columns and essential columns (timestamp, value) for each logical batch
1351/// and applies sparse primary key modification.
1352fn transform_logical_batches_to_physical(
1353    table_batches: &[TableBatch],
1354    name_to_ids: &HashMap<String, u32>,
1355    partition_columns_set: &HashSet<&str>,
1356) -> Result<Vec<RecordBatch>> {
1357    let mut modified_batches: Vec<RecordBatch> =
1358        Vec::with_capacity(table_batches.iter().map(|b| b.batches.len()).sum());
1359
1360    let mut modify_elapsed = Duration::ZERO;
1361    let mut columns_taxonomy_elapsed = Duration::ZERO;
1362
1363    for table_batch in table_batches {
1364        let table_id = table_batch.table_id;
1365
1366        for batch in &table_batch.batches {
1367            let batch_schema = batch.schema();
1368            let start = Instant::now();
1369            let (tag_columns, essential_col_indices) = columns_taxonomy(
1370                &batch_schema,
1371                &table_batch.table_name,
1372                name_to_ids,
1373                partition_columns_set,
1374            )?;
1375
1376            columns_taxonomy_elapsed += start.elapsed();
1377            if tag_columns.is_empty() && essential_col_indices.is_empty() {
1378                continue;
1379            }
1380
1381            let modified = {
1382                let start = Instant::now();
1383                // The schema of modified batch is: __primary_key, timestamp, value, other partition columns...
1384                let batch = modify_batch_sparse(
1385                    batch.clone(),
1386                    table_id,
1387                    &tag_columns,
1388                    &essential_col_indices,
1389                )?;
1390                modify_elapsed += start.elapsed();
1391                batch
1392            };
1393
1394            modified_batches.push(modified);
1395        }
1396    }
1397
1398    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1399        .with_label_values(&["flush_physical_modify_batch"])
1400        .observe(modify_elapsed.as_secs_f64());
1401    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1402        .with_label_values(&["flush_physical_columns_taxonomy"])
1403        .observe(columns_taxonomy_elapsed.as_secs_f64());
1404
1405    ensure!(
1406        !modified_batches.is_empty(),
1407        error::InternalSnafu {
1408            err_msg: "No batches can be transformed during pending flush",
1409        }
1410    );
1411    Ok(modified_batches)
1412}
1413
1414/// Concatenates all modified batches into a single large batch.
1415///
1416/// All modified batches share the same physical schema.
1417fn concat_modified_batches(modified_batches: &[RecordBatch]) -> Result<RecordBatch> {
1418    let combined_schema = modified_batches[0].schema();
1419    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1420        .with_label_values(&["flush_physical_concat_all"])
1421        .start_timer();
1422    concat_batches(&combined_schema, modified_batches).context(error::ArrowSnafu)
1423}
1424
1425fn split_combined_batch_by_region(
1426    combined_batch: &RecordBatch,
1427    partition_rule: &dyn partition::partition::PartitionRule,
1428) -> Result<HashMap<u32, partition::partition::RegionMask>> {
1429    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1430        .with_label_values(&["flush_physical_split_record_batch"])
1431        .start_timer();
1432    let map = partition_rule.split_record_batch(combined_batch)?;
1433    Ok(map)
1434}
1435
1436fn prepare_physical_region_routing_batch(
1437    combined_batch: RecordBatch,
1438    partition_columns: &[String],
1439) -> Result<RecordBatch> {
1440    if partition_columns.is_empty() {
1441        return Ok(combined_batch);
1442    }
1443    strip_partition_columns_from_batch(combined_batch)
1444}
1445
1446fn plan_region_batch(
1447    stripped_batch: &RecordBatch,
1448    physical_table_id: TableId,
1449    region_number: u32,
1450    mask: &partition::partition::RegionMask,
1451) -> Result<Option<PlannedRegionBatch>> {
1452    if mask.select_none() {
1453        return Ok(None);
1454    }
1455
1456    let region_batch = if mask.select_all() {
1457        stripped_batch.clone()
1458    } else {
1459        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1460            .with_label_values(&["flush_physical_filter_record_batch"])
1461            .start_timer();
1462        filter_record_batch(stripped_batch, mask.array()).context(error::ArrowSnafu)?
1463    };
1464
1465    let row_count = region_batch.num_rows();
1466    if row_count == 0 {
1467        return Ok(None);
1468    }
1469
1470    Ok(Some(PlannedRegionBatch {
1471        region_id: RegionId::new(physical_table_id, region_number),
1472        batch: region_batch,
1473    }))
1474}
1475
1476fn plan_region_batches(
1477    combined_batch: RecordBatch,
1478    physical_table_id: TableId,
1479    partition_rule: &dyn partition::partition::PartitionRule,
1480    partition_columns: &[String],
1481) -> Result<Vec<PlannedRegionBatch>> {
1482    let region_masks = split_combined_batch_by_region(&combined_batch, partition_rule)?;
1483    let stripped_batch = prepare_physical_region_routing_batch(combined_batch, partition_columns)?;
1484
1485    let mut planned_batches = Vec::new();
1486    for (region_number, mask) in region_masks {
1487        if let Some(planned_batch) =
1488            plan_region_batch(&stripped_batch, physical_table_id, region_number, &mask)?
1489        {
1490            planned_batches.push(planned_batch);
1491        }
1492    }
1493
1494    Ok(planned_batches)
1495}
1496
1497async fn resolve_region_targets(
1498    planned_batches: Vec<PlannedRegionBatch>,
1499    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1500) -> Result<Vec<ResolvedRegionBatch>> {
1501    let mut resolved_batches = Vec::with_capacity(planned_batches.len());
1502    for planned in planned_batches {
1503        let datanode = {
1504            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1505                .with_label_values(&["flush_physical_resolve_region_leader"])
1506                .start_timer();
1507            partition_manager
1508                .find_region_leader(planned.region_id)
1509                .await?
1510        };
1511
1512        resolved_batches.push(ResolvedRegionBatch { planned, datanode });
1513    }
1514
1515    Ok(resolved_batches)
1516}
1517
1518fn encode_region_write_requests(
1519    resolved_batches: Vec<ResolvedRegionBatch>,
1520) -> Result<Vec<FlushRegionWrite>> {
1521    let mut region_writes = Vec::with_capacity(resolved_batches.len());
1522    for resolved in resolved_batches {
1523        let region_id = resolved.planned.region_id;
1524        let (schema_bytes, data_header, payload) = {
1525            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1526                .with_label_values(&["flush_physical_encode_ipc"])
1527                .start_timer();
1528            record_batch_to_ipc(resolved.planned.batch)?
1529        };
1530
1531        let request = RegionRequest {
1532            header: Some(RegionRequestHeader {
1533                tracing_context: TracingContext::from_current_span().to_w3c(),
1534                ..Default::default()
1535            }),
1536            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
1537                region_id: region_id.as_u64(),
1538                partition_expr_version: None,
1539                // Set aligned_schema_version to None so that datanode will check the batch schema again to see if any
1540                // column is missing.
1541                aligned_schema_version: None,
1542                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
1543                    schema: schema_bytes,
1544                    data_header,
1545                    payload,
1546                })),
1547            })),
1548        };
1549
1550        region_writes.push(FlushRegionWrite {
1551            datanode: resolved.datanode,
1552            request,
1553        });
1554    }
1555
1556    Ok(region_writes)
1557}
1558
1559fn notify_waiters(waiters: Vec<FlushWaiter>, result: Result<()>) {
1560    let shared_result = result.map_err(Arc::new);
1561    for waiter in waiters {
1562        let _ = waiter.response_tx.send(match &shared_result {
1563            Ok(()) => Ok(()),
1564            Err(error) => Err(Arc::clone(error)),
1565        });
1566        // waiter._permit is dropped here, releasing the inflight semaphore slot
1567    }
1568}
1569
1570fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
1571    let mut encoder = FlightEncoder::default();
1572    let schema = encoder.encode_schema(record_batch.schema().as_ref());
1573    let mut iter = encoder
1574        .encode(FlightMessage::RecordBatch(record_batch))
1575        .into_iter();
1576    let Some(flight_data) = iter.next() else {
1577        return Err(Error::Internal {
1578            err_msg: "Failed to encode empty flight data".to_string(),
1579        });
1580    };
1581    if iter.next().is_some() {
1582        return Err(Error::NotSupported {
1583            feat: "bulk insert RecordBatch with dictionary arrays".to_string(),
1584        });
1585    }
1586
1587    Ok((
1588        schema.data_header,
1589        flight_data.data_header,
1590        flight_data.data_body,
1591    ))
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596    use std::collections::{HashMap, HashSet};
1597    use std::sync::Arc;
1598    use std::sync::atomic::{AtomicUsize, Ordering};
1599    use std::time::{Duration, Instant};
1600
1601    use api::region::RegionResponse;
1602    use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
1603    use api::v1::meta::Peer;
1604    use api::v1::region::{InsertRequests, RegionRequest, region_request};
1605    use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
1606    use arrow::array::{BinaryArray, BooleanArray, StringArray, TimestampMillisecondArray};
1607    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1608    use arrow::record_batch::RecordBatch;
1609    use async_trait::async_trait;
1610    use catalog::error::Result as CatalogResult;
1611    use common_meta::error::Result as MetaResult;
1612    use common_meta::node_manager::{
1613        Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
1614    };
1615    use common_query::request::QueryRequest;
1616    use common_recordbatch::SendableRecordBatchStream;
1617    use dashmap::DashMap;
1618    use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema};
1619    use partition::error::Result as PartitionResult;
1620    use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask};
1621    use smallvec::SmallVec;
1622    use snafu::ResultExt;
1623    use store_api::storage::RegionId;
1624    use table::metadata::TableId;
1625    use table::test_util::table_info::test_table_info;
1626    use tokio::sync::{Semaphore, mpsc, oneshot};
1627    use tokio::time::sleep;
1628
1629    use super::{
1630        BatchKey, Error, FlushRegionWrite, FlushWaiter, PendingBatch, PendingRowsBatcher,
1631        PendingWorker, PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester,
1632        PhysicalFlushPartitionProvider, PhysicalTableMetadata, PlannedRegionBatch,
1633        ResolvedRegionBatch, TableBatch, WorkerCommand, columns_taxonomy, drain_batch,
1634        encode_region_write_requests, flush_batch_physical, flush_region_writes_concurrently,
1635        plan_region_batches, remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
1636        should_dispatch_concurrently, strip_partition_columns_from_batch,
1637        transform_logical_batches_to_physical,
1638    };
1639    use crate::error;
1640
1641    fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
1642        Rows {
1643            schema: vec![ColumnSchema {
1644                column_name: schema_name.to_string(),
1645                ..Default::default()
1646            }],
1647            rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
1648        }
1649    }
1650
1651    fn mock_tag_batch(tag_name: &str, tag_value: &str, ts: i64, val: f64) -> RecordBatch {
1652        let schema = Arc::new(ArrowSchema::new(vec![
1653            Field::new(
1654                "greptime_timestamp",
1655                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1656                false,
1657            ),
1658            Field::new("greptime_value", ArrowDataType::Float64, true),
1659            Field::new(tag_name, ArrowDataType::Utf8, true),
1660        ]));
1661
1662        RecordBatch::try_new(
1663            schema,
1664            vec![
1665                Arc::new(TimestampMillisecondArray::from(vec![ts])),
1666                Arc::new(arrow::array::Float64Array::from(vec![val])),
1667                Arc::new(StringArray::from(vec![tag_value])),
1668            ],
1669        )
1670        .unwrap()
1671    }
1672
1673    fn mock_physical_table_metadata(table_id: TableId) -> PhysicalTableMetadata {
1674        let schema = Arc::new(
1675            DtSchema::try_new(vec![
1676                DtColumnSchema::new(
1677                    "__primary_key",
1678                    datatypes::prelude::ConcreteDataType::binary_datatype(),
1679                    false,
1680                ),
1681                DtColumnSchema::new(
1682                    "greptime_timestamp",
1683                    datatypes::prelude::ConcreteDataType::timestamp_millisecond_datatype(),
1684                    false,
1685                ),
1686                DtColumnSchema::new(
1687                    "greptime_value",
1688                    datatypes::prelude::ConcreteDataType::float64_datatype(),
1689                    true,
1690                ),
1691                DtColumnSchema::new(
1692                    "tag1",
1693                    datatypes::prelude::ConcreteDataType::string_datatype(),
1694                    true,
1695                ),
1696            ])
1697            .unwrap(),
1698        );
1699        let mut table_info = test_table_info(table_id, "phy", "public", "greptime", schema);
1700        table_info.meta.column_ids = vec![0, 1, 2, 3];
1701
1702        PhysicalTableMetadata {
1703            table_info: Arc::new(table_info),
1704            col_name_to_ids: Some(HashMap::from([("tag1".to_string(), 3)])),
1705        }
1706    }
1707
1708    struct MockFlushCatalogProvider {
1709        table: Option<PhysicalTableMetadata>,
1710    }
1711
1712    #[async_trait]
1713    impl PhysicalFlushCatalogProvider for MockFlushCatalogProvider {
1714        async fn physical_table(
1715            &self,
1716            _catalog: &str,
1717            _schema: &str,
1718            _table_name: &str,
1719            _query_ctx: &session::context::QueryContext,
1720        ) -> CatalogResult<Option<PhysicalTableMetadata>> {
1721            Ok(self.table.clone())
1722        }
1723    }
1724
1725    struct SingleRegionPartitionRule;
1726
1727    impl PartitionRule for SingleRegionPartitionRule {
1728        fn as_any(&self) -> &dyn std::any::Any {
1729            self
1730        }
1731
1732        fn partition_columns(&self) -> &[String] {
1733            &[]
1734        }
1735
1736        fn find_region(
1737            &self,
1738            _values: &[datatypes::prelude::Value],
1739        ) -> partition::error::Result<store_api::storage::RegionNumber> {
1740            unimplemented!()
1741        }
1742
1743        fn split_record_batch(
1744            &self,
1745            record_batch: &RecordBatch,
1746        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1747        {
1748            Ok(HashMap::from([(
1749                1,
1750                RegionMask::new(
1751                    arrow::array::BooleanArray::from(vec![true; record_batch.num_rows()]),
1752                    record_batch.num_rows(),
1753                ),
1754            )]))
1755        }
1756    }
1757
1758    struct TwoRegionPartitionRule {
1759        partition_columns: Vec<String>,
1760    }
1761
1762    impl PartitionRule for TwoRegionPartitionRule {
1763        fn as_any(&self) -> &dyn std::any::Any {
1764            self
1765        }
1766
1767        fn partition_columns(&self) -> &[String] {
1768            &self.partition_columns
1769        }
1770
1771        fn find_region(
1772            &self,
1773            _values: &[datatypes::prelude::Value],
1774        ) -> partition::error::Result<store_api::storage::RegionNumber> {
1775            unimplemented!()
1776        }
1777
1778        fn split_record_batch(
1779            &self,
1780            _record_batch: &RecordBatch,
1781        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1782        {
1783            Ok(HashMap::from([
1784                (1, RegionMask::new(BooleanArray::from(vec![true, false]), 1)),
1785                (2, RegionMask::new(BooleanArray::from(vec![false, true]), 1)),
1786                (
1787                    3,
1788                    RegionMask::new(BooleanArray::from(vec![false, false]), 0),
1789                ),
1790            ]))
1791        }
1792    }
1793
1794    struct MockFlushPartitionProvider {
1795        partition_rule_calls: Arc<AtomicUsize>,
1796        region_leader_calls: Arc<AtomicUsize>,
1797    }
1798
1799    #[async_trait]
1800    impl PhysicalFlushPartitionProvider for MockFlushPartitionProvider {
1801        async fn find_table_partition_rule(
1802            &self,
1803            _table_info: &table::metadata::TableInfo,
1804        ) -> PartitionResult<PartitionRuleRef> {
1805            self.partition_rule_calls.fetch_add(1, Ordering::SeqCst);
1806            Ok(Arc::new(SingleRegionPartitionRule))
1807        }
1808
1809        async fn find_region_leader(&self, _region_id: RegionId) -> error::Result<Peer> {
1810            self.region_leader_calls.fetch_add(1, Ordering::SeqCst);
1811            Ok(Peer {
1812                id: 1,
1813                addr: "node-1".to_string(),
1814            })
1815        }
1816    }
1817
1818    #[derive(Default)]
1819    struct MockFlushNodeRequester {
1820        writes: Arc<AtomicUsize>,
1821    }
1822
1823    #[async_trait]
1824    impl PhysicalFlushNodeRequester for MockFlushNodeRequester {
1825        async fn handle(
1826            &self,
1827            _peer: &Peer,
1828            _request: RegionRequest,
1829        ) -> error::Result<RegionResponse> {
1830            self.writes.fetch_add(1, Ordering::SeqCst);
1831            Ok(RegionResponse::new(0))
1832        }
1833    }
1834
1835    #[test]
1836    fn test_collect_non_empty_table_rows_filters_empty_payloads() {
1837        let requests = RowInsertRequests {
1838            inserts: vec![
1839                RowInsertRequest {
1840                    table_name: "cpu".to_string(),
1841                    rows: Some(mock_rows(2, "host")),
1842                },
1843                RowInsertRequest {
1844                    table_name: "mem".to_string(),
1845                    rows: Some(mock_rows(0, "host")),
1846                },
1847                RowInsertRequest {
1848                    table_name: "disk".to_string(),
1849                    rows: None,
1850                },
1851            ],
1852        };
1853
1854        let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);
1855
1856        assert_eq!(2, total_rows);
1857        assert_eq!(1, table_rows.len());
1858        assert_eq!("cpu", table_rows[0].0);
1859        assert_eq!(2, table_rows[0].1.rows.len());
1860    }
1861
1862    #[test]
1863    fn test_drain_batch_takes_initialized_pending_batch_from_option() {
1864        let ctx = session::context::QueryContext::arc();
1865        let (response_tx, _response_rx) = oneshot::channel();
1866        let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();
1867        let mut batch = Some(PendingBatch {
1868            tables: HashMap::from([(
1869                "cpu".to_string(),
1870                TableBatch {
1871                    table_name: "cpu".to_string(),
1872                    table_id: 42,
1873                    batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
1874                    row_count: 1,
1875                },
1876            )]),
1877            created_at: Instant::now(),
1878            total_row_count: 1,
1879            db_string: ctx.get_db_string(),
1880            ctx: ctx.clone(),
1881            waiters: vec![FlushWaiter {
1882                response_tx,
1883                _permit: permit,
1884            }],
1885        });
1886
1887        let flush = drain_batch(&mut batch).unwrap();
1888
1889        assert!(batch.is_none());
1890        assert_eq!(1, flush.total_row_count);
1891        assert_eq!(1, flush.table_batches.len());
1892        assert_eq!(ctx.get_db_string(), flush.db_string);
1893        assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog());
1894    }
1895
1896    #[derive(Clone)]
1897    struct ConcurrentMockDatanode {
1898        delay: Duration,
1899        inflight: Arc<AtomicUsize>,
1900        max_inflight: Arc<AtomicUsize>,
1901    }
1902
1903    #[async_trait]
1904    impl Datanode for ConcurrentMockDatanode {
1905        async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
1906            let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
1907            loop {
1908                let max = self.max_inflight.load(Ordering::SeqCst);
1909                if now <= max {
1910                    break;
1911                }
1912                if self
1913                    .max_inflight
1914                    .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
1915                    .is_ok()
1916                {
1917                    break;
1918                }
1919            }
1920
1921            sleep(self.delay).await;
1922            self.inflight.fetch_sub(1, Ordering::SeqCst);
1923            Ok(RegionResponse::new(0))
1924        }
1925
1926        async fn handle_query(
1927            &self,
1928            _request: QueryRequest,
1929        ) -> MetaResult<SendableRecordBatchStream> {
1930            unimplemented!()
1931        }
1932    }
1933
1934    #[derive(Clone)]
1935    struct ConcurrentMockNodeManager {
1936        datanodes: Arc<HashMap<u64, DatanodeRef>>,
1937    }
1938
1939    #[async_trait]
1940    impl DatanodeManager for ConcurrentMockNodeManager {
1941        async fn datanode(&self, node: &Peer) -> DatanodeRef {
1942            self.datanodes
1943                .get(&node.id)
1944                .expect("datanode not found")
1945                .clone()
1946        }
1947    }
1948
1949    struct NoopFlownode;
1950
1951    #[async_trait]
1952    impl Flownode for NoopFlownode {
1953        async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
1954            unimplemented!()
1955        }
1956
1957        async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
1958            unimplemented!()
1959        }
1960
1961        async fn handle_mark_window_dirty(
1962            &self,
1963            _req: DirtyWindowRequests,
1964        ) -> MetaResult<FlowResponse> {
1965            unimplemented!()
1966        }
1967    }
1968
1969    #[async_trait]
1970    impl FlownodeManager for ConcurrentMockNodeManager {
1971        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1972            Arc::new(NoopFlownode)
1973        }
1974    }
1975
1976    #[async_trait]
1977    impl PhysicalFlushNodeRequester for ConcurrentMockNodeManager {
1978        async fn handle(
1979            &self,
1980            peer: &Peer,
1981            request: RegionRequest,
1982        ) -> error::Result<RegionResponse> {
1983            let datanode = self.datanode(peer).await;
1984            datanode
1985                .handle(request)
1986                .await
1987                .context(error::CommonMetaSnafu)
1988        }
1989    }
1990
1991    #[test]
1992    fn test_remove_worker_if_same_channel_removes_matching_entry() {
1993        let workers = DashMap::new();
1994        let key = BatchKey {
1995            catalog: "greptime".to_string(),
1996            schema: "public".to_string(),
1997            physical_table: "phy".to_string(),
1998        };
1999
2000        let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
2001        workers.insert(key.clone(), PendingWorker { tx: tx.clone() });
2002
2003        assert!(remove_worker_if_same_channel(&workers, &key, &tx));
2004        assert!(!workers.contains_key(&key));
2005    }
2006
2007    #[test]
2008    fn test_remove_worker_if_same_channel_keeps_newer_entry() {
2009        let workers = DashMap::new();
2010        let key = BatchKey {
2011            catalog: "greptime".to_string(),
2012            schema: "public".to_string(),
2013            physical_table: "phy".to_string(),
2014        };
2015
2016        let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
2017        let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
2018        workers.insert(
2019            key.clone(),
2020            PendingWorker {
2021                tx: fresh_tx.clone(),
2022            },
2023        );
2024
2025        assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
2026        assert!(workers.contains_key(&key));
2027        assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
2028    }
2029
2030    #[test]
2031    fn test_worker_idle_timeout_close_decision() {
2032        assert!(should_close_worker_on_idle_timeout(0, 0));
2033        assert!(!should_close_worker_on_idle_timeout(1, 0));
2034        assert!(!should_close_worker_on_idle_timeout(0, 1));
2035    }
2036
2037    #[tokio::test]
2038    async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
2039        let inflight = Arc::new(AtomicUsize::new(0));
2040        let max_inflight = Arc::new(AtomicUsize::new(0));
2041        let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2042            delay: Duration::from_millis(100),
2043            inflight: inflight.clone(),
2044            max_inflight: max_inflight.clone(),
2045        });
2046        let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2047            delay: Duration::from_millis(100),
2048            inflight,
2049            max_inflight: max_inflight.clone(),
2050        });
2051
2052        let mut datanodes = HashMap::new();
2053        datanodes.insert(1, datanode1);
2054        datanodes.insert(2, datanode2);
2055        let node_manager = Arc::new(ConcurrentMockNodeManager {
2056            datanodes: Arc::new(datanodes),
2057        });
2058
2059        let writes = vec![
2060            FlushRegionWrite {
2061                datanode: Peer {
2062                    id: 1,
2063                    addr: "node1".to_string(),
2064                },
2065                request: RegionRequest::default(),
2066            },
2067            FlushRegionWrite {
2068                datanode: Peer {
2069                    id: 2,
2070                    addr: "node2".to_string(),
2071                },
2072                request: RegionRequest::default(),
2073            },
2074        ];
2075
2076        flush_region_writes_concurrently(node_manager.as_ref(), writes)
2077            .await
2078            .unwrap();
2079        assert!(max_inflight.load(Ordering::SeqCst) >= 2);
2080    }
2081
2082    #[test]
2083    fn test_should_dispatch_concurrently_by_region_count() {
2084        assert!(!should_dispatch_concurrently(0));
2085        assert!(!should_dispatch_concurrently(1));
2086        assert!(should_dispatch_concurrently(2));
2087    }
2088
2089    #[test]
2090    fn test_strip_partition_columns_from_batch_removes_partition_tags() {
2091        let batch = RecordBatch::try_new(
2092            Arc::new(ArrowSchema::new(vec![
2093                Field::new("__primary_key", ArrowDataType::Binary, false),
2094                Field::new(
2095                    "greptime_timestamp",
2096                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2097                    false,
2098                ),
2099                Field::new("greptime_value", ArrowDataType::Float64, true),
2100                Field::new("host", ArrowDataType::Utf8, true),
2101            ])),
2102            vec![
2103                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2104                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2105                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2106                Arc::new(StringArray::from(vec!["node-1"])),
2107            ],
2108        )
2109        .unwrap();
2110
2111        let stripped = strip_partition_columns_from_batch(batch).unwrap();
2112
2113        assert_eq!(3, stripped.num_columns());
2114        assert_eq!("__primary_key", stripped.schema().field(0).name());
2115        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2116        assert_eq!("greptime_value", stripped.schema().field(2).name());
2117    }
2118
2119    #[test]
2120    fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
2121        let batch = RecordBatch::try_new(
2122            Arc::new(ArrowSchema::new(vec![
2123                Field::new("__primary_key", ArrowDataType::Binary, false),
2124                Field::new(
2125                    "greptime_timestamp",
2126                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2127                    false,
2128                ),
2129                Field::new("greptime_value", ArrowDataType::Float64, true),
2130                Field::new("host", ArrowDataType::Utf8, true),
2131            ])),
2132            vec![
2133                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2134                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2135                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2136                Arc::new(StringArray::from(vec!["node-1"])),
2137            ],
2138        )
2139        .unwrap();
2140
2141        let stripped = strip_partition_columns_from_batch(batch).unwrap();
2142
2143        assert_eq!(3, stripped.num_columns());
2144        assert_eq!("__primary_key", stripped.schema().field(0).name());
2145        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2146        assert_eq!("greptime_value", stripped.schema().field(2).name());
2147    }
2148
2149    #[test]
2150    fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
2151        let schema = Arc::new(ArrowSchema::new(vec![
2152            Field::new(
2153                "greptime_timestamp",
2154                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2155                false,
2156            ),
2157            Field::new("greptime_value", ArrowDataType::Float64, true),
2158            Field::new("host", ArrowDataType::Utf8, true),
2159            Field::new("region", ArrowDataType::Utf8, true),
2160        ]));
2161        let name_to_ids =
2162            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2163        let partition_columns = HashSet::from(["host"]);
2164
2165        let (tag_columns, non_tag_indices) =
2166            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2167
2168        assert_eq!(2, tag_columns.len());
2169        assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
2170    }
2171
2172    #[test]
2173    fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
2174        let schema = Arc::new(ArrowSchema::new(vec![
2175            Field::new("host", ArrowDataType::Utf8, true),
2176            Field::new("greptime_value", ArrowDataType::Float64, true),
2177            Field::new(
2178                "greptime_timestamp",
2179                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2180                false,
2181            ),
2182            Field::new("region", ArrowDataType::Utf8, true),
2183        ]));
2184        let name_to_ids =
2185            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2186        let partition_columns = HashSet::from(["host", "region"]);
2187
2188        let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
2189            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2190
2191        assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
2192    }
2193
2194    #[test]
2195    fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
2196        let schema = Arc::new(ArrowSchema::new(vec![
2197            Field::new(
2198                "greptime_timestamp",
2199                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2200                false,
2201            ),
2202            Field::new("greptime_value", ArrowDataType::Float64, true),
2203            Field::new("host", ArrowDataType::Utf8, true),
2204            Field::new("invalid", ArrowDataType::Boolean, true),
2205        ]));
2206        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2207        let partition_columns = HashSet::from(["host"]);
2208
2209        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2210
2211        assert!(matches!(
2212            result,
2213            Err(Error::InvalidPromRemoteRequest { .. })
2214        ));
2215    }
2216
2217    #[test]
2218    fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
2219        let schema = Arc::new(ArrowSchema::new(vec![
2220            Field::new("greptime_timestamp", ArrowDataType::Int64, false),
2221            Field::new("greptime_value", ArrowDataType::Float64, true),
2222            Field::new("host", ArrowDataType::Utf8, true),
2223        ]));
2224        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2225        let partition_columns = HashSet::from(["host"]);
2226
2227        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2228
2229        assert!(matches!(
2230            result,
2231            Err(Error::InvalidPromRemoteRequest { .. })
2232        ));
2233    }
2234
2235    #[test]
2236    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
2237        let schema = Arc::new(ArrowSchema::new(vec![
2238            Field::new(
2239                "ts1",
2240                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2241                false,
2242            ),
2243            Field::new(
2244                "ts2",
2245                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2246                false,
2247            ),
2248            Field::new("greptime_value", ArrowDataType::Float64, true),
2249            Field::new("host", ArrowDataType::Utf8, true),
2250        ]));
2251        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2252        let partition_columns = HashSet::from(["host"]);
2253
2254        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2255
2256        assert!(matches!(
2257            result,
2258            Err(Error::InvalidPromRemoteRequest { .. })
2259        ));
2260    }
2261
2262    #[test]
2263    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
2264        let schema = Arc::new(ArrowSchema::new(vec![
2265            Field::new(
2266                "greptime_timestamp",
2267                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2268                false,
2269            ),
2270            Field::new("value1", ArrowDataType::Float64, true),
2271            Field::new("value2", ArrowDataType::Float64, true),
2272            Field::new("host", ArrowDataType::Utf8, true),
2273        ]));
2274        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2275        let partition_columns = HashSet::from(["host"]);
2276
2277        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2278
2279        assert!(matches!(
2280            result,
2281            Err(Error::InvalidPromRemoteRequest { .. })
2282        ));
2283    }
2284
2285    #[test]
2286    fn test_modify_batch_sparse_with_taxonomy_per_batch() {
2287        use arrow::array::BinaryArray;
2288        use metric_engine::batch_modifier::modify_batch_sparse;
2289
2290        let schema1 = Arc::new(ArrowSchema::new(vec![
2291            Field::new(
2292                "greptime_timestamp",
2293                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2294                false,
2295            ),
2296            Field::new("greptime_value", ArrowDataType::Float64, true),
2297            Field::new("tag1", ArrowDataType::Utf8, true),
2298        ]));
2299
2300        let schema2 = Arc::new(ArrowSchema::new(vec![
2301            Field::new(
2302                "greptime_timestamp",
2303                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2304                false,
2305            ),
2306            Field::new("greptime_value", ArrowDataType::Float64, true),
2307            Field::new("tag1", ArrowDataType::Utf8, true),
2308            Field::new("tag2", ArrowDataType::Utf8, true),
2309        ]));
2310        let batch2 = RecordBatch::try_new(
2311            schema2.clone(),
2312            vec![
2313                Arc::new(TimestampMillisecondArray::from(vec![2000])),
2314                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2315                Arc::new(StringArray::from(vec!["v1"])),
2316                Arc::new(StringArray::from(vec!["v2"])),
2317            ],
2318        )
2319        .unwrap();
2320
2321        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2322        let partition_columns = HashSet::new();
2323
2324        // A batch that only has tag1, same values as batch2 for ts and val.
2325        let batch3 = RecordBatch::try_new(
2326            schema1.clone(),
2327            vec![
2328                Arc::new(TimestampMillisecondArray::from(vec![2000])),
2329                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2330                Arc::new(StringArray::from(vec!["v1"])),
2331            ],
2332        )
2333        .unwrap();
2334
2335        // Simulate the new loop logic in flush_batch_physical:
2336        // Resolve taxonomy FOR EACH BATCH.
2337        let (tag_columns2, indices2) =
2338            columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2339        let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();
2340
2341        let (tag_columns3, indices3) =
2342            columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2343        let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();
2344
2345        let pk2 = modified2
2346            .column(0)
2347            .as_any()
2348            .downcast_ref::<BinaryArray>()
2349            .unwrap();
2350        let pk3 = modified3
2351            .column(0)
2352            .as_any()
2353            .downcast_ref::<BinaryArray>()
2354            .unwrap();
2355
2356        // Now they SHOULD be different because tag2 is included in pk2 but not in pk3.
2357        assert_ne!(
2358            pk2.value(0),
2359            pk3.value(0),
2360            "PK should be different because batch2 has tag2!"
2361        );
2362    }
2363
2364    #[test]
2365    fn test_transform_logical_batches_to_physical_success() {
2366        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2367
2368        let table_batches = vec![TableBatch {
2369            table_name: "t1".to_string(),
2370            table_id: 1,
2371            batches: vec![batch],
2372            row_count: 1,
2373        }];
2374
2375        let name_to_ids = HashMap::from([("tag1".to_string(), 1)]);
2376        let partition_columns = HashSet::new();
2377        let modified =
2378            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2379                .unwrap();
2380
2381        assert_eq!(1, modified.len());
2382        assert_eq!(3, modified[0].num_columns());
2383        assert_eq!("__primary_key", modified[0].schema().field(0).name());
2384        assert_eq!("greptime_timestamp", modified[0].schema().field(1).name());
2385        assert_eq!("greptime_value", modified[0].schema().field(2).name());
2386    }
2387
2388    #[test]
2389    fn test_transform_logical_batches_to_physical_taxonomy_failure() {
2390        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2391
2392        let table_batches = vec![TableBatch {
2393            table_name: "t1".to_string(),
2394            table_id: 1,
2395            batches: vec![batch],
2396            row_count: 1,
2397        }];
2398
2399        // tag1 is missing from name_to_ids, causing columns_taxonomy to fail.
2400        let name_to_ids = HashMap::new();
2401        let partition_columns = HashSet::new();
2402        let err =
2403            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2404                .unwrap_err();
2405
2406        assert!(
2407            err.to_string()
2408                .contains("not found in physical table column IDs")
2409        );
2410    }
2411
2412    #[test]
2413    fn test_transform_logical_batches_to_physical_multiple_batches() {
2414        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2415        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2416
2417        let table_batches = vec![
2418            TableBatch {
2419                table_name: "t1".to_string(),
2420                table_id: 1,
2421                batches: vec![batch1],
2422                row_count: 1,
2423            },
2424            TableBatch {
2425                table_name: "t2".to_string(),
2426                table_id: 2,
2427                batches: vec![batch2],
2428                row_count: 1,
2429            },
2430        ];
2431
2432        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2433        let partition_columns = HashSet::new();
2434        let modified =
2435            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2436                .unwrap();
2437
2438        assert_eq!(2, modified.len());
2439    }
2440
2441    #[test]
2442    fn test_transform_logical_batches_to_physical_mixed_success_failure() {
2443        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2444        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2445
2446        let table_batches = vec![
2447            TableBatch {
2448                table_name: "t1".to_string(),
2449                table_id: 1,
2450                batches: vec![batch1],
2451                row_count: 1,
2452            },
2453            TableBatch {
2454                table_name: "t2".to_string(),
2455                table_id: 2,
2456                batches: vec![batch2],
2457                row_count: 1,
2458            },
2459        ];
2460
2461        // tag1 is missing from name_to_ids, causing batch1 to fail.
2462        let name_to_ids = HashMap::from([("tag2".to_string(), 2)]);
2463        let partition_columns = HashSet::new();
2464        let err =
2465            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2466                .unwrap_err();
2467
2468        assert!(err.to_string().contains("tag1"));
2469    }
2470
2471    #[tokio::test]
2472    async fn test_flush_batch_physical_uses_mockable_trait_dependencies() {
2473        let table_batches = vec![TableBatch {
2474            table_name: "t1".to_string(),
2475            table_id: 11,
2476            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2477            row_count: 1,
2478        }];
2479        let partition_calls = Arc::new(AtomicUsize::new(0));
2480        let leader_calls = Arc::new(AtomicUsize::new(0));
2481        let node = MockFlushNodeRequester::default();
2482        let ctx = session::context::QueryContext::arc();
2483
2484        flush_batch_physical(
2485            &table_batches,
2486            "phy",
2487            &ctx,
2488            &MockFlushPartitionProvider {
2489                partition_rule_calls: partition_calls.clone(),
2490                region_leader_calls: leader_calls.clone(),
2491            },
2492            &node,
2493            &MockFlushCatalogProvider {
2494                table: Some(mock_physical_table_metadata(1024)),
2495            },
2496        )
2497        .await
2498        .unwrap();
2499
2500        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2501        assert_eq!(1, leader_calls.load(Ordering::SeqCst));
2502        assert_eq!(1, node.writes.load(Ordering::SeqCst));
2503    }
2504
2505    #[derive(Default)]
2506    struct AffectedRowsFlushNodeRequester {
2507        affected_rows: usize,
2508    }
2509
2510    #[async_trait]
2511    impl PhysicalFlushNodeRequester for AffectedRowsFlushNodeRequester {
2512        async fn handle(
2513            &self,
2514            _peer: &Peer,
2515            _request: RegionRequest,
2516        ) -> error::Result<RegionResponse> {
2517            Ok(RegionResponse::new(self.affected_rows))
2518        }
2519    }
2520
2521    #[tokio::test]
2522    async fn test_flush_batch_physical_returns_actual_affected_rows() {
2523        let table_batches = vec![TableBatch {
2524            table_name: "t1".to_string(),
2525            table_id: 11,
2526            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2527            row_count: 1,
2528        }];
2529        let ctx = session::context::QueryContext::arc();
2530
2531        let affected_rows = flush_batch_physical(
2532            &table_batches,
2533            "phy",
2534            &ctx,
2535            &MockFlushPartitionProvider {
2536                partition_rule_calls: Arc::new(AtomicUsize::new(0)),
2537                region_leader_calls: Arc::new(AtomicUsize::new(0)),
2538            },
2539            &AffectedRowsFlushNodeRequester { affected_rows: 7 },
2540            &MockFlushCatalogProvider {
2541                table: Some(mock_physical_table_metadata(1024)),
2542            },
2543        )
2544        .await
2545        .unwrap();
2546
2547        assert_eq!(7, affected_rows);
2548    }
2549
2550    #[tokio::test]
2551    async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() {
2552        let table_batches = vec![TableBatch {
2553            table_name: "t1".to_string(),
2554            table_id: 11,
2555            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2556            row_count: 1,
2557        }];
2558        let partition_calls = Arc::new(AtomicUsize::new(0));
2559        let leader_calls = Arc::new(AtomicUsize::new(0));
2560        let node = MockFlushNodeRequester::default();
2561        let ctx = session::context::QueryContext::arc();
2562
2563        let err = flush_batch_physical(
2564            &table_batches,
2565            "missing_phy",
2566            &ctx,
2567            &MockFlushPartitionProvider {
2568                partition_rule_calls: partition_calls.clone(),
2569                region_leader_calls: leader_calls.clone(),
2570            },
2571            &node,
2572            &MockFlushCatalogProvider { table: None },
2573        )
2574        .await
2575        .unwrap_err();
2576
2577        assert!(
2578            err.to_string()
2579                .contains("Physical table 'missing_phy' not found")
2580        );
2581        assert_eq!(0, partition_calls.load(Ordering::SeqCst));
2582        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2583        assert_eq!(0, node.writes.load(Ordering::SeqCst));
2584    }
2585
2586    #[tokio::test]
2587    async fn test_flush_batch_physical_aborts_immediately_on_transform_error() {
2588        let table_batches = vec![
2589            TableBatch {
2590                table_name: "broken".to_string(),
2591                table_id: 11,
2592                batches: vec![mock_tag_batch("unknown_tag", "host-1", 1000, 1.0)],
2593                row_count: 1,
2594            },
2595            TableBatch {
2596                table_name: "healthy".to_string(),
2597                table_id: 12,
2598                batches: vec![mock_tag_batch("tag1", "host-2", 2000, 2.0)],
2599                row_count: 1,
2600            },
2601        ];
2602        let partition_calls = Arc::new(AtomicUsize::new(0));
2603        let leader_calls = Arc::new(AtomicUsize::new(0));
2604        let node = MockFlushNodeRequester::default();
2605        let ctx = session::context::QueryContext::arc();
2606
2607        let err = flush_batch_physical(
2608            &table_batches,
2609            "phy",
2610            &ctx,
2611            &MockFlushPartitionProvider {
2612                partition_rule_calls: partition_calls.clone(),
2613                region_leader_calls: leader_calls.clone(),
2614            },
2615            &node,
2616            &MockFlushCatalogProvider {
2617                table: Some(mock_physical_table_metadata(1024)),
2618            },
2619        )
2620        .await
2621        .unwrap_err();
2622
2623        assert!(err.to_string().contains("unknown_tag"));
2624        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2625        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2626        assert_eq!(0, node.writes.load(Ordering::SeqCst));
2627    }
2628
2629    #[test]
2630    fn test_plan_region_batches_splits_and_strips_partition_columns() {
2631        let combined_batch = RecordBatch::try_new(
2632            Arc::new(ArrowSchema::new(vec![
2633                Field::new("__primary_key", ArrowDataType::Binary, false),
2634                Field::new(
2635                    "greptime_timestamp",
2636                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2637                    false,
2638                ),
2639                Field::new("greptime_value", ArrowDataType::Float64, true),
2640                Field::new("host", ArrowDataType::Utf8, true),
2641            ])),
2642            vec![
2643                Arc::new(BinaryArray::from(vec![b"k1".as_slice(), b"k2".as_slice()])),
2644                Arc::new(TimestampMillisecondArray::from(vec![1000_i64, 2000_i64])),
2645                Arc::new(arrow::array::Float64Array::from(vec![1.0_f64, 2.0_f64])),
2646                Arc::new(StringArray::from(vec!["node-1", "node-2"])),
2647            ],
2648        )
2649        .unwrap();
2650        let mut planned_batches = plan_region_batches(
2651            combined_batch,
2652            1024,
2653            &TwoRegionPartitionRule {
2654                partition_columns: vec!["host".to_string()],
2655            },
2656            &["host".to_string()],
2657        )
2658        .unwrap();
2659        planned_batches.sort_by_key(|planned| planned.region_id.region_number());
2660
2661        assert_eq!(2, planned_batches.len());
2662        assert_eq!(RegionId::new(1024, 1), planned_batches[0].region_id);
2663        assert_eq!(1, planned_batches[0].num_rows());
2664        assert_eq!(3, planned_batches[0].batch.num_columns());
2665        assert_eq!(RegionId::new(1024, 2), planned_batches[1].region_id);
2666        assert_eq!(1, planned_batches[1].num_rows());
2667        assert_eq!(3, planned_batches[1].batch.num_columns());
2668    }
2669
2670    #[test]
2671    fn test_encode_region_write_requests_builds_bulk_insert_requests() {
2672        let planned_batch = PlannedRegionBatch {
2673            region_id: RegionId::new(1024, 1),
2674            batch: RecordBatch::try_new(
2675                Arc::new(ArrowSchema::new(vec![
2676                    Field::new("__primary_key", ArrowDataType::Binary, false),
2677                    Field::new(
2678                        "greptime_timestamp",
2679                        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2680                        false,
2681                    ),
2682                    Field::new("greptime_value", ArrowDataType::Float64, true),
2683                ])),
2684                vec![
2685                    Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2686                    Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2687                    Arc::new(arrow::array::Float64Array::from(vec![1.0_f64])),
2688                ],
2689            )
2690            .unwrap(),
2691        };
2692        let resolved_batch = ResolvedRegionBatch {
2693            planned: planned_batch,
2694            datanode: Peer {
2695                id: 1,
2696                addr: "node-1".to_string(),
2697            },
2698        };
2699        let writes = encode_region_write_requests(vec![resolved_batch]).unwrap();
2700
2701        assert_eq!(1, writes.len());
2702        assert_eq!(1, writes[0].datanode.id);
2703        let Some(region_request::Body::BulkInsert(request)) = &writes[0].request.body else {
2704            panic!("expected bulk insert request");
2705        };
2706        assert_eq!(RegionId::new(1024, 1).as_u64(), request.region_id);
2707    }
2708}