// servers/src/pending_rows_batcher.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::Peer;
20use api::v1::region::{
21    BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request,
22};
23use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows};
24use arrow::compute::{concat_batches, filter_record_batch};
25use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit};
26use arrow::record_batch::RecordBatch;
27use async_trait::async_trait;
28use bytes::Bytes;
29use catalog::CatalogManagerRef;
30use common_grpc::flight::{FlightEncoder, FlightMessage};
31use common_meta::node_manager::NodeManagerRef;
32use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value};
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{debug, error, warn};
35use dashmap::DashMap;
36use dashmap::mapref::entry::Entry;
37use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse};
38use partition::manager::PartitionRuleManagerRef;
39use session::context::QueryContextRef;
40use smallvec::SmallVec;
41use snafu::{OptionExt, ensure};
42use store_api::storage::{RegionId, TableId};
43use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot};
44
45use crate::error;
46use crate::error::{Error, Result};
47use crate::metrics::{
48    FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES,
49    PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED,
50    PENDING_WORKERS,
51};
52use crate::prom_row_builder::{
53    build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
54    rows_to_aligned_record_batch,
55};
56
/// Query-context extension key naming the target physical table for a write.
const PHYSICAL_TABLE_KEY: &str = "physical_table";
/// Whether wait for ingestion result before reply to client.
const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC";
/// An idle worker shuts itself down after `flush_interval * this multiplier`
/// without receiving any command.
const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3;
/// Number of columns every physical region is expected to carry.
/// NOTE(review): not referenced in this chunk; presumably the metric engine's
/// built-in columns (timestamp, value, tsid) — confirm at the use site.
const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3;
62
/// DDL hook used by the batcher to create missing logical tables and to
/// evolve existing ones before rows are aligned into record batches.
#[async_trait]
pub trait PendingRowsSchemaAlterer: Send + Sync {
    /// Batch-create multiple logical tables that are missing.
    /// Each entry is `(table_name, request_schema)`.
    async fn create_tables_if_missing_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[ColumnSchema])],
        with_metric_engine: bool,
        ctx: QueryContextRef,
    ) -> Result<()>;

    /// Batch-alter multiple logical tables to add missing tag columns.
    /// Each entry is `(table_name, missing_column_names)`.
    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> Result<()>;
}

