Skip to main content

servers/
pending_rows_batcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::Peer;
20use api::v1::region::{
21    BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
22};
23use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
24use arrow::compute::{concat_batches, filter_record_batch};
25use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use bytes::Bytes;
29use catalog::CatalogManagerRef;
30use common_grpc::flight::{FlightEncoder, FlightMessage};
31use common_meta::node_manager::NodeManagerRef;
32use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, warn};
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
38use partition::manager::PartitionRuleManagerRef;
39use partition::partition::PartitionRuleRef;
40use session::context::QueryContextRef;
41use smallvec::SmallVec;
42use snafu::{OptionExt, ResultExt, ensure};
43use store_api::storage::{RegionId, TableId};
44use table::metadata::{TableInfo, TableInfoRef};
45use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};
46
47use crate::error;
48use crate::error::{Error, Result};
49use crate::metrics::{
50    FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
51    PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
52    PENDING_WORKERS,
53};
54use crate::prom_row_builder::{
55    build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
56    rows_to_aligned_record_batch,
57};
58
/// Query-context extension key that names the physical table a request writes into.
const PHYSICAL_TABLE_KEY: &str = "physical_table";

/// Environment variable deciding whether `submit` waits for the ingestion
/// (flush) result before replying to the client.
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";

/// Multiplier applied to the flush interval to derive a worker's idle timeout.
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;