/// Shared reference to a [`PendingRowsSchemaAlterer`].
pub type PendingRowsSchemaAltererRef = Arc<dyn PendingRowsSchemaAlterer>;
88
/// Identity of a batching worker: writes are grouped by the
/// `(catalog, schema, physical_table)` combination they target.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct BatchKey {
    // Catalog the rows are written into.
    catalog: String,
    // Schema (database) the rows are written into.
    schema: String,
    // Target physical table; defaults to `GREPTIME_PHYSICAL_TABLE` when the
    // request carries no `physical_table` extension.
    physical_table: String,
}
95
/// Buffered, schema-aligned record batches for one logical table.
#[derive(Debug)]
struct TableBatch {
    // Logical table name the batches belong to.
    table_name: String,
    // Resolved table id of the logical table.
    table_id: TableId,
    // Aligned record batches accumulated across multiple submits.
    batches: Vec<RecordBatch>,
    // Total row count across `batches`.
    row_count: usize,
}
103
/// Intermediate planning state for resolving and preparing logical tables
/// before row-to-batch alignment.
struct TableResolutionPlan {
    /// Resolved table schema and table id by logical table name.
    region_schemas: HashMap<String, (Arc<ArrowSchema>, u32)>,
    /// Missing tables that need to be created before alignment.
    tables_to_create: Vec<(String, Vec<ColumnSchema>)>,
    /// Existing tables that need tag-column schema evolution.
    /// Each entry is `(table_name, missing_column_names)`.
    tables_to_alter: Vec<(String, Vec<String>)>,
}
114
/// Accumulating batch owned by a single worker task; flushed when it grows
/// past the row limit or ages past the flush interval.
struct PendingBatch {
    // Per-table buffered batches, keyed by logical table name.
    tables: HashMap<String, TableBatch>,
    // When the first rows of the current batch arrived; `None` while empty.
    created_at: Option<Instant>,
    // Total buffered row count across all tables.
    total_row_count: usize,
    // Query context captured from the first submit of the batch; the batch
    // key guarantees later submits target the same catalog/schema/table.
    ctx: Option<QueryContextRef>,
    // Clients waiting for this batch's flush result.
    waiters: Vec<FlushWaiter>,
}
122
/// A submitter waiting for the flush outcome of the batch it contributed to.
struct FlushWaiter {
    // Channel used to report the flush result back to the submitter.
    response_tx: oneshot::Sender<Result<()>>,
    // Inflight-request permit held until the flush completes, providing
    // backpressure on the number of outstanding submits.
    _permit: OwnedSemaphorePermit,
}
127
/// Snapshot of a drained [`PendingBatch`] handed to the flush path.
struct FlushBatch {
    // Per-table batches taken out of the pending batch.
    table_batches: Vec<TableBatch>,
    // Total row count across `table_batches`.
    total_row_count: usize,
    // Query context of the batch (required by flush; drained batches
    // without a context are failed instead of flushed).
    ctx: QueryContextRef,
    // Waiters to notify with the flush result.
    waiters: Vec<FlushWaiter>,
}
134
/// Handle to a per-`BatchKey` worker task; clones share the same channel.
#[derive(Clone)]
struct PendingWorker {
    // Command channel into the worker task; becomes closed once the task
    // exits (idle timeout or shutdown).
    tx: mpsc::Sender<WorkerCommand>,
}
139
/// Commands accepted by a pending-rows worker task.
enum WorkerCommand {
    /// Append pre-aligned record batches to the worker's pending batch.
    Submit {
        // `(table_name, table_id, aligned_batch)` tuples to buffer.
        table_batches: Vec<(String, u32, RecordBatch)>,
        // Total row count across `table_batches`.
        total_rows: usize,
        // Query context of the submitting request.
        ctx: QueryContextRef,
        // Channel for reporting the eventual flush result to the submitter.
        response_tx: oneshot::Sender<Result<()>>,
        // Inflight permit; released when the flush result is delivered.
        _permit: OwnedSemaphorePermit,
    },
}
149
150// Batch key is derived from QueryContext; it assumes catalog/schema/physical_table fully
151// define the write target and must remain consistent across the batch.
152fn batch_key_from_ctx(ctx: &QueryContextRef) -> BatchKey {
153    let physical_table = ctx
154        .extension(PHYSICAL_TABLE_KEY)
155        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
156        .to_string();
157    BatchKey {
158        catalog: ctx.current_catalog().to_string(),
159        schema: ctx.current_schema(),
160        physical_table,
161    }
162}
163
/// Prometheus remote write pending rows batcher.
///
/// Buffers aligned rows per `(catalog, schema, physical_table)` in dedicated
/// worker tasks and flushes them by size or age.
pub struct PendingRowsBatcher {
    // Live worker handles, keyed by batch target.
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    // Age-based flush period for a non-empty pending batch.
    flush_interval: Duration,
    // Row-count threshold that triggers an immediate flush.
    max_batch_rows: usize,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    // Caps the number of concurrently running flushes.
    flush_semaphore: Arc<Semaphore>,
    // Caps the number of submits whose results are still outstanding.
    inflight_semaphore: Arc<Semaphore>,
    // Capacity of each worker's command channel.
    worker_channel_capacity: usize,
    // Whether logical tables are created in the metric engine.
    prom_store_with_metric_engine: bool,
    // DDL hook for creating/altering logical tables.
    schema_alterer: PendingRowsSchemaAltererRef,
    // Whether `submit` waits for the flush result before replying.
    pending_rows_batch_sync: bool,
    // Broadcast used to tell all workers to flush and exit on drop.
    shutdown: broadcast::Sender<()>,
}
180
impl PendingRowsBatcher {
    /// Creates the batcher, or returns `None` when batching is disabled or
    /// misconfigured (any zero-valued knob).
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        partition_manager: PartitionRuleManagerRef,
        node_manager: NodeManagerRef,
        catalog_manager: CatalogManagerRef,
        prom_store_with_metric_engine: bool,
        schema_alterer: PendingRowsSchemaAltererRef,
        flush_interval: Duration,
        max_batch_rows: usize,
        max_concurrent_flushes: usize,
        worker_channel_capacity: usize,
        max_inflight_requests: usize,
    ) -> Option<Arc<Self>> {
        // Disable the batcher if flush is disabled or configuration is invalid.
        // Zero values for these knobs either cause panics (e.g., zero-capacity channels)
        // or deadlocks (e.g., semaphores with no permits).
        if flush_interval.is_zero()
            || max_batch_rows == 0
            || max_concurrent_flushes == 0
            || worker_channel_capacity == 0
            || max_inflight_requests == 0
        {
            return None;
        }

        let (shutdown, _) = broadcast::channel(1);
        // Synchronous replies default to true; the env var can opt out,
        // turning `submit` into fire-and-forget.
        let pending_rows_batch_sync = std::env::var(PENDING_ROWS_BATCH_SYNC_ENV)
            .ok()
            .as_deref()
            .and_then(|v| v.parse::<bool>().ok())
            .unwrap_or(true);
        let workers = Arc::new(DashMap::new());
        PENDING_WORKERS.set(workers.len() as i64);

        Some(Arc::new(Self {
            workers,
            flush_interval,
            max_batch_rows,
            partition_manager,
            node_manager,
            catalog_manager,
            prom_store_with_metric_engine,
            schema_alterer,
            flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)),
            inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)),
            worker_channel_capacity,
            pending_rows_batch_sync,
            shutdown,
        }))
    }

    /// Submits row inserts for batched ingestion and returns the number of
    /// rows accepted.
    ///
    /// Aligns rows to region schemas (creating/altering logical tables as
    /// needed), acquires an inflight permit, hands the aligned batches to the
    /// per-key worker, and — in sync mode — waits for the flush result.
    pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result<u64> {
        let (table_batches, total_rows) = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_build_and_align"])
                .start_timer();
            self.build_and_align_table_batches(requests, &ctx).await?
        };
        if total_rows == 0 {
            return Ok(0);
        }

        // Backpressure: block here until an inflight slot frees up. The
        // permit travels with the command and is released by the worker when
        // the flush result is delivered.
        let permit = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_acquire_inflight_permit"])
                .start_timer();
            self.inflight_semaphore
                .clone()
                .acquire_owned()
                .await
                .map_err(|_| error::BatcherChannelClosedSnafu.build())?
        };

        let (response_tx, response_rx) = oneshot::channel();

        let batch_key = batch_key_from_ctx(&ctx);
        // The command is stored in an Option so a failed send can hand it
        // back (mpsc's SendError returns the value) for one retry.
        let mut cmd = Some(WorkerCommand::Submit {
            table_batches,
            total_rows,
            ctx,
            response_tx,
            _permit: permit,
        });

        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["submit_send_to_worker"])
                .start_timer();

            // Two attempts: the first may race with a worker that just shut
            // down on idle timeout (its channel is closed). In that case the
            // stale entry is evicted and the second attempt spawns a fresh
            // worker.
            for _ in 0..2 {
                let worker = self.get_or_spawn_worker(batch_key.clone());
                let Some(worker_cmd) = cmd.take() else {
                    break;
                };

                match worker.tx.send(worker_cmd).await {
                    Ok(()) => break,
                    Err(err) => {
                        // Recover the command from the SendError and drop the
                        // dead worker entry so the retry spawns a new one.
                        cmd = Some(err.0);
                        remove_worker_if_same_channel(
                            self.workers.as_ref(),
                            &batch_key,
                            &worker.tx,
                        );
                    }
                }
            }

            // Both attempts failed to deliver the command.
            if cmd.is_some() {
                return Err(Error::BatcherChannelClosed);
            }
        }

        if self.pending_rows_batch_sync {
            // Sync mode: wait for the worker/flush path to report the result.
            let result = {
                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                    .with_label_values(&["submit_wait_flush_result"])
                    .start_timer();
                response_rx
                    .await
                    .map_err(|_| error::BatcherChannelClosedSnafu.build())?
            };
            result.map(|()| total_rows as u64)
        } else {
            // Async mode: acknowledge as soon as the command is enqueued.
            Ok(total_rows as u64)
        }
    }

    /// Converts proto `RowInsertRequests` directly into aligned `RecordBatch`es
    /// in a single pass, handling table creation, schema alteration, column
    /// renaming, reordering, and null-filling without building intermediate
    /// RecordBatches.
    async fn build_and_align_table_batches(
        &self,
        requests: RowInsertRequests,
        ctx: &QueryContextRef,
    ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> {
        let catalog = ctx.current_catalog().to_string();
        let schema = ctx.current_schema();

        let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests);
        if total_rows == 0 {
            return Ok((Vec::new(), 0));
        }

        let unique_tables = Self::collect_unique_table_schemas(&table_rows)?;
        let mut plan = self
            .plan_table_resolution(&catalog, &schema, ctx, &unique_tables)
            .await?;

        // DDL steps run before alignment so that every table has a resolved
        // region schema covering all incoming columns.
        self.create_missing_tables_and_refresh_schemas(
            &catalog,
            &schema,
            ctx,
            &table_rows,
            &mut plan,
        )
        .await?;

        self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan)
            .await?;

        let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?;

        Ok((aligned_batches, total_rows))
    }

    /// Extracts non-empty `(table_name, rows)` pairs and computes total row
    /// count across the retained entries.
    fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) {
        let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len());
        let mut total_rows = 0;

        for request in requests.inserts {
            // Skip inserts with no rows payload at all.
            let Some(rows) = request.rows else {
                continue;
            };
            if rows.rows.is_empty() {
                continue;
            }

            total_rows += rows.rows.len();
            table_rows.push((request.table_name, rows));
        }

        (table_rows, total_rows)
    }

    /// Returns unique `(table_name, proto_schema)` pairs while keeping the
    /// first-seen schema for duplicate table names.
    ///
    /// Errors if the same table name appears twice: upstream is expected to
    /// have grouped all rows of a table into a single request entry.
    fn collect_unique_table_schemas(
        table_rows: &[(String, Rows)],
    ) -> Result<Vec<(&str, &[ColumnSchema])>> {
        let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len());
        let mut seen = HashSet::new();

        for (table_name, rows) in table_rows {
            if seen.insert(table_name.as_str()) {
                unique_tables.push((table_name.as_str(), &rows.schema));
            } else {
                // table_rows should group rows by table name.
                return error::InvalidPromRemoteRequestSnafu {
                    msg: format!(
                        "Found duplicated table name in RowInsertRequest: {}",
                        table_name
                    ),
                }
                .fail();
            }
        }

        Ok(unique_tables)
    }

    /// Resolves table metadata and classifies each table into existing,
    /// to-create, and to-alter groups used by subsequent DDL steps.
    async fn plan_table_resolution(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        unique_tables: &[(&str, &[ColumnSchema])],
    ) -> Result<TableResolutionPlan> {
        let mut plan = TableResolutionPlan {
            region_schemas: HashMap::with_capacity(unique_tables.len()),
            tables_to_create: Vec::new(),
            tables_to_alter: Vec::new(),
        };

        // Resolve all tables concurrently; results come back in input order.
        let resolved_tables = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table"])
                .start_timer();
            futures::future::join_all(unique_tables.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) {
            let table = table_result?;

            if let Some(table) = table {
                // Existing table: record its schema and queue an alter if the
                // incoming rows carry columns the region doesn't have yet.
                let table_info = table.table_info();
                let table_id = table_info.ident.table_id;
                let region_schema = table_info.meta.schema.arrow_schema().clone();

                let missing_columns = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_identify_missing_columns"])
                        .start_timer();
                    identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())?
                };
                if !missing_columns.is_empty() {
                    plan.tables_to_alter
                        .push(((*table_name).to_string(), missing_columns));
                }
                plan.region_schemas
                    .insert((*table_name).to_string(), (region_schema, table_id));
            } else {
                // Missing table: derive a create-table schema from the proto
                // row schema and queue the creation.
                let request_schema = {
                    let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                        .with_label_values(&["align_build_create_table_schema"])
                        .start_timer();
                    build_prom_create_table_schema_from_proto(rows_schema)?
                };
                plan.tables_to_create
                    .push(((*table_name).to_string(), request_schema));
            }
        }

        Ok(plan)
    }

    /// Batch-creates missing tables, refreshes their schema metadata, and
    /// enqueues follow-up alters for extra tag columns discovered in later rows.
    async fn create_missing_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        table_rows: &[(String, Rows)],
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_create.is_empty() {
            return Ok(());
        }

        let create_refs: Vec<(&str, &[ColumnSchema])> = plan
            .tables_to_create
            .iter()
            .map(|(name, schema)| (name.as_str(), schema.as_slice()))
            .collect();

        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_create_tables"])
                .start_timer();
            self.schema_alterer
                .create_tables_if_missing_batch(
                    catalog,
                    schema,
                    &create_refs,
                    self.prom_store_with_metric_engine,
                    ctx.clone(),
                )
                .await?;
        }

        // Re-resolve the freshly created tables to obtain their table ids and
        // actual region schemas.
        let created_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_create"])
                .start_timer();
            futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_create.iter().zip(created_table_results)
        {
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch create attempt: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (region_schema, table_id));
        }

        Self::enqueue_alter_for_new_tables(table_rows, plan)?;

        Ok(())
    }

    /// For newly created tables, re-checks all row schemas and appends alter
    /// operations when additional tag columns are still missing.
    ///
    /// This covers the case where a creation raced with a concurrent creator:
    /// the table that now exists may lack columns this request needs.
    fn enqueue_alter_for_new_tables(
        table_rows: &[(String, Rows)],
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        let created_tables: HashSet<&str> = plan
            .tables_to_create
            .iter()
            .map(|(table_name, _)| table_name.as_str())
            .collect();

        for (table_name, rows) in table_rows {
            if !created_tables.contains(table_name.as_str()) {
                continue;
            }

            let Some((region_schema, _)) = plan.region_schemas.get(table_name) else {
                continue;
            };

            let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?;
            // Skip when nothing is missing or an alter is already queued for
            // this table.
            if missing_columns.is_empty()
                || plan
                    .tables_to_alter
                    .iter()
                    .any(|(existing_name, _)| existing_name == table_name)
            {
                continue;
            }

            plan.tables_to_alter
                .push((table_name.clone(), missing_columns));
        }

        Ok(())
    }

    /// Batch-alters tables that have missing tag columns and refreshes the
    /// in-memory schema map used for row alignment.
    async fn alter_tables_and_refresh_schemas(
        &self,
        catalog: &str,
        schema: &str,
        ctx: &QueryContextRef,
        plan: &mut TableResolutionPlan,
    ) -> Result<()> {
        if plan.tables_to_alter.is_empty() {
            return Ok(());
        }

        let alter_refs: Vec<(&str, &[String])> = plan
            .tables_to_alter
            .iter()
            .map(|(name, cols)| (name.as_str(), cols.as_slice()))
            .collect();
        {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_batch_add_missing_columns"])
                .start_timer();
            self.schema_alterer
                .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone())
                .await?;
        }

        // Re-resolve altered tables so alignment uses the post-alter schemas.
        let altered_table_results = {
            let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                .with_label_values(&["align_resolve_table_after_schema_alter"])
                .start_timer();
            futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| {
                self.catalog_manager
                    .table(catalog, schema, table_name, Some(ctx.as_ref()))
            }))
            .await
        };

        for ((table_name, _), table_result) in
            plan.tables_to_alter.iter().zip(altered_table_results)
        {
            let table = table_result?.with_context(|| error::UnexpectedResultSnafu {
                reason: format!(
                    "Table not found after pending batch schema alter: {}",
                    table_name
                ),
            })?;
            let table_info = table.table_info();
            let table_id = table_info.ident.table_id;
            let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone();
            plan.region_schemas
                .insert(table_name.clone(), (refreshed_region_schema, table_id));
        }

        Ok(())
    }

    /// Converts proto rows to `RecordBatch` values aligned to resolved region
    /// schemas and returns `(table_name, table_id, batch)` tuples.
    fn build_aligned_batches(
        table_rows: &[(String, Rows)],
        region_schemas: &HashMap<String, (Arc<ArrowSchema>, u32)>,
    ) -> Result<Vec<(String, u32, RecordBatch)>> {
        let mut aligned_batches = Vec::with_capacity(table_rows.len());
        for (table_name, rows) in table_rows {
            // Every table must have been resolved by the planning/DDL steps.
            let (region_schema, table_id) =
                region_schemas.get(table_name).cloned().with_context(|| {
                    error::UnexpectedResultSnafu {
                        reason: format!("Region schema not resolved for table: {}", table_name),
                    }
                })?;

            let record_batch = {
                let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED
                    .with_label_values(&["align_rows_to_record_batch"])
                    .start_timer();
                rows_to_aligned_record_batch(rows, region_schema.as_ref())?
            };
            aligned_batches.push((table_name.clone(), table_id, record_batch));
        }

        Ok(aligned_batches)
    }

    /// Returns a live worker for `key`, spawning a new one when there is no
    /// entry or the registered worker's channel is already closed.
    fn get_or_spawn_worker(&self, key: BatchKey) -> PendingWorker {
        // Fast path: a read-only lookup avoids taking the entry (write) lock
        // when a live worker already exists.
        if let Some(worker) = self.workers.get(&key)
            && !worker.tx.is_closed()
        {
            return worker.clone();
        }

        // Slow path: take the entry lock and re-check, since another thread
        // may have replaced the worker between the lookup and here.
        let entry = self.workers.entry(key.clone());
        match entry {
            Entry::Occupied(mut worker) => {
                if worker.get().tx.is_closed() {
                    // Replace the dead worker in place.
                    let new_worker = self.spawn_worker(key);
                    worker.insert(new_worker.clone());
                    PENDING_WORKERS.set(self.workers.len() as i64);
                    new_worker
                } else {
                    worker.get().clone()
                }
            }
            Entry::Vacant(vacant) => {
                let worker = self.spawn_worker(key);

                vacant.insert(worker.clone());
                PENDING_WORKERS.set(self.workers.len() as i64);
                worker
            }
        }
    }

    /// Spawns a worker task for `key` and returns its handle.
    fn spawn_worker(&self, key: BatchKey) -> PendingWorker {
        let (tx, rx) = mpsc::channel(self.worker_channel_capacity);
        let worker = PendingWorker { tx: tx.clone() };
        // Idle timeout is a multiple of the flush interval; saturate to the
        // flush interval itself if the multiplication would overflow.
        let worker_idle_timeout = self
            .flush_interval
            .checked_mul(WORKER_IDLE_TIMEOUT_MULTIPLIER)
            .unwrap_or(self.flush_interval);

        start_worker(
            key,
            worker.tx.clone(),
            self.workers.clone(),
            rx,
            self.shutdown.clone(),
            self.partition_manager.clone(),
            self.node_manager.clone(),
            self.catalog_manager.clone(),
            self.flush_interval,
            worker_idle_timeout,
            self.max_batch_rows,
            self.flush_semaphore.clone(),
        );

        worker
    }
}
701
impl Drop for PendingRowsBatcher {
    fn drop(&mut self) {
        // Tell every worker task to flush what it holds and exit. `send`
        // errors only when no worker is subscribed, which is harmless here.
        let _ = self.shutdown.send(());
    }
}
707
708impl PendingBatch {
709    fn new() -> Self {
710        Self {
711            tables: HashMap::new(),
712            created_at: None,
713            total_row_count: 0,
714            ctx: None,
715            waiters: Vec::new(),
716        }
717    }
718}
719
/// Spawns the per-`BatchKey` worker task.
///
/// The task accumulates submitted batches and flushes them when the batch
/// reaches `max_batch_rows` or ages past `flush_interval`. It exits on channel
/// close, idle timeout, or batcher shutdown, flushing any remaining rows
/// inline on the way out, and deregisters itself from `workers` at the end.
#[allow(clippy::too_many_arguments)]
fn start_worker(
    key: BatchKey,
    worker_tx: mpsc::Sender<WorkerCommand>,
    workers: Arc<DashMap<BatchKey, PendingWorker>>,
    mut rx: mpsc::Receiver<WorkerCommand>,
    shutdown: broadcast::Sender<()>,
    partition_manager: PartitionRuleManagerRef,
    node_manager: NodeManagerRef,
    catalog_manager: CatalogManagerRef,
    flush_interval: Duration,
    worker_idle_timeout: Duration,
    max_batch_rows: usize,
    flush_semaphore: Arc<Semaphore>,
) {
    tokio::spawn(async move {
        let mut batch = PendingBatch::new();
        let mut interval = tokio::time::interval(flush_interval);
        let mut shutdown_rx = shutdown.subscribe();
        let idle_deadline = tokio::time::Instant::now() + worker_idle_timeout;
        let idle_timer = tokio::time::sleep_until(idle_deadline);
        tokio::pin!(idle_timer);

        loop {
            tokio::select! {
                cmd = rx.recv() => {
                    match cmd {
                        Some(WorkerCommand::Submit { table_batches, total_rows, ctx, response_tx, _permit }) => {
                            // Any activity pushes the idle shutdown deadline out.
                            idle_timer.as_mut().reset(tokio::time::Instant::now() + worker_idle_timeout);

                            // First rows of a fresh batch: start the age clock
                            // and capture the context (later submits share the
                            // same batch key, so only the first ctx is kept).
                            if batch.total_row_count == 0 {
                                batch.created_at = Some(Instant::now());
                                batch.ctx = Some(ctx);
                                PENDING_BATCHES.inc();
                            }

                            batch.waiters.push(FlushWaiter { response_tx, _permit });

                            // Merge incoming batches into the per-table buffers.
                            for (table_name, table_id, record_batch) in table_batches {
                                let entry = batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch {
                                    table_name,
                                    table_id,
                                    batches: Vec::new(),
                                    row_count: 0,
                                });
                                entry.row_count += record_batch.num_rows();
                                entry.batches.push(record_batch);
                            }

                            batch.total_row_count += total_rows;
                            PENDING_ROWS.add(total_rows as i64);

                            // Size-triggered flush; runs through the flush
                            // semaphore (see spawn_flush below).
                            if batch.total_row_count >= max_batch_rows
                                && let Some(flush) = drain_batch(&mut batch) {
                                    spawn_flush(
                                        flush,
                                        partition_manager.clone(),
                                        node_manager.clone(),
                                        catalog_manager.clone(),
                                        flush_semaphore.clone(),
                                    ).await;
                            }
                        }
                        None => {
                            // Channel closed: flush leftovers inline so they
                            // complete before the task exits, then stop.
                            if let Some(flush) = drain_batch(&mut batch) {
                                flush_batch(
                                    flush,
                                    partition_manager.clone(),
                                    node_manager.clone(),
                                    catalog_manager.clone(),
                                ).await;
                            }
                            break;
                        }
                    }
                }
                _ = &mut idle_timer => {
                    // Only shut down when nothing is buffered and nothing is
                    // queued; otherwise keep waiting for another round.
                    if !should_close_worker_on_idle_timeout(batch.total_row_count, rx.len()) {
                        idle_timer
                            .as_mut()
                            .reset(tokio::time::Instant::now() + worker_idle_timeout);
                        continue;
                    }

                    debug!(
                        "Closing idle pending rows worker due to timeout: catalog={}, schema={}, physical_table={}",
                        key.catalog,
                        key.schema,
                        key.physical_table
                    );
                    break;
                }
                _ = interval.tick() => {
                    // Age-triggered flush: only when the batch is non-empty
                    // and genuinely older than the flush interval (guards
                    // against the interval's immediate first tick).
                    if let Some(created_at) = batch.created_at
                        && batch.total_row_count > 0
                        && created_at.elapsed() >= flush_interval
                        && let Some(flush) = drain_batch(&mut batch) {
                            spawn_flush(
                                flush,
                                partition_manager.clone(),
                                node_manager.clone(),
                                catalog_manager.clone(),
                                flush_semaphore.clone(),
                            ).await;
                    }
                }
                _ = shutdown_rx.recv() => {
                    // Batcher is being dropped: flush leftovers inline, then exit.
                    if let Some(flush) = drain_batch(&mut batch) {
                        flush_batch(
                            flush,
                            partition_manager.clone(),
                            node_manager.clone(),
                            catalog_manager.clone(),
                        ).await;
                    }
                    break;
                }
            }
        }

        // Deregister, but only if the map still points at this task's channel
        // (a replacement worker may already have been registered).
        remove_worker_if_same_channel(workers.as_ref(), &key, &worker_tx);
    });
}
843
844fn remove_worker_if_same_channel(
845    workers: &DashMap<BatchKey, PendingWorker>,
846    key: &BatchKey,
847    worker_tx: &mpsc::Sender<WorkerCommand>,
848) -> bool {
849    if let Some(worker) = workers.get(key)
850        && worker.tx.same_channel(worker_tx)
851    {
852        drop(worker);
853        workers.remove(key);
854        PENDING_WORKERS.set(workers.len() as i64);
855        return true;
856    }
857
858    false
859}
860
/// Whether the idle timer should actually shut the worker down: only when the
/// current batch holds no rows and no commands are queued on the channel.
fn should_close_worker_on_idle_timeout(total_row_count: usize, queued_requests: usize) -> bool {
    matches!((total_row_count, queued_requests), (0, 0))
}
864
865fn drain_batch(batch: &mut PendingBatch) -> Option<FlushBatch> {
866    if batch.total_row_count == 0 {
867        return None;
868    }
869
870    let ctx = match batch.ctx.take() {
871        Some(ctx) => ctx,
872        None => {
873            flush_with_error(batch, "Pending batch missing context");
874            return None;
875        }
876    };
877
878    let total_row_count = batch.total_row_count;
879    let table_batches = std::mem::take(&mut batch.tables).into_values().collect();
880    let waiters = std::mem::take(&mut batch.waiters);
881    batch.total_row_count = 0;
882    batch.created_at = None;
883
884    PENDING_ROWS.sub(total_row_count as i64);
885    PENDING_BATCHES.dec();
886
887    Some(FlushBatch {
888        table_batches,
889        total_row_count,
890        ctx,
891        waiters,
892    })
893}
894
895async fn spawn_flush(
896    flush: FlushBatch,
897    partition_manager: PartitionRuleManagerRef,
898    node_manager: NodeManagerRef,
899    catalog_manager: CatalogManagerRef,
900    semaphore: Arc<Semaphore>,
901) {
902    match semaphore.acquire_owned().await {
903        Ok(permit) => {
904            tokio::spawn(async move {
905                let _permit = permit;
906                flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
907            });
908        }
909        Err(err) => {
910            warn!(err; "Flush semaphore closed, flushing inline");
911            flush_batch(flush, partition_manager, node_manager, catalog_manager).await;
912        }
913    }
914}
915
/// A fully prepared bulk-insert write targeting a single physical region.
struct FlushRegionWrite {
    /// Target physical region.
    region_id: RegionId,
    /// Number of rows carried by `request`; used for metrics and reporting.
    row_count: usize,
    /// Datanode peer that currently leads the region.
    datanode: Peer,
    /// Encoded bulk-insert region request to send.
    request: RegionRequest,
}
922
/// Outcome of sending one [`FlushRegionWrite`] to its datanode.
enum FlushWriteResult {
    /// The region write succeeded; `row_count` rows were flushed.
    Success { row_count: usize },
    /// The region write failed; `row_count` rows were affected and `message`
    /// describes the failure.
    Failed { row_count: usize, message: String },
}
927
/// Whether region writes should be dispatched in parallel; only worthwhile
/// when there is more than one write.
fn should_dispatch_concurrently(region_write_count: usize) -> bool {
    region_write_count >= 2
}
931
/// Classifies columns in a logical-table batch for sparse primary-key conversion.
///
/// Column roles are inferred from Arrow data types alone: `Utf8` columns are
/// tags, the single `Timestamp(Millisecond)` column is the timestamp, and the
/// single `Float64` column is the value. Any other data type is rejected.
///
/// Returns:
/// - `Vec<TagColumnInfo>`: all Utf8 tag columns sorted by tag name, used for
///   TSID and sparse primary-key encoding.
/// - `SmallVec<[usize; 3]>`: indices of columns copied into the physical batch
///   after `__primary_key`, ordered as `[greptime_timestamp, greptime_value,
///   partition_tag_columns...]`.
///
/// # Errors
/// Returns `InvalidPromRemoteRequest` when a tag column has no column ID in
/// `name_to_ids`, when the timestamp or value column is missing or duplicated,
/// or when an unexpected data type appears in the schema.
fn columns_taxonomy(
    batch_schema: &Arc<ArrowSchema>,
    table_name: &str,
    name_to_ids: &HashMap<String, u32>,
    partition_columns: &HashSet<&str>,
) -> Result<(Vec<TagColumnInfo>, SmallVec<[usize; 3]>)> {
    let mut tag_columns = Vec::new();
    let mut essential_column_indices =
        SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len());
    // Placeholder for greptime_timestamp and greptime_value; both slots are
    // overwritten with the real indices once the scan below has found them.
    essential_column_indices.push(0);
    essential_column_indices.push(0);

    let mut timestamp_index = None;
    let mut value_index = None;

    for (index, field) in batch_schema.fields().iter().enumerate() {
        match field.data_type() {
            ArrowDataType::Utf8 => {
                // Every tag must exist on the physical table: its column ID is
                // needed for the sparse primary-key encoding.
                let column_id = name_to_ids.get(field.name()).copied().with_context(|| {
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Column '{}' from logical table '{}' not found in physical table column IDs",
                            field.name(),
                            table_name
                        ),
                    }
                })?;
                tag_columns.push(TagColumnInfo {
                    name: field.name().clone(),
                    index,
                    column_id,
                });

                // Partition tags are additionally carried over into the
                // physical batch so the partition rule can route rows.
                if partition_columns.contains(field.name().as_str()) {
                    essential_column_indices.push(index);
                }
            }
            ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => {
                // `replace` records the index; a previous `Some` means the
                // schema carried two timestamp columns.
                ensure!(
                    timestamp_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated timestamp column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            ArrowDataType::Float64 => {
                // Same duplicate detection for the value column.
                ensure!(
                    value_index.replace(index).is_none(),
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Duplicated value column in logical table '{}' batch schema",
                            table_name
                        ),
                    }
                );
            }
            datatype => {
                return error::InvalidPromRemoteRequestSnafu {
                    msg: format!(
                        "Unexpected data type '{datatype:?}' in logical table '{}' batch schema",
                        table_name
                    ),
                }
                .fail();
            }
        }
    }

    let timestamp_index =
        timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
            msg: format!(
                "Missing essential column '{}' in logical table '{}' batch schema",
                greptime_timestamp(),
                table_name
            ),
        })?;
    let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu {
        msg: format!(
            "Missing essential column '{}' in logical table '{}' batch schema",
            greptime_value(),
            table_name
        ),
    })?;

    // Tags are sorted by name so the sparse primary-key layout is stable
    // regardless of the incoming column order.
    tag_columns.sort_by(|a, b| a.name.cmp(&b.name));

    // Fill in the two placeholder slots reserved above.
    essential_column_indices[0] = timestamp_index;
    essential_column_indices[1] = value_index;

    Ok((tag_columns, essential_column_indices))
}
1035
1036fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch> {
1037    ensure!(
1038        batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1039        error::InternalSnafu {
1040            err_msg: format!(
1041                "Expected at least {} columns in physical batch, got {}",
1042                PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT,
1043                batch.num_columns()
1044            ),
1045        }
1046    );
1047    let essential_indices: Vec<usize> = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect();
1048    batch
1049        .project(&essential_indices)
1050        .map_err(|err| Error::Internal {
1051            err_msg: format!("Failed to project essential columns from RecordBatch: {err}"),
1052        })
1053}
1054
1055async fn flush_region_writes_concurrently(
1056    node_manager: NodeManagerRef,
1057    writes: Vec<FlushRegionWrite>,
1058) -> Vec<FlushWriteResult> {
1059    if !should_dispatch_concurrently(writes.len()) {
1060        let mut results = Vec::with_capacity(writes.len());
1061        for write in writes {
1062            let datanode = node_manager.datanode(&write.datanode).await;
1063            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1064                .with_label_values(&["flush_write_region"])
1065                .start_timer();
1066            match datanode.handle(write.request).await {
1067                Ok(_) => results.push(FlushWriteResult::Success {
1068                    row_count: write.row_count,
1069                }),
1070                Err(err) => results.push(FlushWriteResult::Failed {
1071                    row_count: write.row_count,
1072                    message: format!(
1073                        "Bulk insert flush failed for region {}: {:?}",
1074                        write.region_id, err
1075                    ),
1076                }),
1077            }
1078        }
1079        return results;
1080    }
1081
1082    let write_futures = writes.into_iter().map(|write| {
1083        let node_manager = node_manager.clone();
1084        async move {
1085            let datanode = node_manager.datanode(&write.datanode).await;
1086            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
1087                .with_label_values(&["flush_write_region"])
1088                .start_timer();
1089
1090            match datanode.handle(write.request).await {
1091                Ok(_) => FlushWriteResult::Success {
1092                    row_count: write.row_count,
1093                },
1094                Err(err) => FlushWriteResult::Failed {
1095                    row_count: write.row_count,
1096                    message: format!(
1097                        "Bulk insert flush failed for region {}: {:?}",
1098                        write.region_id, err
1099                    ),
1100                },
1101            }
1102        }
1103    });
1104
1105    // todo(hl): should be bounded.
1106    futures::future::join_all(write_futures).await
1107}
1108
1109async fn flush_batch(
1110    flush: FlushBatch,
1111    partition_manager: PartitionRuleManagerRef,
1112    node_manager: NodeManagerRef,
1113    catalog_manager: CatalogManagerRef,
1114) {
1115    let FlushBatch {
1116        table_batches,
1117        total_row_count,
1118        ctx,
1119        waiters,
1120    } = flush;
1121    let start = Instant::now();
1122    let mut first_error: Option<String> = None;
1123
1124    // Physical-table-level flush: transform all logical table batches
1125    // into physical format and write them together.
1126    let physical_table_name = ctx
1127        .extension(PHYSICAL_TABLE_KEY)
1128        .unwrap_or(GREPTIME_PHYSICAL_TABLE)
1129        .to_string();
1130    flush_batch_physical(
1131        &table_batches,
1132        total_row_count,
1133        &physical_table_name,
1134        &ctx,
1135        &partition_manager,
1136        &node_manager,
1137        &catalog_manager,
1138        &mut first_error,
1139    )
1140    .await;
1141
1142    let elapsed = start.elapsed().as_secs_f64();
1143    FLUSH_ELAPSED.observe(elapsed);
1144    debug!(
1145        "Pending rows batch flushed, total rows: {}, elapsed time: {}s",
1146        total_row_count, elapsed
1147    );
1148
1149    notify_waiters(waiters, &first_error);
1150}
1151
/// Attempts to flush all table batches by transforming them into the physical
/// table format (sparse primary key encoding) and writing directly to the
/// physical data regions.
///
/// This is the only flush path. Any failure in resolving or transforming the
/// physical flush inputs is recorded as flush failure and reported to waiters.
#[allow(clippy::too_many_arguments)]
async fn flush_batch_physical(
    table_batches: &[TableBatch],
    total_row_count: usize,
    physical_table_name: &str,
    ctx: &QueryContextRef,
    partition_manager: &PartitionRuleManagerRef,
    node_manager: &NodeManagerRef,
    catalog_manager: &CatalogManagerRef,
    first_error: &mut Option<String>,
) {
    // Remembers the first failure message (later reported to all waiters)
    // and bumps the failure metrics for `$row_count` dropped rows.
    macro_rules! record_failure {
        ($row_count:expr, $msg:expr) => {{
            let msg = $msg;
            if first_error.is_none() {
                *first_error = Some(msg.clone());
            }
            mark_flush_failure($row_count, &msg);
        }};
    }

    // 1. Resolve the physical table and get column ID mapping
    let physical_table = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_resolve_table"])
            .start_timer();
        match catalog_manager
            .table(
                ctx.current_catalog(),
                &ctx.current_schema(),
                physical_table_name,
                Some(ctx.as_ref()),
            )
            .await
        {
            Ok(Some(table)) => table,
            Ok(None) => {
                // Without the physical table nothing can be flushed; drop the
                // whole batch.
                record_failure!(
                    total_row_count,
                    format!(
                        "Physical table '{}' not found during pending flush",
                        physical_table_name
                    )
                );
                return;
            }
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to resolve physical table '{}' for pending flush: {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };

    let physical_table_info = physical_table.table_info();
    // Name -> column ID mapping is required for sparse primary-key encoding.
    let name_to_ids = match physical_table_info.name_to_ids() {
        Some(ids) => ids,
        None => {
            record_failure!(
                total_row_count,
                format!(
                    "Physical table '{}' has no column IDs for pending flush",
                    physical_table_name
                )
            );
            return;
        }
    };

    // 2. Get the physical table's partition rule (one lookup instead of N)
    let partition_rule = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_fetch_partition_rule"])
            .start_timer();
        match partition_manager
            .find_table_partition_rule(&physical_table_info)
            .await
        {
            Ok(rule) => rule,
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to fetch partition rule for physical table '{}': {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };
    let partition_columns = partition_rule.0.partition_columns();
    let partition_columns_set: HashSet<&str> =
        partition_columns.iter().map(String::as_str).collect();

    // 3. Transform each logical table batch into physical format
    let mut modified_batches: Vec<RecordBatch> = Vec::with_capacity(table_batches.len());
    let mut modified_row_count: usize = 0;

    // Stage timings accumulated across all chunks and observed once below.
    let mut modify_elapsed = Duration::ZERO;
    let mut columns_taxonomy_elapsed = Duration::ZERO;

    // Any chunk failure skips the remaining chunks of the same table.
    'next_table: for table_batch in table_batches {
        let table_id = table_batch.table_id;

        // Transform each chunk to physical format directly, avoiding an
        // intermediate concat_batches per logical table.
        for batch in &table_batch.batches {
            // Identify tag columns and non-tag columns from the logical batch schema.
            // Chunks within a table_batch may have different schemas if new tag columns
            // are added between submits.
            // In prom batches, Float64 = value, Timestamp = timestamp, Utf8 = tags.
            let batch_schema = batch.schema();
            let start = Instant::now();
            let (tag_columns, essential_col_indices) = match columns_taxonomy(
                &batch_schema,
                &table_batch.table_name,
                &name_to_ids,
                &partition_columns_set,
            ) {
                Ok(columns) => columns,
                Err(err) => {
                    warn!(
                        "Failed to resolve columns for logical table '{}': {:?}",
                        table_batch.table_name, err
                    );
                    record_failure!(table_batch.row_count, err.to_string());
                    continue 'next_table;
                }
            };

            columns_taxonomy_elapsed += start.elapsed();
            // NOTE(review): `columns_taxonomy` always seeds
            // `essential_col_indices` with two placeholder entries, so this
            // condition appears unreachable — confirm whether the intent was
            // to check `tag_columns.is_empty()` alone.
            if tag_columns.is_empty() && essential_col_indices.is_empty() {
                continue;
            }

            let modified = {
                let start = Instant::now();
                match modify_batch_sparse(
                    batch.clone(),
                    table_id,
                    &tag_columns,
                    &essential_col_indices,
                ) {
                    Ok(batch) => {
                        modify_elapsed += start.elapsed();
                        batch
                    }
                    Err(err) => {
                        record_failure!(
                            table_batch.row_count,
                            format!(
                                "Failed to modify batch for logical table '{}': {:?}",
                                table_batch.table_name, err
                            )
                        );
                        continue 'next_table;
                    }
                }
            };

            modified_row_count += modified.num_rows();
            modified_batches.push(modified);
        }
    }

    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_modify_batch"])
        .observe(modify_elapsed.as_secs_f64());
    PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
        .with_label_values(&["flush_physical_columns_taxonomy"])
        .observe(columns_taxonomy_elapsed.as_secs_f64());

    if modified_batches.is_empty() {
        // Only record a fresh failure if none of the per-table errors above
        // already claimed these rows.
        if first_error.is_none() {
            record_failure!(
                total_row_count,
                format!(
                    "No batches can be transformed for physical table '{}' during pending flush",
                    physical_table_name
                )
            );
        }
        return;
    }

    // 4. Concatenate all modified batches (all share the same physical schema)
    let combined_batch = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_concat_all"])
            .start_timer();
        let combined_schema = modified_batches[0].schema();
        // todo(hl): maybe limit max rows to concat.
        match concat_batches(&combined_schema, &modified_batches) {
            Ok(batch) => batch,
            Err(err) => {
                record_failure!(
                    modified_row_count,
                    format!("Failed to concat modified batches: {:?}", err)
                );
                return;
            }
        }
    };

    // 5. Split by physical partition rule and send to regions
    let physical_table_id = physical_table_info.table_id();
    let region_masks = {
        let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
            .with_label_values(&["flush_physical_split_record_batch"])
            .start_timer();
        match partition_rule.0.split_record_batch(&combined_batch) {
            Ok(masks) => masks,
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to split combined batch for physical table '{}': {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };

    // Partition columns were only needed for splitting; drop them before
    // encoding unless there were none to begin with.
    let stripped_batch = if partition_columns.is_empty() {
        combined_batch
    } else {
        // Strip partition columns before encoding and sending requests.
        match strip_partition_columns_from_batch(combined_batch) {
            Ok(batch) => batch,
            Err(err) => {
                record_failure!(
                    total_row_count,
                    format!(
                        "Failed to strip partition columns for physical table '{}': {:?}",
                        physical_table_name, err
                    )
                );
                return;
            }
        }
    };

    // Build one bulk-insert request per region that received any rows.
    let mut region_writes = Vec::new();
    for (region_number, mask) in region_masks {
        if mask.select_none() {
            continue;
        }

        let region_batch = if mask.select_all() {
            // All rows target this region; reuse the batch without filtering.
            stripped_batch.clone()
        } else {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_filter_record_batch"])
                .start_timer();
            match filter_record_batch(&stripped_batch, mask.array()) {
                Ok(batch) => batch,
                Err(err) => {
                    record_failure!(
                        total_row_count,
                        format!(
                            "Failed to filter combined batch for physical table '{}': {:?}",
                            physical_table_name, err
                        )
                    );
                    continue;
                }
            }
        };

        let row_count = region_batch.num_rows();
        if row_count == 0 {
            continue;
        }

        let region_id = RegionId::new(physical_table_id, region_number);
        let datanode = {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_resolve_region_leader"])
                .start_timer();
            match partition_manager.find_region_leader(region_id).await {
                Ok(peer) => peer,
                Err(err) => {
                    record_failure!(
                        row_count,
                        format!(
                            "Failed to resolve region leader for physical region {}: {:?}",
                            region_id, err
                        )
                    );
                    continue;
                }
            }
        };

        let (schema_bytes, data_header, payload) = {
            let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
                .with_label_values(&["flush_physical_encode_ipc"])
                .start_timer();
            match record_batch_to_ipc(region_batch) {
                Ok(encoded) => encoded,
                Err(err) => {
                    record_failure!(
                        row_count,
                        format!(
                            "Failed to encode Arrow IPC for physical region {}: {:?}",
                            region_id, err
                        )
                    );
                    continue;
                }
            }
        };

        let request = RegionRequest {
            header: Some(RegionRequestHeader {
                tracing_context: TracingContext::from_current_span().to_w3c(),
                ..Default::default()
            }),
            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
                region_id: region_id.as_u64(),
                partition_expr_version: None,
                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header,
                    payload,
                })),
            })),
        };

        region_writes.push(FlushRegionWrite {
            region_id,
            row_count,
            datanode,
            request,
        });
    }

    // 6. Dispatch all region writes and fold outcomes into metrics/errors.
    for result in flush_region_writes_concurrently(node_manager.clone(), region_writes).await {
        match result {
            FlushWriteResult::Success { row_count } => {
                FLUSH_TOTAL.inc();
                FLUSH_ROWS.observe(row_count as f64);
            }
            FlushWriteResult::Failed { row_count, message } => {
                record_failure!(row_count, message);
            }
        }
    }
}
1515
1516fn notify_waiters(waiters: Vec<FlushWaiter>, first_error: &Option<String>) {
1517    for waiter in waiters {
1518        let result = match first_error {
1519            Some(err_msg) => Err(Error::Internal {
1520                err_msg: err_msg.clone(),
1521            }),
1522            None => Ok(()),
1523        };
1524        let _ = waiter.response_tx.send(result);
1525        // waiter._permit is dropped here, releasing the inflight semaphore slot
1526    }
1527}
1528
/// Logs a flush failure and records `row_count` rows as dropped in the
/// failure metrics.
fn mark_flush_failure(row_count: usize, message: &str) {
    error!("Pending rows batch flush failed, message: {}", message);
    FLUSH_FAILURES.inc();
    FLUSH_DROPPED_ROWS.inc_by(row_count as u64);
}
1534
1535fn flush_with_error(batch: &mut PendingBatch, message: &str) {
1536    if batch.total_row_count == 0 {
1537        return;
1538    }
1539
1540    let row_count = batch.total_row_count;
1541    let waiters = std::mem::take(&mut batch.waiters);
1542    batch.tables.clear();
1543    batch.total_row_count = 0;
1544    batch.created_at = None;
1545    batch.ctx = None;
1546
1547    PENDING_ROWS.sub(row_count as i64);
1548    PENDING_BATCHES.dec();
1549
1550    let err_msg = Some(message.to_string());
1551    notify_waiters(waiters, &err_msg);
1552    mark_flush_failure(row_count, message);
1553}
1554
1555fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
1556    let mut encoder = FlightEncoder::default();
1557    let schema = encoder.encode_schema(record_batch.schema().as_ref());
1558    let mut iter = encoder
1559        .encode(FlightMessage::RecordBatch(record_batch))
1560        .into_iter();
1561    let Some(flight_data) = iter.next() else {
1562        return Err(Error::Internal {
1563            err_msg: "Failed to encode empty flight data".to_string(),
1564        });
1565    };
1566    if iter.next().is_some() {
1567        return Err(Error::NotSupported {
1568            feat: "bulk insert RecordBatch with dictionary arrays".to_string(),
1569        });
1570    }
1571
1572    Ok((
1573        schema.data_header,
1574        flight_data.data_header,
1575        flight_data.data_body,
1576    ))
1577}
1578
1579#[cfg(test)]
1580mod tests {
1581    use std::collections::{HashMap, HashSet};
1582    use std::sync::Arc;
1583    use std::sync::atomic::{AtomicUsize, Ordering};
1584    use std::time::Duration;
1585
1586    use api::region::RegionResponse;
1587    use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
1588    use api::v1::meta::Peer;
1589    use api::v1::region::{InsertRequests, RegionRequest};
1590    use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows};
1591    use arrow::array::{BinaryArray, StringArray, TimestampMillisecondArray};
1592    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
1593    use arrow::record_batch::RecordBatch;
1594    use async_trait::async_trait;
1595    use common_meta::error::Result as MetaResult;
1596    use common_meta::node_manager::{
1597        Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef,
1598    };
1599    use common_query::request::QueryRequest;
1600    use common_recordbatch::SendableRecordBatchStream;
1601    use dashmap::DashMap;
1602    use smallvec::SmallVec;
1603    use store_api::storage::RegionId;
1604    use tokio::sync::mpsc;
1605    use tokio::time::sleep;
1606
1607    use super::{
1608        BatchKey, Error, FlushRegionWrite, FlushWriteResult, PendingRowsBatcher, PendingWorker,
1609        WorkerCommand, columns_taxonomy, flush_region_writes_concurrently,
1610        remove_worker_if_same_channel, should_close_worker_on_idle_timeout,
1611        should_dispatch_concurrently, strip_partition_columns_from_batch,
1612    };
1613
    /// Builds a `Rows` payload with `row_count` empty rows and a single
    /// column named `schema_name`, for exercising batching logic in tests.
    fn mock_rows(row_count: usize, schema_name: &str) -> Rows {
        Rows {
            schema: vec![ColumnSchema {
                column_name: schema_name.to_string(),
                ..Default::default()
            }],
            rows: (0..row_count).map(|_| Row { values: vec![] }).collect(),
        }
    }
1623
    // Verifies that requests with zero rows or a missing payload are dropped
    // and that the returned total counts only the surviving rows.
    #[test]
    fn test_collect_non_empty_table_rows_filters_empty_payloads() {
        let requests = RowInsertRequests {
            inserts: vec![
                RowInsertRequest {
                    table_name: "cpu".to_string(),
                    rows: Some(mock_rows(2, "host")),
                },
                // Empty payload: must be filtered out.
                RowInsertRequest {
                    table_name: "mem".to_string(),
                    rows: Some(mock_rows(0, "host")),
                },
                // Missing payload: must be filtered out.
                RowInsertRequest {
                    table_name: "disk".to_string(),
                    rows: None,
                },
            ],
        };

        let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests);

        assert_eq!(2, total_rows);
        assert_eq!(1, table_rows.len());
        assert_eq!("cpu", table_rows[0].0);
        assert_eq!(2, table_rows[0].1.rows.len());
    }
1650
    /// Mock datanode that sleeps on every request and tracks the peak number
    /// of concurrently in-flight requests.
    #[derive(Clone)]
    struct ConcurrentMockDatanode {
        /// Artificial handling latency per request.
        delay: Duration,
        /// Current number of in-flight requests.
        inflight: Arc<AtomicUsize>,
        /// Highest in-flight count observed so far.
        max_inflight: Arc<AtomicUsize>,
    }