/// Count of essential columns in a physical region.
/// NOTE(review): used outside this chunk; confirm which columns it counts.
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;
64
65#[async_trait]
66pub trait PendingRowsSchemaAlterer: Send + Sync {
67    /// Batch-create multiple logical tables that are missing.
68    /// Each entry is `(table_name, request_schema)`.
69    async fn create_tables_if_missing_batch(
70        &self,
71        catalog: &str,
72        schema: &str,
73        tables: &[(&str, &[ColumnSchema])],
74        with_metric_engine: bool,
75        ctx: QueryContextRef,
76    ) -> Result<()>;
77
78    /// Batch-alter multiple logical tables to add missing tag columns.
79    /// Each entry is `(table_name, missing_column_names)`.
80    async fn add_missing_prom_tag_columns_batch(
81        &self,
82        catalog: &str,
83        schema: &str,
84        tables: &[(&str, &[String])],
85        ctx: QueryContextRef,
86    ) -> Result<()>;
87}
88
89pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
90
91#[derive(Clone)]
92pub struct PhysicalTableMetadata {
93    pub table_info: TableInfoRef,
94    /// Mapping from column name to column id
95    pub col_name_to_ids: Option<HashMap<String, u32>>,
96}
97
98#[async_trait]
99pub trait PhysicalFlushCatalogProvider: Send + Sync {
100    async fn physical_table(
101        &self,
102        catalog: &str,
103        schema: &str,
104        table_name: &str,
105        query_ctx: &session::context::QueryContext,
106    ) -> catalog::error::Result<Option<PhysicalTableMetadata>>;
107}
108
109#[async_trait]
110pub trait PhysicalFlushPartitionProvider: Send + Sync {
111    async fn find_table_partition_rule(
112        &self,
113        table_info: &TableInfo,
114    ) -> partition::error::Result<PartitionRuleRef>;
115
116    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer>;
117}
118
119#[async_trait]
120pub trait PhysicalFlushNodeRequester: Send + Sync {
121    async fn handle(
122        &self,
123        peer: &Peer,
124        request: RegionRequest,
125    ) -> Result<api::region::RegionResponse>;
126}
127
128#[derive(Clone)]
129struct CatalogManagerPhysicalFlushAdapter {
130    catalog_manager: CatalogManagerRef,
131}
132
133#[async_trait]
134impl PhysicalFlushCatalogProvider for CatalogManagerPhysicalFlushAdapter {
135    async fn physical_table(
136        &self,
137        catalog: &str,
138        schema: &str,
139        table_name: &str,
140        query_ctx: &session::context::QueryContext,
141    ) -> catalog::error::Result<Option<PhysicalTableMetadata>> {
142        self.catalog_manager
143            .table(catalog, schema, table_name, Some(query_ctx))
144            .await
145            .map(|table| {
146                table.map(|table| {
147                    let table_info = table.table_info();
148                    let name_to_ids = table_info.name_to_ids();
149                    PhysicalTableMetadata {
150                        table_info,
151                        col_name_to_ids: name_to_ids,
152                    }
153                })
154            })
155    }
156}
157
158#[derive(Clone)]
159struct PartitionManagerPhysicalFlushAdapter {
160    partition_manager: PartitionRuleManagerRef,
161}
162
163#[async_trait]
164impl PhysicalFlushPartitionProvider for PartitionManagerPhysicalFlushAdapter {
165    async fn find_table_partition_rule(
166        &self,
167        table_info: &TableInfo,
168    ) -> partition::error::Result<PartitionRuleRef> {
169        self.partition_manager
170            .find_table_partition_rule(table_info)
171            .await
172            .map(|(rule, _)| rule)
173    }
174
175    async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
176        let peer = self.partition_manager.find_region_leader(region_id).await?;
177        Ok(peer)
178    }
179}
180
181#[derive(Clone)]
182struct NodeManagerPhysicalFlushAdapter {
183    node_manager: NodeManagerRef,
184}
185
186#[async_trait]
187impl PhysicalFlushNodeRequester for NodeManagerPhysicalFlushAdapter {
188    async fn handle(
189        &self,
190        peer: &Peer,
191        request: RegionRequest,
192    ) -> error::Result<api::region::RegionResponse> {
193        let datanode = self.node_manager.datanode(peer).await;
194        datanode
195            .handle(request)
196            .await
197            .context(error::CommonMetaSnafu)
198    }
199}
200
/// Key under which workers (and therefore pending batches) are registered.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    /// Catalog of the write target.
    catalog: String,
    /// Schema of the write target.
    schema: String,
    /// Physical table the logical tables are written into.
    physical_table: String,
}
207
208#[derive(Debug)]
209pub struct TableBatch {
210    pub table_name: String,
211    pub table_id: TableId,
212    pub batches: Vec<RecordBatch>,
213    pub row_count: usize,
214}
215
216/// Intermediate planning state for resolving and preparing logical tables
217/// before row-to-batch alignment.
218struct TableResolutionPlan {
219    /// Resolved table schema and table id by logical table name.
220    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
221    /// Missing tables that need to be created before alignment.
222    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
223    /// Existing tables that need tag-column schema evolution.
224    tables_to_alter: Vec<(String, Vec<String>)>,
225}
226
227struct PendingBatch {
228    tables: HashMap<String, TableBatch>,
229    created_at: Instant,
230    total_row_count: usize,
231    db_string: String,
232    ctx: QueryContextRef,
233    waiters: Vec<FlushWaiter>,
234}
235
236struct FlushWaiter {
237    response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
238    _permit: OwnedSemaphorePermit,
239}
240
241struct FlushBatch {
242    table_batches: Vec<TableBatch>,
243    total_row_count: usize,
244    db_string: String,
245    ctx: QueryContextRef,
246    waiters: Vec<FlushWaiter>,
247}
248
249#[derive(Clone)]
250struct PendingWorker {
251    tx: mpsc::Sender<WorkerCommand>,
252}
253
254enum WorkerCommand {
255    Submit {
256        table_batches: Vec<(String, u32, RecordBatch)>,
257        total_rows: usize,
258        ctx: QueryContextRef,
259        response_tx: oneshot::Sender<std::result::Result<(), Arc<Error>>>,
260        _permit: OwnedSemaphorePermit,
261    },
262}
263
264// Batch key is derived from QueryContext; it assumes catalog/schema/physical_table fully
265// define the write target and must remain consistent across the batch.
266fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
267    let physical_table = ctx
268        .extension(PHYSICAL_TABLE_KEY)
269        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
270        .to_string();
271    BatchKey {
272        catalog: ctx.current_catalog().to_string(),
273        schema: ctx.current_schema(),
274        physical_table,
275    }
276}
277
278/// Prometheus remote write pending rows batcher.
279pub struct PendingRowsBatcher {
280    workers: Arc<DashMap<BatchKey, PendingWorker>>,
281    flush_interval: Duration,
282    max_batch_rows: usize,
283    partition_manager: PartitionRuleManagerRef,
284    node_manager: NodeManagerRef,
285    catalog_manager: CatalogManagerRef,
286    flush_semaphore: Arc<Semaphore>,
287    inflight_semaphore: Arc<Semaphore>,
288    worker_channel_capacity: usize,
289    prom_store_with_metric_engine: bool,
290    schema_alterer: PendingRowsSchemaAltererRef,
291    pending_rows_batch_sync: bool,
292    shutdown: broadcast::Sender<()>,
293}
294
295impl PendingRowsBatcher {
296    #[allow(clippy::too_many_arguments)]
297    pub fn try_new(
298        partition_manager: PartitionRuleManagerRef,
299        node_manager: NodeManagerRef,
300        catalog_manager: CatalogManagerRef,
301        prom_store_with_metric_engine: bool,
302        schema_alterer: PendingRowsSchemaAltererRef,
303        flush_interval: Duration,
304        max_batch_rows: usize,
305        max_concurrent_flushes: usize,
306        worker_channel_capacity: usize,
307        max_inflight_requests: usize,
308    ) -> Option<Arc<Self>> {
309        // Disable the batcher if flush is disabled or configuration is invalid.
310        // Zero values for these knobs either cause panics (e.g., zero-capacity channels)
311        // or deadlocks (e.g., semaphores with no permits).
312        if flush_interval.is_zero()
313            || max_batch_rows == 0
314            || max_concurrent_flushes == 0
315            || worker_channel_capacity == 0
316            || max_inflight_requests == 0
317        {
318            return None;
319        }
320
321        let (shutdown, _) = broadcast::channel(1);
322        let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
323            .ok()
324            .as_deref()
325            .and_then(|v| v.parse::<bool>().ok())
326            .unwrap_or(true);
327        let workers = Arc::new(DashMap::new());
328        PENDING_WORKERS.set(workers.len() as i64);
329
330        Some(Arc::new(Self {
331            workers,
332            flush_interval,
333            max_batch_rows,
334            partition_manager,
335            node_manager,
336            catalog_manager,
337            prom_store_with_metric_engine,
338            schema_alterer,
339            flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
340            inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
341            worker_channel_capacity,
342            pending_rows_batch_sync,
343            shutdown,
344        }))
345    }
346
347    pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
348        let (table_batches, total_rows) = {
349            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
350                .with_label_values(&["submit_build_and_align"])
351                .start_timer();
352            self.build_and_align_table_batches(requests, &ctx).await?
353        };
354        if total_rows == 0 {
355            return Ok(0);
356        }
357
358        let permit = {
359            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
360                .with_label_values(&["submit_acquire_inflight_permit"])
361                .start_timer();
362            self.inflight_semaphore
363                .clone()
364                .acquire_owned()
365                .await
366                .map_err(|_| error::BatcherChannelClosedSnafu.build())?
367        };
368
369        let (response_tx, response_rx) = oneshot::channel();
370
371        let batch_key = batch_key_from_ctx(&ctx);
372        let mut cmd = Some(WorkerCommand::Submit {
373            table_batches,
374            total_rows,
375            ctx,
376            response_tx,
377            _permit: permit,
378        });
379
380        {
381            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
382                .with_label_values(&["submit_send_to_worker"])
383                .start_timer();
384
385            for _ in 0..2 {
386                let worker = self.get_or_spawn_worker(batch_key.clone());
387                let Some(worker_cmd) = cmd.take() else {
388                    break;
389                };
390
391                match worker.tx.send(worker_cmd).await {
392                    Ok(()) => break,
393                    Err(err) => {
394                        cmd = Some(err.0);
395                        remove_worker_if_same_channel(
396                            self.workers.as_ref(),
397                            &batch_key,
398                            &worker.tx,
399                        );
400                    }
401                }
402            }
403
404            if cmd.is_some() {
405                return Err(Error::BatcherChannelClosed);
406            }
407        }
408
409        if self.pending_rows_batch_sync {
410            let result = {
411                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
412                    .with_label_values(&["submit_wait_flush_result"])
413                    .start_timer();
414                response_rx
415                    .await
416                    .map_err(|_| error::BatcherChannelClosedSnafu.build())?
417            };
418            result
419                .context(error::SubmitBatchSnafu)
420                .map(|()| total_rows as u64)
421        } else {
422            Ok(total_rows as u64)
423        }
424    }
425
426    /// Converts proto `RowInsertRequests` directly into aligned `RecordBatch`es
427    /// in a single pass, handling table creation, schema alteration, column
428    /// renaming, reordering, and null-filling without building intermediate
429    /// RecordBatches.
430    async fn build_and_align_table_batches(
431        &self,
432        requests: RowInsertRequests,
433        ctx: &QueryContextRef,
434    ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
435        let catalog = ctx.current_catalog().to_string();
436        let schema = ctx.current_schema();
437
438        let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
439        if total_rows == 0 {
440            return Ok((Vec::new(), 0));
441        }
442
443        let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
444        let mut plan = self
445            .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
446            .await?;
447
448        self.create_missing_tables_and_refresh_schemas(
449            &catalog,
450            &schema,
451            ctx,
452            &table_rows,
453            &mut plan,
454        )
455        .await?;
456
457        self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
458            .await?;
459
460        let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;
461
462        Ok((aligned_batches, total_rows))
463    }
464
465    /// Extracts non-empty `(table_name, rows)` pairs and computes total row
466    /// count across the retained entries.
467    fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
468        let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
469        let mut total_rows = 0;
470
471        for request in requests.inserts {
472            let Some(rows) = request.rows else {
473                continue;
474            };
475            if rows.rows.is_empty() {
476                continue;
477            }
478
479            total_rows += rows.rows.len();
480            table_rows.push((request.table_name, rows));
481        }
482
483        (table_rows, total_rows)
484    }
485
486    /// Returns unique `(table_name, proto_schema)` pairs while keeping the
487    /// first-seen schema for duplicate table names.
488    fn collect_unique_table_schemas(
489        table_rows: &[(String, Rows)],
490    ) -> Result<Vec<(&str, &[ColumnSchema])>> {
491        let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
492        let mut seen = HashSet::new();
493
494        for (table_name, rows) in table_rows {
495            if seen.insert(table_name.as_str()) {
496                unique_tables.push((table_name.as_str(), &rows.schema));
497            } else {
498                // table_rows should group rows by table name.
499                return error::InvalidPromRemoteRequestSnafu {
500                    msg: format!(
501                        "Found duplicated table name in RowInsertRequest: {}",
502                        table_name
503                    ),
504                }
505                .fail();
506            }
507        }
508
509        Ok(unique_tables)
510    }
511
512    /// Resolves table metadata and classifies each table into existing,
513    /// to-create, and to-alter groups used by subsequent DDL steps.
514    async fn plan_table_resolution(
515        &self,
516        catalog: &str,
517        schema: &str,
518        ctx: &QueryContextRef,
519        unique_tables: &[(&str, &[ColumnSchema])],
520    ) -> Result<TableResolutionPlan> {
521        let mut plan = TableResolutionPlan {
522            region_schemas: HashMap::with_capacity(unique_tables.len()),
523            tables_to_create: Vec::new(),
524            tables_to_alter: Vec::new(),
525        };
526
527        let resolved_tables = {
528            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
529                .with_label_values(&["align_resolve_table"])
530                .start_timer();
531            futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
532                self.catalog_manager
533                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
534            }))
535            .await
536        };
537
538        for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
539            let table = table_result?;
540
541            if let Some(table) = table {
542                let table_info = table.table_info();
543                let table_id = table_info.ident.table_id;
544                let region_schema = table_info.meta.schema.arrow_schema().clone();
545
546                let missing_columns = {
547                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
548                        .with_label_values(&["align_identify_missing_columns"])
549                        .start_timer();
550                    identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
551                };
552                if !missing_columns.is_empty() {
553                    plan.tables_to_alter
554                        .push(((*table_name).to_string(), missing_columns));
555                }
556                plan.region_schemas
557                    .insert((*table_name).to_string(), (region_schema, table_id));
558            } else {
559                let request_schema = {
560                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
561                        .with_label_values(&["align_build_create_table_schema"])
562                        .start_timer();
563                    build_prom_create_table_schema_from_proto(rows_schema)?
564                };
565                plan.tables_to_create
566                    .push(((*table_name).to_string(), request_schema));
567            }
568        }
569
570        Ok(plan)
571    }
572
573    /// Batch-creates missing tables, refreshes their schema metadata, and
574    /// enqueues follow-up alters for extra tag columns discovered in later rows.
575    async fn create_missing_tables_and_refresh_schemas(
576        &self,
577        catalog: &str,
578        schema: &str,
579        ctx: &QueryContextRef,
580        table_rows: &[(String, Rows)],
581        plan: &mut TableResolutionPlan,
582    ) -> Result<()> {
583        if plan.tables_to_create.is_empty() {
584            return Ok(());
585        }
586
587        let create_refs: Vec<(&str, &[ColumnSchema])> = plan
588            .tables_to_create
589            .iter()
590            .map(|(name, schema)| (name.as_str(), schema.as_slice()))
591            .collect();
592
593        {
594            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
595                .with_label_values(&["align_batch_create_tables"])
596                .start_timer();
597            self.schema_alterer
598                .create_tables_if_missing_batch(
599                    catalog,
600                    schema,
601                    &create_refs,
602                    self.prom_store_with_metric_engine,
603                    ctx.clone(),
604                )
605                .await?;
606        }
607
608        let created_table_results = {
609            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
610                .with_label_values(&["align_resolve_table_after_create"])
611                .start_timer();
612            futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
613                self.catalog_manager
614                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
615            }))
616            .await
617        };
618
619        for ((table_name, _), table_result) in
620            plan.tables_to_create.iter().zip(created_table_results)
621        {
622            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
623                reason: format!(
624                    "Table not found after pending batch create attempt: {}",
625                    table_name
626                ),
627            })?;
628            let table_info = table.table_info();
629            let table_id = table_info.ident.table_id;
630            let region_schema = table_info.meta.schema.arrow_schema().clone();
631            plan.region_schemas
632                .insert(table_name.clone(), (region_schema, table_id));
633        }
634
635        Self::enqueue_alter_for_new_tables(table_rows, plan)?;
636
637        Ok(())
638    }
639
640    /// For newly created tables, re-checks all row schemas and appends alter
641    /// operations when additional tag columns are still missing.
642    fn enqueue_alter_for_new_tables(
643        table_rows: &[(String, Rows)],
644        plan: &mut TableResolutionPlan,
645    ) -> Result<()> {
646        let created_tables: HashSet<&str> = plan
647            .tables_to_create
648            .iter()
649            .map(|(table_name, _)| table_name.as_str())
650            .collect();
651
652        for (table_name, rows) in table_rows {
653            if !created_tables.contains(table_name.as_str()) {
654                continue;
655            }
656
657            let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
658                continue;
659            };
660
661            let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
662            if missing_columns.is_empty()
663                || plan
664                    .tables_to_alter
665                    .iter()
666                    .any(|(existing_name, _)| existing_name == table_name)
667            {
668                continue;
669            }
670
671            plan.tables_to_alter
672                .push((table_name.clone(), missing_columns));
673        }
674
675        Ok(())
676    }
677
678    /// Batch-alters tables that have missing tag columns and refreshes the
679    /// in-memory schema map used for row alignment.
680    async fn alter_tables_and_refresh_schemas(
681        &self,
682        catalog: &str,
683        schema: &str,
684        ctx: &QueryContextRef,
685        plan: &mut TableResolutionPlan,
686    ) -> Result<()> {
687        if plan.tables_to_alter.is_empty() {
688            return Ok(());
689        }
690
691        let alter_refs: Vec<(&str, &[String])> = plan
692            .tables_to_alter
693            .iter()
694            .map(|(name, cols)| (name.as_str(), cols.as_slice()))
695            .collect();
696        {
697            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
698                .with_label_values(&["align_batch_add_missing_columns"])
699                .start_timer();
700            self.schema_alterer
701                .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
702                .await?;
703        }
704
705        let altered_table_results = {
706            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
707                .with_label_values(&["align_resolve_table_after_schema_alter"])
708                .start_timer();
709            futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
710                self.catalog_manager
711                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
712            }))
713            .await
714        };
715
716        for ((table_name, _), table_result) in
717            plan.tables_to_alter.iter().zip(altered_table_results)
718        {
719            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
720                reason: format!(
721                    "Table not found after pending batch schema alter: {}",
722                    table_name
723                ),
724            })?;
725            let table_info = table.table_info();
726            let table_id = table_info.ident.table_id;
727            let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
728            plan.region_schemas
729                .insert(table_name.clone(), (refreshed_region_schema, table_id));
730        }
731
732        Ok(())
733    }
734
735    /// Converts proto rows to `RecordBatch` values aligned to resolved region
736    /// schemas and returns `(table_name, table_id, batch)` tuples.
737    fn build_aligned_batches(
738        table_rows: &[(String, Rows)],
739        region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
740    ) -> Result<Vec<(String, u32, RecordBatch)>> {
741        let mut aligned_batches = Vec::with_capacity(table_rows.len());
742        for (table_name, rows) in table_rows {
743            let (region_schema, table_id) =
744                region_schemas.get(table_name).cloned().with_context(|| {
745                    error::UnexpectedResultSnafu {
746                        reason: format!("Region schema not resolved for table: {}", table_name),
747                    }
748                })?;
749
750            let record_batch = {
751                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
752                    .with_label_values(&["align_rows_to_record_batch"])
753                    .start_timer();
754                rows_to_aligned_record_batch(rows, region_schema.as_ref())?
755            };
756            aligned_batches.push((table_name.clone(), table_id, record_batch));
757        }
758
759        Ok(aligned_batches)
760    }
761
762    fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
763        if let Some(worker) = self.workers.get(&key)
764            && !worker.tx.is_closed()
765        {
766            return worker.clone();
767        }
768
769        let entry = self.workers.entry(key.clone());
770        match entry {
771            Entry::Occupied(mut worker) => {
772                if worker.get().tx.is_closed() {
773                    let new_worker = self.spawn_worker(key);
774                    worker.insert(new_worker.clone());
775                    PENDING_WORKERS.set(self.workers.len() as i64);
776                    new_worker
777                } else {
778                    worker.get().clone()
779                }
780            }
781            Entry::Vacant(vacant) => {
782                let worker = self.spawn_worker(key);
783
784                vacant.insert(worker.clone());
785                PENDING_WORKERS.set(self.workers.len() as i64);
786                worker
787            }
788        }
789    }
790
791    fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
792        let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
793        let worker = PendingWorker { tx: tx.clone() };
794        let worker_idle_timeout = self
795            .flush_interval
796            .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
797            .unwrap_or(self.flush_interval);
798
799        start_worker(
800            key,
801            worker.tx.clone(),
802            self.workers.clone(),
803            rx,
804            self.shutdown.clone(),
805            self.partition_manager.clone(),
806            self.node_manager.clone(),
807            self.catalog_manager.clone(),
808            self.flush_interval,
809            worker_idle_timeout,
810            self.max_batch_rows,
811            self.flush_semaphore.clone(),
812        );
813
814        worker
815    }
816}
817
818impl Drop for PendingRowsBatcher {
819    fn drop(&mut self) {
820        let _ = self.shutdown.send(());
821    }
822}
823
824impl PendingBatch {
825    fn new(ctx: QueryContextRef) -> Self {
826        let db_string = ctx.get_db_string();
827        Self {
828            tables: HashMap::new(),
829            created_at: Instant::now(),
830            total_row_count: 0,
831            db_string,
832            ctx,
833            waiters: Vec::new(),
834        }
835    }
836}
837
/// Spawns the background task that accumulates rows submitted for `key` and
/// flushes them to the physical table.
///
/// The task loops over four events:
/// - a `Submit` command: the rows are appended to the current pending batch
///   (created on demand) and the submitter's waiter is recorded; once the
///   batch reaches `max_batch_rows` it is drained and handed to
///   [`spawn_flush`], which is awaited, so the worker waits for a flush permit;
/// - the idle timer (`worker_idle_timeout`, reset on every submit): the worker
///   exits only if it has no pending rows and no queued commands;
/// - the `flush_interval` tick: a batch older than `flush_interval` is drained
///   and handed to [`spawn_flush`];
/// - a shutdown broadcast, or `rx` yielding `None`: any pending batch is
///   flushed inline with [`flush_batch`] before exiting.
///
/// On exit the task removes its own entry from `workers`, unless that entry
/// already refers to a different channel.
///
/// NOTE(review): the task itself holds `worker_tx`, a sender for `rx`, so
/// `rx.recv()` appears unable to return `None` while the task runs; the `None`
/// arm looks effectively unreachable — confirm whether this is intended.
/// NOTE(review): on idle exit, a `Some` batch with zero rows (if a zero-row
/// submit can happen) is dropped without going through `drain_batch`.
#[allow(clippy::too_many_arguments)]
fn start_worker(
    key: BatchKey,
    worker_tx: mpsc::Sender<WorkerCommand>,
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    mut rx: mpsc::Receiver<WorkerCommand>,
    shutdown: broadcast::Sender<()>,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    flush_interval: Duration,
    worker_idle_timeout: Duration,
    max_batch_rows: usize,
    flush_semaphore: Arc<Semaphore>,
) {
    tokio::spawn(async move {
        let mut batch = None;
        let mut interval = tokio::time::interval(flush_interval);
        let mut shutdown_rx = shutdown.subscribe();
        let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
        // Pinned so `select!` can poll it by reference and it can be reset in place.
        let idle_timer = tokio::time::sleep_until(idle_deadline);
        tokio::pin!(idle_timer);

        loop {
            tokio::select! {
                cmd = rx.recv() => {
                    match cmd {
                        Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
                            // Any submit postpones the idle shutdown.
                            idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);

                            // Only the submit that opens a new batch supplies its ctx;
                            // `ctx` from later submits into the same batch is not used.
                            let pending_batch = batch.get_or_insert_with(||{
                                PENDING_BATCHES.inc();
                                PendingBatch::new(ctx)
                            });

                            // The waiter (and its inflight permit) lives until the batch is flushed.
                            pending_batch.waiters.push(FlushWaiter { response_tx, _permit });

                            // Group incoming record batches by logical table name.
                            for (table_name, table_id, record_batch) in table_batches {
                                let entry = pending_batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
                                    table_name,
                                    table_id,
                                    batches: Vec::new(),
                                    row_count: 0,
                                });
                                entry.row_count += record_batch.num_rows();
                                entry.batches.push(record_batch);
                            }

                            pending_batch.total_row_count += total_rows;
                            PENDING_ROWS.add(total_rows as i64);

                            // Size-triggered flush; awaiting `spawn_flush` waits for a flush
                            // permit before this worker handles the next event.
                            if pending_batch.total_row_count >= max_batch_rows
                                && let Some(flush) = drain_batch(&mut batch) {
                                    spawn_flush(
                                        flush,
                                        partition_manager.clone(),
                                        node_manager.clone(),
                                        catalog_manager.clone(),
                                        flush_semaphore.clone(),
                                    ).await;
                            }
                        }
                        None => {
                            // Channel closed: flush whatever is pending on this task, then exit.
                            if let Some(flush) = drain_batch(&mut batch) {
                                flush_batch(
                                    flush,
                                    partition_manager.clone(),
                                    node_manager.clone(),
                                    catalog_manager.clone(),
                                ).await;
                            }
                            break;
                        }
                    }
                }
                _ = &mut idle_timer => {
                    // Stay alive (and re-arm the timer) while rows or commands are still pending.
                    if !should_close_worker_on_idle_timeout(
                        batch.as_ref().map_or(0, |batch| batch.total_row_count),
                        rx.len(),
                    ) {
                        idle_timer
                            .as_mut()
                            .reset(tokio::time::Instant::now() + worker_idle_timeout);
                        continue;
                    }

                    debug!(
                        "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
                        key.catalog,
                        key.schema,
                        key.physical_table
                    );
                    break;
                }
                _ = interval.tick() => {
                    // Time-triggered flush of a batch that has been pending for at least
                    // `flush_interval`.
                    if batch
                        .as_ref()
                        .is_some_and(|batch| batch.created_at.elapsed() >= flush_interval)
                        && let Some(flush) = drain_batch(&mut batch) {
                            spawn_flush(
                                flush,
                                partition_manager.clone(),
                                node_manager.clone(),
                                catalog_manager.clone(),
                                flush_semaphore.clone(),
                            ).await;
                    }
                }
                _ = shutdown_rx.recv() => {
                    // Batcher shutdown: flush pending rows inline (no semaphore), then exit.
                    if let Some(flush) = drain_batch(&mut batch) {
                        flush_batch(
                            flush,
                            partition_manager.clone(),
                            node_manager.clone(),
                            catalog_manager.clone(),
                        ).await;
                    }
                    break;
                }
            }
        }

        // Deregister this worker unless the map already holds a different channel for `key`.
        remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
    });
}
963
964fn remove_worker_if_same_channel(
965    workers: &DashMap<BatchKey, PendingWorker>,
966    key: &BatchKey,
967    worker_tx: &mpsc::Sender<WorkerCommand>,
968) -> bool {
969    if let Some(worker) = workers.get(key)
970        && worker.tx.same_channel(worker_tx)
971    {
972        drop(worker);
973        workers.remove(key);
974        PENDING_WORKERS.set(workers.len() as i64);
975        return true;
976    }
977
978    false
979}
980
/// Decides whether an idle worker may shut down.
///
/// It may only do so when it holds no pending rows and has no commands
/// waiting in its queue.
fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    matches!((total_row_count, queued_requests), (0, 0))
}
984
985fn drain_batch(batch: &mut Option<PendingBatch>) -> Option<FlushBatch> {
986    let batch = batch.take()?;
987    let total_row_count = batch.total_row_count;
988
989    if total_row_count == 0 {
990        return None;
991    }
992
993    let table_batches = batch.tables.into_values().collect();
994    let waiters = batch.waiters;
995
996    PENDING_ROWS.sub(total_row_count as i64);
997    PENDING_BATCHES.dec();
998
999    Some(FlushBatch {
1000        table_batches,
1001        total_row_count,
1002        db_string: batch.db_string,
1003        ctx: batch.ctx,
1004        waiters,
1005    })
1006}
1007
1008async fn spawn_flush(
1009    flush: FlushBatch,
1010    partition_manager: PartitionRuleManagerRef,
1011    node_manager: NodeManagerRef,
1012    catalog_manager: CatalogManagerRef,
1013    semaphore: Arc<Semaphore>,
1014) {
1015    match semaphore.acquire_owned().await {
1016        Ok(permit) => {
1017            tokio::spawn(async move {
1018                let _permit = permit;
1019                flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1020            });
1021        }
1022        Err(err) => {
1023            warn!(err; "Flush semaphore closed, flushing inline");
1024            flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
1025        }
1026    }
1027}
1028
/// One encoded region write, ready to be sent.
struct FlushRegionWrite {
    /// Target datanode; set from the region leader found in `resolve_region_targets`.
    datanode: Peer,
    /// Bulk-insert request carrying the region's rows as Arrow IPC.
    request: RegionRequest,
}
1033
/// The rows of the combined physical batch that the partition rule routed to
/// one region, before the region leader is resolved.
struct PlannedRegionBatch {
    /// Region built from the physical table id and the partition's region number.
    region_id: RegionId,
    /// Non-empty slice of the routing batch selected for this region.
    batch: RecordBatch,
}
1038
#[cfg(test)]
impl PlannedRegionBatch {
    /// Test helper: how many rows were routed to this region.
    fn num_rows(&self) -> usize {
        RecordBatch::num_rows(&self.batch)
    }
}
1045
/// A planned region batch together with the datanode currently leading its region.
struct ResolvedRegionBatch {
    /// Region id and rows produced by `plan_region_batches`.
    planned: PlannedRegionBatch,
    /// Leader returned by `find_region_leader` for `planned.region_id`.
    datanode: Peer,
}
1050
/// Region writes are fanned out concurrently only when there are at least two
/// of them; zero or one write is sent sequentially.
fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    region_write_count >= 2
}
1054
1055/// Classifies columns in a logical-table batch for sparse primary-key conversion.
1056///
1057/// Returns:
1058/// - `Vec<TagColumnInfo>`: all Utf8 tag columns sorted by tag name, used for
1059///   TSID and sparse primary-key encoding.
1060/// - `SmallVec<[usize; 3]>`: indices of columns copied into the physical batch
1061///   after `__primary_key`, ordered as `[greptime_timestamp, greptime_value,
1062///   partition_tag_columns...]`.
1063fn columns_taxonomy(
1064    batch_schema: &Arc<ArrowSchema>,
1065    table_name: &str,
1066    name_to_ids: &HashMap<String, u32>,
1067    partition_columns: &HashSet<&str>,
1068) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
1069    let mut tag_columns = Vec::new();
1070    let mut essential_column_indices =
1071        SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
1072    // Placeholder for greptime_timestamp and greptime_value
1073    essential_column_indices.push(0);
1074    essential_column_indices.push(0);
1075
1076    let mut timestamp_index = None;
1077    let mut value_index = None;
1078
1079    for (index, field) in batch_schema.fields().iter().enumerate() {
1080        match field.data_type() {
1081            ArrowDataType::Utf8 => {
1082                let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
1083                    error::InvalidPromRemoteRequestSnafu {
1084                        msg: format!(
1085                            "Column '{}' from logical table '{}' not found in physical table column IDs",
1086                            field.name(),
1087                            table_name
1088                        ),
1089                    }
1090                })?;
1091                tag_columns.push(TagColumnInfo {
1092                    name: field.name().clone(),
1093                    index,
1094                    column_id,
1095                });
1096
1097                if partition_columns.contains(field.name().as_str()) {
1098                    essential_column_indices.push(index);
1099                }
1100            }
1101            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
1102                ensure!(
1103                    timestamp_index.replace(index).is_none(),
1104                    error::InvalidPromRemoteRequestSnafu {
1105                        msg: format!(
1106                            "Duplicated timestamp column in logical table '{}' batch schema",
1107                            table_name
1108                        ),
1109                    }
1110                );
1111            }
1112            ArrowDataType::Float64 => {
1113                ensure!(
1114                    value_index.replace(index).is_none(),
1115                    error::InvalidPromRemoteRequestSnafu {
1116                        msg: format!(
1117                            "Duplicated value column in logical table '{}' batch schema",
1118                            table_name
1119                        ),
1120                    }
1121                );
1122            }
1123            datatype => {
1124                return error::InvalidPromRemoteRequestSnafu {
1125                    msg: format!(
1126                        "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
1127                        table_name
1128                    ),
1129                }
1130                .fail();
1131            }
1132        }
1133    }
1134
1135    let timestamp_index =
1136        timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1137            msg: format!(
1138                "Missing essential column '{}' in logical table '{}' batch schema",
1139                greptime_timestamp(),
1140                table_name
1141            ),
1142        })?;
1143    let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
1144        msg: format!(
1145            "Missing essential column '{}' in logical table '{}' batch schema",
1146            greptime_value(),
1147            table_name
1148        ),
1149    })?;
1150
1151    tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
1152
1153    essential_column_indices[0] = timestamp_index;
1154    essential_column_indices[1] = value_index;
1155
1156    Ok((tag_columns, essential_column_indices))
1157}
1158
1159fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
1160    ensure!(
1161        batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1162        error::InternalSnafu {
1163            err_msg: format!(
1164                "Expected at least {} columns in physical batch, got {}",
1165                PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1166                batch.num_columns()
1167            ),
1168        }
1169    );
1170    let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
1171    batch.project(&essential_indices).context(error::ArrowSnafu)
1172}
1173
1174async fn flush_region_writes_concurrently(
1175    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1176    writes: Vec<FlushRegionWrite>,
1177) -> Result<usize> {
1178    let mut affected_rows = 0;
1179    if !should_dispatch_concurrently(writes.len()) {
1180        for write in writes {
1181            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1182                .with_label_values(&["flush_write_region"])
1183                .start_timer();
1184            affected_rows += node_manager
1185                .handle(&write.datanode, write.request)
1186                .await?
1187                .affected_rows;
1188        }
1189        return Ok(affected_rows);
1190    }
1191
1192    let write_futures = writes.into_iter().map(|write| async move {
1193        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1194            .with_label_values(&["flush_write_region"])
1195            .start_timer();
1196
1197        let response = node_manager.handle(&write.datanode, write.request).await?;
1198        Ok::<_, Error>(response.affected_rows)
1199    });
1200
1201    // todo(hl): should be bounded.
1202    let affected_rows = futures::future::try_join_all(write_futures)
1203        .await?
1204        .into_iter()
1205        .sum();
1206    Ok(affected_rows)
1207}
1208
1209async fn flush_batch(
1210    flush: FlushBatch,
1211    partition_manager: PartitionRuleManagerRef,
1212    node_manager: NodeManagerRef,
1213    catalog_manager: CatalogManagerRef,
1214) {
1215    let FlushBatch {
1216        table_batches,
1217        total_row_count,
1218        db_string,
1219        ctx,
1220        waiters,
1221    } = flush;
1222    let start = Instant::now();
1223
1224    // Physical-table-level flush: transform all logical table batches
1225    // into physical format and write them together.
1226    let physical_table_name = ctx
1227        .extension(PHYSICAL_TABLE_KEY)
1228        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
1229        .to_string();
1230    let partition_provider = PartitionManagerPhysicalFlushAdapter { partition_manager };
1231    let node_requester = NodeManagerPhysicalFlushAdapter { node_manager };
1232    let catalog_provider = CatalogManagerPhysicalFlushAdapter { catalog_manager };
1233    let result = flush_batch_physical(
1234        &table_batches,
1235        &physical_table_name,
1236        &ctx,
1237        &partition_provider,
1238        &node_requester,
1239        &catalog_provider,
1240    )
1241    .await;
1242
1243    let elapsed = start.elapsed().as_secs_f64();
1244    FLUSH_ELAPSED.observe(elapsed);
1245
1246    if result.is_err() {
1247        FLUSH_FAILURES.inc();
1248        FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64);
1249    } else if let Ok(affected_rows) = &result {
1250        FLUSH_TOTAL.inc();
1251        FLUSH_ROWS.observe(total_row_count as f64);
1252        operator::metrics::DIST_INGEST_ROW_COUNT
1253            .with_label_values(&[db_string.as_str()])
1254            .inc_by(*affected_rows as u64);
1255    }
1256
1257    debug!(
1258        "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
1259        total_row_count, elapsed
1260    );
1261
1262    notify_waiters(waiters, result.map(|_| ()));
1263}
1264
1265/// Flushes a batch of logical table rows by transforming them into the physical table format
1266/// and writing them to the appropriate datanode regions.
1267///
1268/// This function performs the end-to-end physical flush pipeline:
1269/// 1. Resolves the physical table metadata and column ID mapping.
1270/// 2. Fetches the physical table's partition rule.
1271/// 3. Transforms each logical table batch into the physical (sparse primary key) format.
1272/// 4. Concatenates all transformed batches into a single combined batch.
1273/// 5. Splits the combined batch by partition rule and sends region write requests
1274///    concurrently to the target datanodes.
1275pub async fn flush_batch_physical(
1276    table_batches: &[TableBatch],
1277    physical_table_name: &str,
1278    ctx: &QueryContextRef,
1279    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1280    node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
1281    catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized),
1282) -> Result<usize> {
1283    // 1. Resolve the physical table and get column ID mapping
1284    let physical_table = {
1285        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1286            .with_label_values(&["flush_physical_resolve_table"])
1287            .start_timer();
1288        catalog_manager
1289            .physical_table(
1290                ctx.current_catalog(),
1291                &ctx.current_schema(),
1292                physical_table_name,
1293                ctx.as_ref(),
1294            )
1295            .await?
1296            .with_context(|| error::InternalSnafu {
1297                err_msg: format!(
1298                    "Physical table '{}' not found during pending flush",
1299                    physical_table_name
1300                ),
1301            })?
1302    };
1303
1304    let physical_table_info = physical_table.table_info;
1305    let name_to_ids = physical_table
1306        .col_name_to_ids
1307        .with_context(|| error::InternalSnafu {
1308            err_msg: format!(
1309                "Physical table '{}' has no column IDs for pending flush",
1310                physical_table_name
1311            ),
1312        })?;
1313
1314    // 2. Get the physical table's partition rule (one lookup instead of N)
1315    let partition_rule = {
1316        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1317            .with_label_values(&["flush_physical_fetch_partition_rule"])
1318            .start_timer();
1319        partition_manager
1320            .find_table_partition_rule(physical_table_info.as_ref())
1321            .await?
1322    };
1323    let partition_columns = partition_rule.partition_columns();
1324    let partition_columns_set: HashSet<&str> =
1325        partition_columns.iter().map(String::as_str).collect();
1326
1327    // 3. Transform each logical table batch into physical format
1328    let modified_batches =
1329        transform_logical_batches_to_physical(table_batches, &name_to_ids, &partition_columns_set)?;
1330
1331    // 4. Concatenate all modified batches (all share the same physical schema)
1332    let combined_batch = concat_modified_batches(&modified_batches)?;
1333
1334    // 5. Split by physical partition rule and send to regions
1335    let physical_table_id = physical_table_info.table_id();
1336    let planned_batches = plan_region_batches(
1337        combined_batch,
1338        physical_table_id,
1339        partition_rule.as_ref(),
1340        partition_columns,
1341    )?;
1342
1343    let resolved_batches = resolve_region_targets(planned_batches, partition_manager).await?;
1344    let region_writes = encode_region_write_requests(resolved_batches)?;
1345    flush_region_writes_concurrently(node_manager, region_writes).await
1346}
1347
1348/// Transforms logical table batches into physical format (sparse primary key encoding).
1349///
1350/// It identifies tag columns and essential columns (timestamp, value) for each logical batch
1351/// and applies sparse primary key modification.
1352fn transform_logical_batches_to_physical(
1353    table_batches: &[TableBatch],
1354    name_to_ids: &HashMap<String, u32>,
1355    partition_columns_set: &HashSet<&str>,
1356) -> Result<Vec<RecordBatch>> {
1357    let mut modified_batches: Vec<RecordBatch> =
1358        Vec::with_capacity(table_batches.iter().map(|b| b.batches.len()).sum());
1359
1360    let mut modify_elapsed = Duration::ZERO;
1361    let mut columns_taxonomy_elapsed = Duration::ZERO;
1362
1363    for table_batch in table_batches {
1364        let table_id = table_batch.table_id;
1365
1366        for batch in &table_batch.batches {
1367            let batch_schema = batch.schema();
1368            let start = Instant::now();
1369            let (tag_columns, essential_col_indices) = columns_taxonomy(
1370                &batch_schema,
1371                &table_batch.table_name,
1372                name_to_ids,
1373                partition_columns_set,
1374            )?;
1375
1376            columns_taxonomy_elapsed += start.elapsed();
1377            if tag_columns.is_empty() && essential_col_indices.is_empty() {
1378                continue;
1379            }
1380
1381            let modified = {
1382                let start = Instant::now();
1383                let batch = modify_batch_sparse(
1384                    batch.clone(),
1385                    table_id,
1386                    &tag_columns,
1387                    &essential_col_indices,
1388                )?;
1389                modify_elapsed += start.elapsed();
1390                batch
1391            };
1392
1393            modified_batches.push(modified);
1394        }
1395    }
1396
1397    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1398        .with_label_values(&["flush_physical_modify_batch"])
1399        .observe(modify_elapsed.as_secs_f64());
1400    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1401        .with_label_values(&["flush_physical_columns_taxonomy"])
1402        .observe(columns_taxonomy_elapsed.as_secs_f64());
1403
1404    ensure!(
1405        !modified_batches.is_empty(),
1406        error::InternalSnafu {
1407            err_msg: "No batches can be transformed during pending flush",
1408        }
1409    );
1410    Ok(modified_batches)
1411}
1412
1413/// Concatenates all modified batches into a single large batch.
1414///
1415/// All modified batches share the same physical schema.
1416fn concat_modified_batches(modified_batches: &[RecordBatch]) -> Result<RecordBatch> {
1417    let combined_schema = modified_batches[0].schema();
1418    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1419        .with_label_values(&["flush_physical_concat_all"])
1420        .start_timer();
1421    concat_batches(&combined_schema, modified_batches).context(error::ArrowSnafu)
1422}
1423
1424fn split_combined_batch_by_region(
1425    combined_batch: &RecordBatch,
1426    partition_rule: &dyn partition::partition::PartitionRule,
1427) -> Result<HashMap<u32, partition::partition::RegionMask>> {
1428    let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1429        .with_label_values(&["flush_physical_split_record_batch"])
1430        .start_timer();
1431    let map = partition_rule.split_record_batch(combined_batch)?;
1432    Ok(map)
1433}
1434
1435fn prepare_physical_region_routing_batch(
1436    combined_batch: RecordBatch,
1437    partition_columns: &[String],
1438) -> Result<RecordBatch> {
1439    if partition_columns.is_empty() {
1440        return Ok(combined_batch);
1441    }
1442    strip_partition_columns_from_batch(combined_batch)
1443}
1444
1445fn plan_region_batch(
1446    stripped_batch: &RecordBatch,
1447    physical_table_id: TableId,
1448    region_number: u32,
1449    mask: &partition::partition::RegionMask,
1450) -> Result<Option<PlannedRegionBatch>> {
1451    if mask.select_none() {
1452        return Ok(None);
1453    }
1454
1455    let region_batch = if mask.select_all() {
1456        stripped_batch.clone()
1457    } else {
1458        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1459            .with_label_values(&["flush_physical_filter_record_batch"])
1460            .start_timer();
1461        filter_record_batch(stripped_batch, mask.array()).context(error::ArrowSnafu)?
1462    };
1463
1464    let row_count = region_batch.num_rows();
1465    if row_count == 0 {
1466        return Ok(None);
1467    }
1468
1469    Ok(Some(PlannedRegionBatch {
1470        region_id: RegionId::new(physical_table_id, region_number),
1471        batch: region_batch,
1472    }))
1473}
1474
1475fn plan_region_batches(
1476    combined_batch: RecordBatch,
1477    physical_table_id: TableId,
1478    partition_rule: &dyn partition::partition::PartitionRule,
1479    partition_columns: &[String],
1480) -> Result<Vec<PlannedRegionBatch>> {
1481    let region_masks = split_combined_batch_by_region(&combined_batch, partition_rule)?;
1482    let stripped_batch = prepare_physical_region_routing_batch(combined_batch, partition_columns)?;
1483
1484    let mut planned_batches = Vec::new();
1485    for (region_number, mask) in region_masks {
1486        if let Some(planned_batch) =
1487            plan_region_batch(&stripped_batch, physical_table_id, region_number, &mask)?
1488        {
1489            planned_batches.push(planned_batch);
1490        }
1491    }
1492
1493    Ok(planned_batches)
1494}
1495
1496async fn resolve_region_targets(
1497    planned_batches: Vec<PlannedRegionBatch>,
1498    partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
1499) -> Result<Vec<ResolvedRegionBatch>> {
1500    let mut resolved_batches = Vec::with_capacity(planned_batches.len());
1501    for planned in planned_batches {
1502        let datanode = {
1503            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1504                .with_label_values(&["flush_physical_resolve_region_leader"])
1505                .start_timer();
1506            partition_manager
1507                .find_region_leader(planned.region_id)
1508                .await?
1509        };
1510
1511        resolved_batches.push(ResolvedRegionBatch { planned, datanode });
1512    }
1513
1514    Ok(resolved_batches)
1515}
1516
1517fn encode_region_write_requests(
1518    resolved_batches: Vec<ResolvedRegionBatch>,
1519) -> Result<Vec<FlushRegionWrite>> {
1520    let mut region_writes = Vec::with_capacity(resolved_batches.len());
1521    for resolved in resolved_batches {
1522        let region_id = resolved.planned.region_id;
1523        let (schema_bytes, data_header, payload) = {
1524            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1525                .with_label_values(&["flush_physical_encode_ipc"])
1526                .start_timer();
1527            record_batch_to_ipc(resolved.planned.batch)?
1528        };
1529
1530        let request = RegionRequest {
1531            header: Some(RegionRequestHeader {
1532                tracing_context: TracingContext::from_current_span().to_w3c(),
1533                ..Default::default()
1534            }),
1535            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
1536                region_id: region_id.as_u64(),
1537                partition_expr_version: None,
1538                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
1539                    schema: schema_bytes,
1540                    data_header,
1541                    payload,
1542                })),
1543            })),
1544        };
1545
1546        region_writes.push(FlushRegionWrite {
1547            datanode: resolved.datanode,
1548            request,
1549        });
1550    }
1551
1552    Ok(region_writes)
1553}
1554
1555fn notify_waiters(waiters: Vec<FlushWaiter>, result: Result<()>) {
1556    let shared_result = result.map_err(Arc::new);
1557    for waiter in waiters {
1558        let _ = waiter.response_tx.send(match &shared_result {
1559            Ok(()) => Ok(()),
1560            Err(error) => Err(Arc::clone(error)),
1561        });
1562        // waiter._permit is dropped here, releasing the inflight semaphore slot
1563    }
1564}
1565
1566fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
1567    let mut encoder = FlightEncoder::default();
1568    let schema = encoder.encode_schema(record_batch.schema().as_ref());
1569    let mut iter = encoder
1570        .encode(FlightMessage::RecordBatch(record_batch))
1571        .into_iter();
1572    let Some(flight_data) = iter.next() else {
1573        return Err(Error::Internal {
1574            err_msg: "Failed to encode empty flight data".to_string(),
1575        });
1576    };
1577    if iter.next().is_some() {
1578        return Err(Error::NotSupported {
1579            feat: "bulk insert RecordBatch with dictionary arrays".to_string(),
1580        });
1581    }
1582
1583    Ok((
1584        schema.data_header,
1585        flight_data.data_header,
1586        flight_data.data_body,
1587    ))
1588}
1589
1590#[cfg(test)]
1591mod tests {
1592    use std::collections::{HashMap, HashSet};
1593    use std::sync::Arc;
1594    use std::sync::atomic::{AtomicUsize, Ordering};
1595    use std::time::{Duration, Instant};
1596
1597    use api::region::RegionResponse;
1598    use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
1599    use api::v1::meta::Peer;
1600    use api::v1::region::{InsertRequests, RegionRequest, region_request};
1601    use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
1602    use arrow::array::{BinaryArray, BooleanArray, StringArray, TimestampMillisecondArray};
1603    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1604    use arrow::record_batch::RecordBatch;
1605    use async_trait::async_trait;
1606    use catalog::error::Result as CatalogResult;
1607    use common_meta::error::Result as MetaResult;
1608    use common_meta::node_manager::{
1609        Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
1610    };
1611    use common_query::request::QueryRequest;
1612    use common_recordbatch::SendableRecordBatchStream;
1613    use dashmap::DashMap;
1614    use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema};
1615    use partition::error::Result as PartitionResult;
1616    use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask};
1617    use smallvec::SmallVec;
1618    use snafu::ResultExt;
1619    use store_api::storage::RegionId;
1620    use table::metadata::TableId;
1621    use table::test_util::table_info::test_table_info;
1622    use tokio::sync::{Semaphore, mpsc, oneshot};
1623    use tokio::time::sleep;
1624
1625    use super::{
1626        BatchKey, Error, FlushRegionWrite, FlushWaiter, PendingBatch, PendingRowsBatcher,
1627        PendingWorker, PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester,
1628        PhysicalFlushPartitionProvider, PhysicalTableMetadata, PlannedRegionBatch,
1629        ResolvedRegionBatch, TableBatch, WorkerCommand, columns_taxonomy, drain_batch,
1630        encode_region_write_requests, flush_batch_physical, flush_region_writes_concurrently,
1631        plan_region_batches, remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
1632        should_dispatch_concurrently, strip_partition_columns_from_batch,
1633        transform_logical_batches_to_physical,
1634    };
1635    use crate::error;
1636
1637    fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
1638        Rows {
1639            schema: vec![ColumnSchema {
1640                column_name: schema_name.to_string(),
1641                ..Default::default()
1642            }],
1643            rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
1644        }
1645    }
1646
1647    fn mock_tag_batch(tag_name: &str, tag_value: &str, ts: i64, val: f64) -> RecordBatch {
1648        let schema = Arc::new(ArrowSchema::new(vec![
1649            Field::new(
1650                "greptime_timestamp",
1651                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1652                false,
1653            ),
1654            Field::new("greptime_value", ArrowDataType::Float64, true),
1655            Field::new(tag_name, ArrowDataType::Utf8, true),
1656        ]));
1657
1658        RecordBatch::try_new(
1659            schema,
1660            vec![
1661                Arc::new(TimestampMillisecondArray::from(vec![ts])),
1662                Arc::new(arrow::array::Float64Array::from(vec![val])),
1663                Arc::new(StringArray::from(vec![tag_value])),
1664            ],
1665        )
1666        .unwrap()
1667    }
1668
1669    fn mock_physical_table_metadata(table_id: TableId) -> PhysicalTableMetadata {
1670        let schema = Arc::new(
1671            DtSchema::try_new(vec![
1672                DtColumnSchema::new(
1673                    "__primary_key",
1674                    datatypes::prelude::ConcreteDataType::binary_datatype(),
1675                    false,
1676                ),
1677                DtColumnSchema::new(
1678                    "greptime_timestamp",
1679                    datatypes::prelude::ConcreteDataType::timestamp_millisecond_datatype(),
1680                    false,
1681                ),
1682                DtColumnSchema::new(
1683                    "greptime_value",
1684                    datatypes::prelude::ConcreteDataType::float64_datatype(),
1685                    true,
1686                ),
1687                DtColumnSchema::new(
1688                    "tag1",
1689                    datatypes::prelude::ConcreteDataType::string_datatype(),
1690                    true,
1691                ),
1692            ])
1693            .unwrap(),
1694        );
1695        let mut table_info = test_table_info(table_id, "phy", "public", "greptime", schema);
1696        table_info.meta.column_ids = vec![0, 1, 2, 3];
1697
1698        PhysicalTableMetadata {
1699            table_info: Arc::new(table_info),
1700            col_name_to_ids: Some(HashMap::from([("tag1".to_string(), 3)])),
1701        }
1702    }
1703
1704    struct MockFlushCatalogProvider {
1705        table: Option<PhysicalTableMetadata>,
1706    }
1707
1708    #[async_trait]
1709    impl PhysicalFlushCatalogProvider for MockFlushCatalogProvider {
1710        async fn physical_table(
1711            &self,
1712            _catalog: &str,
1713            _schema: &str,
1714            _table_name: &str,
1715            _query_ctx: &session::context::QueryContext,
1716        ) -> CatalogResult<Option<PhysicalTableMetadata>> {
1717            Ok(self.table.clone())
1718        }
1719    }
1720
1721    struct SingleRegionPartitionRule;
1722
1723    impl PartitionRule for SingleRegionPartitionRule {
1724        fn as_any(&self) -> &dyn std::any::Any {
1725            self
1726        }
1727
1728        fn partition_columns(&self) -> &[String] {
1729            &[]
1730        }
1731
1732        fn find_region(
1733            &self,
1734            _values: &[datatypes::prelude::Value],
1735        ) -> partition::error::Result<store_api::storage::RegionNumber> {
1736            unimplemented!()
1737        }
1738
1739        fn split_record_batch(
1740            &self,
1741            record_batch: &RecordBatch,
1742        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1743        {
1744            Ok(HashMap::from([(
1745                1,
1746                RegionMask::new(
1747                    arrow::array::BooleanArray::from(vec![true; record_batch.num_rows()]),
1748                    record_batch.num_rows(),
1749                ),
1750            )]))
1751        }
1752    }
1753
1754    struct TwoRegionPartitionRule {
1755        partition_columns: Vec<String>,
1756    }
1757
1758    impl PartitionRule for TwoRegionPartitionRule {
1759        fn as_any(&self) -> &dyn std::any::Any {
1760            self
1761        }
1762
1763        fn partition_columns(&self) -> &[String] {
1764            &self.partition_columns
1765        }
1766
1767        fn find_region(
1768            &self,
1769            _values: &[datatypes::prelude::Value],
1770        ) -> partition::error::Result<store_api::storage::RegionNumber> {
1771            unimplemented!()
1772        }
1773
1774        fn split_record_batch(
1775            &self,
1776            _record_batch: &RecordBatch,
1777        ) -> partition::error::Result<HashMap<store_api::storage::RegionNumber, RegionMask>>
1778        {
1779            Ok(HashMap::from([
1780                (1, RegionMask::new(BooleanArray::from(vec![true, false]), 1)),
1781                (2, RegionMask::new(BooleanArray::from(vec![false, true]), 1)),
1782                (
1783                    3,
1784                    RegionMask::new(BooleanArray::from(vec![false, false]), 0),
1785                ),
1786            ]))
1787        }
1788    }
1789
1790    struct MockFlushPartitionProvider {
1791        partition_rule_calls: Arc<AtomicUsize>,
1792        region_leader_calls: Arc<AtomicUsize>,
1793    }
1794
1795    #[async_trait]
1796    impl PhysicalFlushPartitionProvider for MockFlushPartitionProvider {
1797        async fn find_table_partition_rule(
1798            &self,
1799            _table_info: &table::metadata::TableInfo,
1800        ) -> PartitionResult<PartitionRuleRef> {
1801            self.partition_rule_calls.fetch_add(1, Ordering::SeqCst);
1802            Ok(Arc::new(SingleRegionPartitionRule))
1803        }
1804
1805        async fn find_region_leader(&self, _region_id: RegionId) -> error::Result<Peer> {
1806            self.region_leader_calls.fetch_add(1, Ordering::SeqCst);
1807            Ok(Peer {
1808                id: 1,
1809                addr: "node-1".to_string(),
1810            })
1811        }
1812    }
1813
1814    #[derive(Default)]
1815    struct MockFlushNodeRequester {
1816        writes: Arc<AtomicUsize>,
1817    }
1818
1819    #[async_trait]
1820    impl PhysicalFlushNodeRequester for MockFlushNodeRequester {
1821        async fn handle(
1822            &self,
1823            _peer: &Peer,
1824            _request: RegionRequest,
1825        ) -> error::Result<RegionResponse> {
1826            self.writes.fetch_add(1, Ordering::SeqCst);
1827            Ok(RegionResponse::new(0))
1828        }
1829    }
1830
1831    #[test]
1832    fn test_collect_non_empty_table_rows_filters_empty_payloads() {
1833        let requests = RowInsertRequests {
1834            inserts: vec![
1835                RowInsertRequest {
1836                    table_name: "cpu".to_string(),
1837                    rows: Some(mock_rows(2, "host")),
1838                },
1839                RowInsertRequest {
1840                    table_name: "mem".to_string(),
1841                    rows: Some(mock_rows(0, "host")),
1842                },
1843                RowInsertRequest {
1844                    table_name: "disk".to_string(),
1845                    rows: None,
1846                },
1847            ],
1848        };
1849
1850        let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);
1851
1852        assert_eq!(2, total_rows);
1853        assert_eq!(1, table_rows.len());
1854        assert_eq!("cpu", table_rows[0].0);
1855        assert_eq!(2, table_rows[0].1.rows.len());
1856    }
1857
1858    #[test]
1859    fn test_drain_batch_takes_initialized_pending_batch_from_option() {
1860        let ctx = session::context::QueryContext::arc();
1861        let (response_tx, _response_rx) = oneshot::channel();
1862        let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();
1863        let mut batch = Some(PendingBatch {
1864            tables: HashMap::from([(
1865                "cpu".to_string(),
1866                TableBatch {
1867                    table_name: "cpu".to_string(),
1868                    table_id: 42,
1869                    batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
1870                    row_count: 1,
1871                },
1872            )]),
1873            created_at: Instant::now(),
1874            total_row_count: 1,
1875            db_string: ctx.get_db_string(),
1876            ctx: ctx.clone(),
1877            waiters: vec![FlushWaiter {
1878                response_tx,
1879                _permit: permit,
1880            }],
1881        });
1882
1883        let flush = drain_batch(&mut batch).unwrap();
1884
1885        assert!(batch.is_none());
1886        assert_eq!(1, flush.total_row_count);
1887        assert_eq!(1, flush.table_batches.len());
1888        assert_eq!(ctx.get_db_string(), flush.db_string);
1889        assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog());
1890    }
1891
1892    #[derive(Clone)]
1893    struct ConcurrentMockDatanode {
1894        delay: Duration,
1895        inflight: Arc<AtomicUsize>,
1896        max_inflight: Arc<AtomicUsize>,
1897    }
1898
1899    #[async_trait]
1900    impl Datanode for ConcurrentMockDatanode {
1901        async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
1902            let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
1903            loop {
1904                let max = self.max_inflight.load(Ordering::SeqCst);
1905                if now <= max {
1906                    break;
1907                }
1908                if self
1909                    .max_inflight
1910                    .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
1911                    .is_ok()
1912                {
1913                    break;
1914                }
1915            }
1916
1917            sleep(self.delay).await;
1918            self.inflight.fetch_sub(1, Ordering::SeqCst);
1919            Ok(RegionResponse::new(0))
1920        }
1921
1922        async fn handle_query(
1923            &self,
1924            _request: QueryRequest,
1925        ) -> MetaResult<SendableRecordBatchStream> {
1926            unimplemented!()
1927        }
1928    }
1929
1930    #[derive(Clone)]
1931    struct ConcurrentMockNodeManager {
1932        datanodes: Arc<HashMap<u64, DatanodeRef>>,
1933    }
1934
1935    #[async_trait]
1936    impl DatanodeManager for ConcurrentMockNodeManager {
1937        async fn datanode(&self, node: &Peer) -> DatanodeRef {
1938            self.datanodes
1939                .get(&node.id)
1940                .expect("datanode not found")
1941                .clone()
1942        }
1943    }
1944
    /// Flownode stub handed out by `ConcurrentMockNodeManager`. Every method panics via
    /// `unimplemented!()`, so it only fits tests that never issue flow requests.
    struct NoopFlownode;

    #[async_trait]
    impl Flownode for NoopFlownode {
        async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_mark_window_dirty(
            &self,
            _req: DirtyWindowRequests,
        ) -> MetaResult<FlowResponse> {
            unimplemented!()
        }
    }