1657
1658    #[async_trait]
1659    impl Datanode for ConcurrentMockDatanode {
1660        async fn handle(&self, _request: RegionRequest) -> MetaResult<RegionResponse> {
1661            let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
1662            loop {
1663                let max = self.max_inflight.load(Ordering::SeqCst);
1664                if now <= max {
1665                    break;
1666                }
1667                if self
1668                    .max_inflight
1669                    .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst)
1670                    .is_ok()
1671                {
1672                    break;
1673                }
1674            }
1675
1676            sleep(self.delay).await;
1677            self.inflight.fetch_sub(1, Ordering::SeqCst);
1678            Ok(RegionResponse::new(0))
1679        }
1680
1681        async fn handle_query(
1682            &self,
1683            _request: QueryRequest,
1684        ) -> MetaResult<SendableRecordBatchStream> {
1685            unimplemented!()
1686        }
1687    }
1688
    /// Mock node manager that resolves datanodes from a fixed id -> datanode
    /// map built by the test.
    #[derive(Clone)]
    struct ConcurrentMockNodeManager {
        datanodes: Arc<HashMap<u64, DatanodeRef>>,
    }
1693
1694    #[async_trait]
1695    impl DatanodeManager for ConcurrentMockNodeManager {
1696        async fn datanode(&self, node: &Peer) -> DatanodeRef {
1697            self.datanodes
1698                .get(&node.id)
1699                .expect("datanode not found")
1700                .clone()
1701        }
1702    }
1703
    /// Flownode stub: the batcher tests never exercise flow paths, so every
    /// method is deliberately left unimplemented.
    struct NoopFlownode;

    #[async_trait]
    impl Flownode for NoopFlownode {
        async fn handle(&self, _request: FlowRequest) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult<FlowResponse> {
            unimplemented!()
        }

        async fn handle_mark_window_dirty(
            &self,
            _req: DirtyWindowRequests,
        ) -> MetaResult<FlowResponse> {
            unimplemented!()
        }
    }
1723
    #[async_trait]
    impl FlownodeManager for ConcurrentMockNodeManager {
        /// Always hands out the no-op flownode; flow routing is irrelevant to
        /// these tests.
        async fn flownode(&self, _node: &Peer) -> FlownodeRef {
            Arc::new(NoopFlownode)
        }
    }
1730
1731    #[test]
1732    fn test_remove_worker_if_same_channel_removes_matching_entry() {
1733        let workers = DashMap::new();
1734        let key = BatchKey {
1735            catalog: "greptime".to_string(),
1736            schema: "public".to_string(),
1737            physical_table: "phy".to_string(),
1738        };
1739
1740        let (tx, _rx) = mpsc::channel::<WorkerCommand>(1);
1741        workers.insert(key.clone(), PendingWorker { tx: tx.clone() });
1742
1743        assert!(remove_worker_if_same_channel(&workers, &key, &tx));
1744        assert!(!workers.contains_key(&key));
1745    }
1746
1747    #[test]
1748    fn test_remove_worker_if_same_channel_keeps_newer_entry() {
1749        let workers = DashMap::new();
1750        let key = BatchKey {
1751            catalog: "greptime".to_string(),
1752            schema: "public".to_string(),
1753            physical_table: "phy".to_string(),
1754        };
1755
1756        let (stale_tx, _stale_rx) = mpsc::channel::<WorkerCommand>(1);
1757        let (fresh_tx, _fresh_rx) = mpsc::channel::<WorkerCommand>(1);
1758        workers.insert(
1759            key.clone(),
1760            PendingWorker {
1761                tx: fresh_tx.clone(),
1762            },
1763        );
1764
1765        assert!(!remove_worker_if_same_channel(&workers, &key, &stale_tx));
1766        assert!(workers.contains_key(&key));
1767        assert!(workers.get(&key).unwrap().tx.same_channel(&fresh_tx));
1768    }
1769
    #[test]
    fn test_worker_idle_timeout_close_decision() {
        // The worker may only close when BOTH counters are zero; a nonzero
        // value in either position keeps it alive.
        // NOTE(review): the meaning of each argument (pending rows vs.
        // in-flight flushes, presumably) is not visible here — confirm
        // against `should_close_worker_on_idle_timeout`'s definition.
        assert!(should_close_worker_on_idle_timeout(0, 0));
        assert!(!should_close_worker_on_idle_timeout(1, 0));
        assert!(!should_close_worker_on_idle_timeout(0, 1));
    }