1964
1965    #[async_trait]
1966    impl FlownodeManager for ConcurrentMockNodeManager {
1967        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
1968            Arc::new(NoopFlownode)
1969        }
1970    }
1971
1972    #[async_trait]
1973    impl PhysicalFlushNodeRequester for ConcurrentMockNodeManager {
1974        async fn handle(
1975            &self,
1976            peer: &Peer,
1977            request: RegionRequest,
1978        ) -> error::Result<RegionResponse> {
1979            let datanode = self.datanode(peer).await;
1980            datanode
1981                .handle(request)
1982                .await
1983                .context(error::CommonMetaSnafu)
1984        }
1985    }
1986
1987    #[test]
1988    fn test_remove_worker_if_same_channel_removes_matching_entry() {
1989        let workers = DashMap::new();
1990        let key = BatchKey {
1991            catalog: "greptime".to_string(),
1992            schema: "public".to_string(),
1993            physical_table: "phy".to_string(),
1994        };
1995
1996        let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
1997        workers.insert(key.clone(), PendingWorker { tx: tx.clone() });
1998
1999        assert!(remove_worker_if_same_channel(&workers, &key, &tx));
2000        assert!(!workers.contains_key(&key));
2001    }
2002
2003    #[test]
2004    fn test_remove_worker_if_same_channel_keeps_newer_entry() {
2005        let workers = DashMap::new();
2006        let key = BatchKey {
2007            catalog: "greptime".to_string(),
2008            schema: "public".to_string(),
2009            physical_table: "phy".to_string(),
2010        };
2011
2012        let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
2013        let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
2014        workers.insert(
2015            key.clone(),
2016            PendingWorker {
2017                tx: fresh_tx.clone(),
2018            },
2019        );
2020
2021        assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
2022        assert!(workers.contains_key(&key));
2023        assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
2024    }
2025
2026    #[test]
2027    fn test_worker_idle_timeout_close_decision() {
2028        assert!(should_close_worker_on_idle_timeout(0, 0));
2029        assert!(!should_close_worker_on_idle_timeout(1, 0));
2030        assert!(!should_close_worker_on_idle_timeout(0, 1));
2031    }
2032
2033    #[tokio::test]
2034    async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
2035        let inflight = Arc::new(AtomicUsize::new(0));
2036        let max_inflight = Arc::new(AtomicUsize::new(0));
2037        let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2038            delay: Duration::from_millis(100),
2039            inflight: inflight.clone(),
2040            max_inflight: max_inflight.clone(),
2041        });
2042        let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
2043            delay: Duration::from_millis(100),
2044            inflight,
2045            max_inflight: max_inflight.clone(),
2046        });
2047
2048        let mut datanodes = HashMap::new();
2049        datanodes.insert(1, datanode1);
2050        datanodes.insert(2, datanode2);
2051        let node_manager = Arc::new(ConcurrentMockNodeManager {
2052            datanodes: Arc::new(datanodes),
2053        });
2054
2055        let writes = vec![
2056            FlushRegionWrite {
2057                datanode: Peer {
2058                    id: 1,
2059                    addr: "node1".to_string(),
2060                },
2061                request: RegionRequest::default(),
2062            },
2063            FlushRegionWrite {
2064                datanode: Peer {
2065                    id: 2,
2066                    addr: "node2".to_string(),
2067                },
2068                request: RegionRequest::default(),
2069            },
2070        ];
2071
2072        flush_region_writes_concurrently(node_manager.as_ref(), writes)
2073            .await
2074            .unwrap();
2075        assert!(max_inflight.load(Ordering::SeqCst) >= 2);
2076    }
2077
2078    #[test]
2079    fn test_should_dispatch_concurrently_by_region_count() {
2080        assert!(!should_dispatch_concurrently(0));
2081        assert!(!should_dispatch_concurrently(1));
2082        assert!(should_dispatch_concurrently(2));
2083    }
2084
2085    #[test]
2086    fn test_strip_partition_columns_from_batch_removes_partition_tags() {
2087        let batch = RecordBatch::try_new(
2088            Arc::new(ArrowSchema::new(vec![
2089                Field::new("__primary_key", ArrowDataType::Binary, false),
2090                Field::new(
2091                    "greptime_timestamp",
2092                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2093                    false,
2094                ),
2095                Field::new("greptime_value", ArrowDataType::Float64, true),
2096                Field::new("host", ArrowDataType::Utf8, true),
2097            ])),
2098            vec![
2099                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2100                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2101                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2102                Arc::new(StringArray::from(vec!["node-1"])),
2103            ],
2104        )
2105        .unwrap();
2106
2107        let stripped = strip_partition_columns_from_batch(batch).unwrap();
2108
2109        assert_eq!(3, stripped.num_columns());
2110        assert_eq!("__primary_key", stripped.schema().field(0).name());
2111        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2112        assert_eq!("greptime_value", stripped.schema().field(2).name());
2113    }
2114
2115    #[test]
2116    fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
2117        let batch = RecordBatch::try_new(
2118            Arc::new(ArrowSchema::new(vec![
2119                Field::new("__primary_key", ArrowDataType::Binary, false),
2120                Field::new(
2121                    "greptime_timestamp",
2122                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2123                    false,
2124                ),
2125                Field::new("greptime_value", ArrowDataType::Float64, true),
2126                Field::new("host", ArrowDataType::Utf8, true),
2127            ])),
2128            vec![
2129                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2130                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2131                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
2132                Arc::new(StringArray::from(vec!["node-1"])),
2133            ],
2134        )
2135        .unwrap();
2136
2137        let stripped = strip_partition_columns_from_batch(batch).unwrap();
2138
2139        assert_eq!(3, stripped.num_columns());
2140        assert_eq!("__primary_key", stripped.schema().field(0).name());
2141        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
2142        assert_eq!("greptime_value", stripped.schema().field(2).name());
2143    }
2144
2145    #[test]
2146    fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
2147        let schema = Arc::new(ArrowSchema::new(vec![
2148            Field::new(
2149                "greptime_timestamp",
2150                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2151                false,
2152            ),
2153            Field::new("greptime_value", ArrowDataType::Float64, true),
2154            Field::new("host", ArrowDataType::Utf8, true),
2155            Field::new("region", ArrowDataType::Utf8, true),
2156        ]));
2157        let name_to_ids =
2158            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2159        let partition_columns = HashSet::from(["host"]);
2160
2161        let (tag_columns, non_tag_indices) =
2162            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2163
2164        assert_eq!(2, tag_columns.len());
2165        assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
2166    }
2167
2168    #[test]
2169    fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
2170        let schema = Arc::new(ArrowSchema::new(vec![
2171            Field::new("host", ArrowDataType::Utf8, true),
2172            Field::new("greptime_value", ArrowDataType::Float64, true),
2173            Field::new(
2174                "greptime_timestamp",
2175                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2176                false,
2177            ),
2178            Field::new("region", ArrowDataType::Utf8, true),
2179        ]));
2180        let name_to_ids =
2181            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
2182        let partition_columns = HashSet::from(["host", "region"]);
2183
2184        let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
2185            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();
2186
2187        assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
2188    }
2189
2190    #[test]
2191    fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
2192        let schema = Arc::new(ArrowSchema::new(vec![
2193            Field::new(
2194                "greptime_timestamp",
2195                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2196                false,
2197            ),
2198            Field::new("greptime_value", ArrowDataType::Float64, true),
2199            Field::new("host", ArrowDataType::Utf8, true),
2200            Field::new("invalid", ArrowDataType::Boolean, true),
2201        ]));
2202        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2203        let partition_columns = HashSet::from(["host"]);
2204
2205        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2206
2207        assert!(matches!(
2208            result,
2209            Err(Error::InvalidPromRemoteRequest { .. })
2210        ));
2211    }
2212
2213    #[test]
2214    fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
2215        let schema = Arc::new(ArrowSchema::new(vec![
2216            Field::new("greptime_timestamp", ArrowDataType::Int64, false),
2217            Field::new("greptime_value", ArrowDataType::Float64, true),
2218            Field::new("host", ArrowDataType::Utf8, true),
2219        ]));
2220        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2221        let partition_columns = HashSet::from(["host"]);
2222
2223        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2224
2225        assert!(matches!(
2226            result,
2227            Err(Error::InvalidPromRemoteRequest { .. })
2228        ));
2229    }
2230
2231    #[test]
2232    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
2233        let schema = Arc::new(ArrowSchema::new(vec![
2234            Field::new(
2235                "ts1",
2236                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2237                false,
2238            ),
2239            Field::new(
2240                "ts2",
2241                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2242                false,
2243            ),
2244            Field::new("greptime_value", ArrowDataType::Float64, true),
2245            Field::new("host", ArrowDataType::Utf8, true),
2246        ]));
2247        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2248        let partition_columns = HashSet::from(["host"]);
2249
2250        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2251
2252        assert!(matches!(
2253            result,
2254            Err(Error::InvalidPromRemoteRequest { .. })
2255        ));
2256    }
2257
2258    #[test]
2259    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
2260        let schema = Arc::new(ArrowSchema::new(vec![
2261            Field::new(
2262                "greptime_timestamp",
2263                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2264                false,
2265            ),
2266            Field::new("value1", ArrowDataType::Float64, true),
2267            Field::new("value2", ArrowDataType::Float64, true),
2268            Field::new("host", ArrowDataType::Utf8, true),
2269        ]));
2270        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
2271        let partition_columns = HashSet::from(["host"]);
2272
2273        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
2274
2275        assert!(matches!(
2276            result,
2277            Err(Error::InvalidPromRemoteRequest { .. })
2278        ));
2279    }
2280
2281    #[test]
2282    fn test_modify_batch_sparse_with_taxonomy_per_batch() {
2283        use arrow::array::BinaryArray;
2284        use metric_engine::batch_modifier::modify_batch_sparse;
2285
2286        let schema1 = Arc::new(ArrowSchema::new(vec![
2287            Field::new(
2288                "greptime_timestamp",
2289                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2290                false,
2291            ),
2292            Field::new("greptime_value", ArrowDataType::Float64, true),
2293            Field::new("tag1", ArrowDataType::Utf8, true),
2294        ]));
2295
2296        let schema2 = Arc::new(ArrowSchema::new(vec![
2297            Field::new(
2298                "greptime_timestamp",
2299                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2300                false,
2301            ),
2302            Field::new("greptime_value", ArrowDataType::Float64, true),
2303            Field::new("tag1", ArrowDataType::Utf8, true),
2304            Field::new("tag2", ArrowDataType::Utf8, true),
2305        ]));
2306        let batch2 = RecordBatch::try_new(
2307            schema2.clone(),
2308            vec![
2309                Arc::new(TimestampMillisecondArray::from(vec![2000])),
2310                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2311                Arc::new(StringArray::from(vec!["v1"])),
2312                Arc::new(StringArray::from(vec!["v2"])),
2313            ],
2314        )
2315        .unwrap();
2316
2317        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2318        let partition_columns = HashSet::new();
2319
2320        // A batch that only has tag1, same values as batch2 for ts and val.
2321        let batch3 = RecordBatch::try_new(
2322            schema1.clone(),
2323            vec![
2324                Arc::new(TimestampMillisecondArray::from(vec![2000])),
2325                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
2326                Arc::new(StringArray::from(vec!["v1"])),
2327            ],
2328        )
2329        .unwrap();
2330
2331        // Simulate the new loop logic in flush_batch_physical:
2332        // Resolve taxonomy FOR EACH BATCH.
2333        let (tag_columns2, indices2) =
2334            columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2335        let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();
2336
2337        let (tag_columns3, indices3) =
2338            columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
2339        let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();
2340
2341        let pk2 = modified2
2342            .column(0)
2343            .as_any()
2344            .downcast_ref::<BinaryArray>()
2345            .unwrap();
2346        let pk3 = modified3
2347            .column(0)
2348            .as_any()
2349            .downcast_ref::<BinaryArray>()
2350            .unwrap();
2351
2352        // Now they SHOULD be different because tag2 is included in pk2 but not in pk3.
2353        assert_ne!(
2354            pk2.value(0),
2355            pk3.value(0),
2356            "PK should be different because batch2 has tag2!"
2357        );
2358    }
2359
2360    #[test]
2361    fn test_transform_logical_batches_to_physical_success() {
2362        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2363
2364        let table_batches = vec![TableBatch {
2365            table_name: "t1".to_string(),
2366            table_id: 1,
2367            batches: vec![batch],
2368            row_count: 1,
2369        }];
2370
2371        let name_to_ids = HashMap::from([("tag1".to_string(), 1)]);
2372        let partition_columns = HashSet::new();
2373        let modified =
2374            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2375                .unwrap();
2376
2377        assert_eq!(1, modified.len());
2378        assert_eq!(3, modified[0].num_columns());
2379        assert_eq!("__primary_key", modified[0].schema().field(0).name());
2380        assert_eq!("greptime_timestamp", modified[0].schema().field(1).name());
2381        assert_eq!("greptime_value", modified[0].schema().field(2).name());
2382    }
2383
2384    #[test]
2385    fn test_transform_logical_batches_to_physical_taxonomy_failure() {
2386        let batch = mock_tag_batch("tag1", "v1", 1000, 1.0);
2387
2388        let table_batches = vec![TableBatch {
2389            table_name: "t1".to_string(),
2390            table_id: 1,
2391            batches: vec![batch],
2392            row_count: 1,
2393        }];
2394
2395        // tag1 is missing from name_to_ids, causing columns_taxonomy to fail.
2396        let name_to_ids = HashMap::new();
2397        let partition_columns = HashSet::new();
2398        let err =
2399            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2400                .unwrap_err();
2401
2402        assert!(
2403            err.to_string()
2404                .contains("not found in physical table column IDs")
2405        );
2406    }
2407
2408    #[test]
2409    fn test_transform_logical_batches_to_physical_multiple_batches() {
2410        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2411        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2412
2413        let table_batches = vec![
2414            TableBatch {
2415                table_name: "t1".to_string(),
2416                table_id: 1,
2417                batches: vec![batch1],
2418                row_count: 1,
2419            },
2420            TableBatch {
2421                table_name: "t2".to_string(),
2422                table_id: 2,
2423                batches: vec![batch2],
2424                row_count: 1,
2425            },
2426        ];
2427
2428        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
2429        let partition_columns = HashSet::new();
2430        let modified =
2431            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2432                .unwrap();
2433
2434        assert_eq!(2, modified.len());
2435    }
2436
2437    #[test]
2438    fn test_transform_logical_batches_to_physical_mixed_success_failure() {
2439        let batch1 = mock_tag_batch("tag1", "v1", 1000, 1.0);
2440        let batch2 = mock_tag_batch("tag2", "v2", 2000, 2.0);
2441
2442        let table_batches = vec![
2443            TableBatch {
2444                table_name: "t1".to_string(),
2445                table_id: 1,
2446                batches: vec![batch1],
2447                row_count: 1,
2448            },
2449            TableBatch {
2450                table_name: "t2".to_string(),
2451                table_id: 2,
2452                batches: vec![batch2],
2453                row_count: 1,
2454            },
2455        ];
2456
2457        // tag1 is missing from name_to_ids, causing batch1 to fail.
2458        let name_to_ids = HashMap::from([("tag2".to_string(), 2)]);
2459        let partition_columns = HashSet::new();
2460        let err =
2461            transform_logical_batches_to_physical(&table_batches, &name_to_ids, &partition_columns)
2462                .unwrap_err();
2463
2464        assert!(err.to_string().contains("tag1"));
2465    }
2466
2467    #[tokio::test]
2468    async fn test_flush_batch_physical_uses_mockable_trait_dependencies() {
2469        let table_batches = vec![TableBatch {
2470            table_name: "t1".to_string(),
2471            table_id: 11,
2472            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2473            row_count: 1,
2474        }];
2475        let partition_calls = Arc::new(AtomicUsize::new(0));
2476        let leader_calls = Arc::new(AtomicUsize::new(0));
2477        let node = MockFlushNodeRequester::default();
2478        let ctx = session::context::QueryContext::arc();
2479
2480        flush_batch_physical(
2481            &table_batches,
2482            "phy",
2483            &ctx,
2484            &MockFlushPartitionProvider {
2485                partition_rule_calls: partition_calls.clone(),
2486                region_leader_calls: leader_calls.clone(),
2487            },
2488            &node,
2489            &MockFlushCatalogProvider {
2490                table: Some(mock_physical_table_metadata(1024)),
2491            },
2492        )
2493        .await
2494        .unwrap();
2495
2496        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2497        assert_eq!(1, leader_calls.load(Ordering::SeqCst));
2498        assert_eq!(1, node.writes.load(Ordering::SeqCst));
2499    }
2500
2501    #[derive(Default)]
2502    struct AffectedRowsFlushNodeRequester {
2503        affected_rows: usize,
2504    }
2505
2506    #[async_trait]
2507    impl PhysicalFlushNodeRequester for AffectedRowsFlushNodeRequester {
2508        async fn handle(
2509            &self,
2510            _peer: &Peer,
2511            _request: RegionRequest,
2512        ) -> error::Result<RegionResponse> {
2513            Ok(RegionResponse::new(self.affected_rows))
2514        }
2515    }
2516
2517    #[tokio::test]
2518    async fn test_flush_batch_physical_returns_actual_affected_rows() {
2519        let table_batches = vec![TableBatch {
2520            table_name: "t1".to_string(),
2521            table_id: 11,
2522            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2523            row_count: 1,
2524        }];
2525        let ctx = session::context::QueryContext::arc();
2526
2527        let affected_rows = flush_batch_physical(
2528            &table_batches,
2529            "phy",
2530            &ctx,
2531            &MockFlushPartitionProvider {
2532                partition_rule_calls: Arc::new(AtomicUsize::new(0)),
2533                region_leader_calls: Arc::new(AtomicUsize::new(0)),
2534            },
2535            &AffectedRowsFlushNodeRequester { affected_rows: 7 },
2536            &MockFlushCatalogProvider {
2537                table: Some(mock_physical_table_metadata(1024)),
2538            },
2539        )
2540        .await
2541        .unwrap();
2542
2543        assert_eq!(7, affected_rows);
2544    }
2545
2546    #[tokio::test]
2547    async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() {
2548        let table_batches = vec![TableBatch {
2549            table_name: "t1".to_string(),
2550            table_id: 11,
2551            batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
2552            row_count: 1,
2553        }];
2554        let partition_calls = Arc::new(AtomicUsize::new(0));
2555        let leader_calls = Arc::new(AtomicUsize::new(0));
2556        let node = MockFlushNodeRequester::default();
2557        let ctx = session::context::QueryContext::arc();
2558
2559        let err = flush_batch_physical(
2560            &table_batches,
2561            "missing_phy",
2562            &ctx,
2563            &MockFlushPartitionProvider {
2564                partition_rule_calls: partition_calls.clone(),
2565                region_leader_calls: leader_calls.clone(),
2566            },
2567            &node,
2568            &MockFlushCatalogProvider { table: None },
2569        )
2570        .await
2571        .unwrap_err();
2572
2573        assert!(
2574            err.to_string()
2575                .contains("Physical table 'missing_phy' not found")
2576        );
2577        assert_eq!(0, partition_calls.load(Ordering::SeqCst));
2578        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2579        assert_eq!(0, node.writes.load(Ordering::SeqCst));
2580    }
2581
2582    #[tokio::test]
2583    async fn test_flush_batch_physical_aborts_immediately_on_transform_error() {
2584        let table_batches = vec![
2585            TableBatch {
2586                table_name: "broken".to_string(),
2587                table_id: 11,
2588                batches: vec![mock_tag_batch("unknown_tag", "host-1", 1000, 1.0)],
2589                row_count: 1,
2590            },
2591            TableBatch {
2592                table_name: "healthy".to_string(),
2593                table_id: 12,
2594                batches: vec![mock_tag_batch("tag1", "host-2", 2000, 2.0)],
2595                row_count: 1,
2596            },
2597        ];
2598        let partition_calls = Arc::new(AtomicUsize::new(0));
2599        let leader_calls = Arc::new(AtomicUsize::new(0));
2600        let node = MockFlushNodeRequester::default();
2601        let ctx = session::context::QueryContext::arc();
2602
2603        let err = flush_batch_physical(
2604            &table_batches,
2605            "phy",
2606            &ctx,
2607            &MockFlushPartitionProvider {
2608                partition_rule_calls: partition_calls.clone(),
2609                region_leader_calls: leader_calls.clone(),
2610            },
2611            &node,
2612            &MockFlushCatalogProvider {
2613                table: Some(mock_physical_table_metadata(1024)),
2614            },
2615        )
2616        .await
2617        .unwrap_err();
2618
2619        assert!(err.to_string().contains("unknown_tag"));
2620        assert_eq!(1, partition_calls.load(Ordering::SeqCst));
2621        assert_eq!(0, leader_calls.load(Ordering::SeqCst));
2622        assert_eq!(0, node.writes.load(Ordering::SeqCst));
2623    }
2624
2625    #[test]
2626    fn test_plan_region_batches_splits_and_strips_partition_columns() {
2627        let combined_batch = RecordBatch::try_new(
2628            Arc::new(ArrowSchema::new(vec![
2629                Field::new("__primary_key", ArrowDataType::Binary, false),
2630                Field::new(
2631                    "greptime_timestamp",
2632                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2633                    false,
2634                ),
2635                Field::new("greptime_value", ArrowDataType::Float64, true),
2636                Field::new("host", ArrowDataType::Utf8, true),
2637            ])),
2638            vec![
2639                Arc::new(BinaryArray::from(vec![b"k1".as_slice(), b"k2".as_slice()])),
2640                Arc::new(TimestampMillisecondArray::from(vec![1000_i64, 2000_i64])),
2641                Arc::new(arrow::array::Float64Array::from(vec![1.0_f64, 2.0_f64])),
2642                Arc::new(StringArray::from(vec!["node-1", "node-2"])),
2643            ],
2644        )
2645        .unwrap();
2646        let mut planned_batches = plan_region_batches(
2647            combined_batch,
2648            1024,
2649            &TwoRegionPartitionRule {
2650                partition_columns: vec!["host".to_string()],
2651            },
2652            &["host".to_string()],
2653        )
2654        .unwrap();
2655        planned_batches.sort_by_key(|planned| planned.region_id.region_number());
2656
2657        assert_eq!(2, planned_batches.len());
2658        assert_eq!(RegionId::new(1024, 1), planned_batches[0].region_id);
2659        assert_eq!(1, planned_batches[0].num_rows());
2660        assert_eq!(3, planned_batches[0].batch.num_columns());
2661        assert_eq!(RegionId::new(1024, 2), planned_batches[1].region_id);
2662        assert_eq!(1, planned_batches[1].num_rows());
2663        assert_eq!(3, planned_batches[1].batch.num_columns());
2664    }
2665
2666    #[test]
2667    fn test_encode_region_write_requests_builds_bulk_insert_requests() {
2668        let planned_batch = PlannedRegionBatch {
2669            region_id: RegionId::new(1024, 1),
2670            batch: RecordBatch::try_new(
2671                Arc::new(ArrowSchema::new(vec![
2672                    Field::new("__primary_key", ArrowDataType::Binary, false),
2673                    Field::new(
2674                        "greptime_timestamp",
2675                        ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2676                        false,
2677                    ),
2678                    Field::new("greptime_value", ArrowDataType::Float64, true),
2679                ])),
2680                vec![
2681                    Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
2682                    Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
2683                    Arc::new(arrow::array::Float64Array::from(vec![1.0_f64])),
2684                ],
2685            )
2686            .unwrap(),
2687        };
2688        let resolved_batch = ResolvedRegionBatch {
2689            planned: planned_batch,
2690            datanode: Peer {
2691                id: 1,
2692                addr: "node-1".to_string(),
2693            },
2694        };
2695        let writes = encode_region_write_requests(vec![resolved_batch]).unwrap();
2696
2697        assert_eq!(1, writes.len());
2698        assert_eq!(1, writes[0].datanode.id);
2699        let Some(region_request::Body::BulkInsert(request)) = &writes[0].request.body else {
2700            panic!("expected bulk insert request");
2701        };
2702        assert_eq!(RegionId::new(1024, 1).as_u64(), request.region_id);
2703    }
2704}