1776
    #[tokio::test]
    async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() {
        // Both mock datanodes share the same counters so `max_inflight`
        // captures the peak number of overlapping `handle` calls across the
        // pair.
        let inflight = Arc::new(AtomicUsize::new(0));
        let max_inflight = Arc::new(AtomicUsize::new(0));
        let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight: inflight.clone(),
            max_inflight: max_inflight.clone(),
        });
        let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode {
            delay: Duration::from_millis(100),
            inflight,
            max_inflight: max_inflight.clone(),
        });

        let mut datanodes = HashMap::new();
        datanodes.insert(1, datanode1);
        datanodes.insert(2, datanode2);
        let node_manager = Arc::new(ConcurrentMockNodeManager {
            datanodes: Arc::new(datanodes),
        });

        // Two region writes targeting two different datanode peers.
        let writes = vec![
            FlushRegionWrite {
                region_id: RegionId::new(1024, 1),
                row_count: 10,
                datanode: Peer {
                    id: 1,
                    addr: "node1".to_string(),
                },
                request: RegionRequest::default(),
            },
            FlushRegionWrite {
                region_id: RegionId::new(1024, 2),
                row_count: 12,
                datanode: Peer {
                    id: 2,
                    addr: "node2".to_string(),
                },
                request: RegionRequest::default(),
            },
        ];

        let results = flush_region_writes_concurrently(node_manager, writes).await;
        assert_eq!(2, results.len());
        assert!(
            results
                .iter()
                .all(|result| matches!(result, FlushWriteResult::Success { .. }))
        );
        // Peak in-flight >= 2 proves the two 100ms `handle` calls overlapped,
        // i.e. dispatch was actually concurrent rather than sequential.
        assert!(max_inflight.load(Ordering::SeqCst) >= 2);
    }
1829
    #[test]
    fn test_should_dispatch_concurrently_by_region_count() {
        // Concurrent dispatch is only chosen for two or more target regions;
        // zero or one region takes the sequential path.
        assert!(!should_dispatch_concurrently(0));
        assert!(!should_dispatch_concurrently(1));
        assert!(should_dispatch_concurrently(2));
    }
1836
1837    #[test]
1838    fn test_strip_partition_columns_from_batch_removes_partition_tags() {
1839        let batch = RecordBatch::try_new(
1840            Arc::new(ArrowSchema::new(vec![
1841                Field::new("__primary_key", ArrowDataType::Binary, false),
1842                Field::new(
1843                    "greptime_timestamp",
1844                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1845                    false,
1846                ),
1847                Field::new("greptime_value", ArrowDataType::Float64, true),
1848                Field::new("host", ArrowDataType::Utf8, true),
1849            ])),
1850            vec![
1851                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
1852                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
1853                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
1854                Arc::new(StringArray::from(vec!["node-1"])),
1855            ],
1856        )
1857        .unwrap();
1858
1859        let stripped = strip_partition_columns_from_batch(batch).unwrap();
1860
1861        assert_eq!(3, stripped.num_columns());
1862        assert_eq!("__primary_key", stripped.schema().field(0).name());
1863        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
1864        assert_eq!("greptime_value", stripped.schema().field(2).name());
1865    }
1866
    #[test]
    fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() {
        // NOTE(review): this test body is byte-identical to
        // `test_strip_partition_columns_from_batch_removes_partition_tags`
        // above. If "without lookup" is meant to exercise a distinct code
        // path, the setup should differ from that test; otherwise this
        // duplicate can be removed.
        let batch = RecordBatch::try_new(
            Arc::new(ArrowSchema::new(vec![
                Field::new("__primary_key", ArrowDataType::Binary, false),
                Field::new(
                    "greptime_timestamp",
                    ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("greptime_value", ArrowDataType::Float64, true),
                Field::new("host", ArrowDataType::Utf8, true),
            ])),
            vec![
                Arc::new(BinaryArray::from(vec![b"k1".as_slice()])),
                Arc::new(TimestampMillisecondArray::from(vec![1000_i64])),
                Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])),
                Arc::new(StringArray::from(vec!["node-1"])),
            ],
        )
        .unwrap();

        let stripped = strip_partition_columns_from_batch(batch).unwrap();

        assert_eq!(3, stripped.num_columns());
        assert_eq!("__primary_key", stripped.schema().field(0).name());
        assert_eq!("greptime_timestamp", stripped.schema().field(1).name());
        assert_eq!("greptime_value", stripped.schema().field(2).name());
    }
1896
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() {
        // Schema indices: 0 = timestamp, 1 = value, 2 = host, 3 = region.
        // Both "host" and "region" are known tags, but only "host" is a
        // partition column.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let (tag_columns, non_tag_indices) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        // Both tags are reported as tag columns, while the non-tag index list
        // keeps the partition tag "host" (index 2) alongside the essential
        // columns and excludes the non-partition tag "region" (index 3).
        assert_eq!(2, tag_columns.len());
        assert_eq!(&[0, 1, 2], non_tag_indices.as_slice());
    }
1919
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() {
        // Schema indices: 0 = host, 1 = value, 2 = timestamp, 3 = region.
        // The essential columns appear AFTER a tag on purpose, to show the
        // index list reorders them to the front.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("host", ArrowDataType::Utf8, true),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("region", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids =
            HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]);
        let partition_columns = HashSet::from(["host", "region"]);

        let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) =
            columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap();

        // Timestamp (2) then value (1) come first, followed by the partition
        // tags in their schema order: host (0), region (3).
        assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice());
    }
1941
1942    #[test]
1943    fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() {
1944        let schema = Arc::new(ArrowSchema::new(vec![
1945            Field::new(
1946                "greptime_timestamp",
1947                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1948                false,
1949            ),
1950            Field::new("greptime_value", ArrowDataType::Float64, true),
1951            Field::new("host", ArrowDataType::Utf8, true),
1952            Field::new("invalid", ArrowDataType::Boolean, true),
1953        ]));
1954        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
1955        let partition_columns = HashSet::from(["host"]);
1956
1957        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
1958
1959        assert!(matches!(
1960            result,
1961            Err(Error::InvalidPromRemoteRequest { .. })
1962        ));
1963    }
1964
1965    #[test]
1966    fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() {
1967        let schema = Arc::new(ArrowSchema::new(vec![
1968            Field::new("greptime_timestamp", ArrowDataType::Int64, false),
1969            Field::new("greptime_value", ArrowDataType::Float64, true),
1970            Field::new("host", ArrowDataType::Utf8, true),
1971        ]));
1972        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
1973        let partition_columns = HashSet::from(["host"]);
1974
1975        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);
1976
1977        assert!(matches!(
1978            result,
1979            Err(Error::InvalidPromRemoteRequest { .. })
1980        ));
1981    }
1982
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() {
        // Two Timestamp-typed columns make the timestamp choice ambiguous;
        // taxonomy resolution must reject the schema.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "ts1",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts2",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }
2009
    #[test]
    fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() {
        // Two Float64 columns make the value choice ambiguous; taxonomy
        // resolution must reject the schema.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("value1", ArrowDataType::Float64, true),
            Field::new("value2", ArrowDataType::Float64, true),
            Field::new("host", ArrowDataType::Utf8, true),
        ]));
        let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]);
        let partition_columns = HashSet::from(["host"]);

        let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns);

        assert!(matches!(
            result,
            Err(Error::InvalidPromRemoteRequest { .. })
        ));
    }
2032
    /// Verifies that when the column taxonomy is resolved PER BATCH (instead
    /// of once for a whole flush), two batches that agree on timestamp/value
    /// but differ in their tag set encode DIFFERENT primary keys.
    #[test]
    fn test_modify_batch_sparse_with_taxonomy_per_batch() {
        use arrow::array::BinaryArray;
        use metric_engine::batch_modifier::modify_batch_sparse;

        // Schema with a single tag column "tag1".
        let schema1 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
        ]));

        // Schema with an additional tag column "tag2".
        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new(
                "greptime_timestamp",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("greptime_value", ArrowDataType::Float64, true),
            Field::new("tag1", ArrowDataType::Utf8, true),
            Field::new("tag2", ArrowDataType::Utf8, true),
        ]));
        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
                Arc::new(StringArray::from(vec!["v2"])),
            ],
        )
        .unwrap();

        let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]);
        let partition_columns = HashSet::new();

        // A batch that only has tag1, same values as batch2 for ts and val.
        let batch3 = RecordBatch::try_new(
            schema1.clone(),
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![2000])),
                Arc::new(arrow::array::Float64Array::from(vec![2.0])),
                Arc::new(StringArray::from(vec!["v1"])),
            ],
        )
        .unwrap();

        // Simulate the new loop logic in flush_batch_physical:
        // Resolve taxonomy FOR EACH BATCH.
        let (tag_columns2, indices2) =
            columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap();

        let (tag_columns3, indices3) =
            columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap();
        let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap();

        // Column 0 of the modified batches holds the encoded primary key.
        let pk2 = modified2
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let pk3 = modified3
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        // Now they SHOULD be different because tag2 is included in pk2 but not in pk3.
        assert_ne!(
            pk2.value(0),
            pk3.value(0),
            "PK should be different because batch2 has tag2!"
        );
    }
2111}