// metric_engine/engine/bulk_insert.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
18use bytes::Bytes;
19use common_error::ext::ErrorExt;
20use common_error::status_code::StatusCode;
21use common_grpc::flight::{FlightEncoder, FlightMessage};
22use common_query::prelude::{greptime_timestamp, greptime_value};
23use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
24use datatypes::arrow::record_batch::RecordBatch;
25use snafu::{OptionExt, ensure};
26use store_api::codec::PrimaryKeyEncoding;
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_request::{
29    AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
30};
31use store_api::storage::RegionId;
32
33use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
34use crate::engine::MetricEngineInner;
35use crate::error;
36use crate::error::Result;
37use crate::metrics::MITO_OPERATION_ELAPSED;
38
impl MetricEngineInner {
    /// Bulk-inserts rows into a metric region.
    ///
    /// **Logical region path:** The request payload is a logical `RecordBatch`
    /// (timestamp, value and tag columns). It is transformed to physical format
    /// via `modify_batch_sparse`, encoded to Arrow IPC, and forwarded as a
    /// `BulkInserts` request to the data region. If mito reports
    /// `StatusCode::Unsupported`, the request is transparently retried as a `Put`.
    ///
    /// **Physical region path:** The request payload is already in physical format
    /// (produced by the batcher's `flush_batch_physical`). It is forwarded directly
    /// to the data region with no transformation.
    ///
    /// Returns the number of affected rows, or `0` if the input batch is empty.
    pub async fn bulk_insert_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        // Fast path: nothing to write.
        if request.payload.num_rows() == 0 {
            return Ok(0);
        }
        if self.is_physical_region(region_id) {
            // Timer label distinguishes the passthrough path in metrics.
            let _timer = MITO_OPERATION_ELAPSED
                .with_label_values(&["bulk_insert_physical"])
                .start_timer();
            return self.bulk_insert_physical_region(region_id, request).await;
        }

        let _timer = MITO_OPERATION_ELAPSED
            .with_label_values(&["bulk_insert_logical"])
            .start_timer();
        self.bulk_insert_logical_region(region_id, request).await
    }

    /// Passthrough for bulk inserts targeting a physical data region.
    ///
    /// The batch is already in physical format (with `__primary_key`, timestamp,
    /// value columns), so no logical-to-physical transformation is needed.
    async fn bulk_insert_physical_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        self.data_region
            .write_data(region_id, RegionRequest::BulkInserts(request))
            .await
    }

    /// Bulk-inserts logical rows, transforming them to physical format first.
    async fn bulk_insert_logical_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        let (physical_region_id, data_region_id, primary_key_encoding) =
            self.find_data_region_meta(region_id)?;

        // Only sparse primary key encoding is supported by this transformation;
        // dense-encoded physical regions reject bulk inserts outright.
        if primary_key_encoding != PrimaryKeyEncoding::Sparse {
            return error::UnsupportedRegionRequestSnafu {
                request: RegionRequest::BulkInserts(request),
            }
            .fail();
        }

        let batch = request.payload;
        // Defensive re-check; the public entry point already rejects empty batches.
        if batch.num_rows() == 0 {
            return Ok(0);
        }

        // Logical metadata tells us which batch columns are tags.
        let logical_metadata = self
            .logical_region_metadata(physical_region_id, region_id)
            .await?;
        let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
            region_id,
            data_region_id,
            &batch,
            &logical_metadata,
        )?;
        // Rewrite the logical batch into the physical (sparse) layout.
        let modified_batch = modify_batch_sparse(
            batch.clone(),
            region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )?;
        // Re-encode to Arrow IPC: the transformed batch no longer matches
        // the raw_data carried by the incoming request.
        let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;

        let partition_expr_version = request.partition_expr_version;
        let request = RegionBulkInsertsRequest {
            region_id: data_region_id,
            payload: modified_batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version,
        };
        match self
            .data_region
            .write_data(data_region_id, RegionRequest::BulkInserts(request))
            .await
        {
            Ok(affected_rows) => Ok(affected_rows),
            // Mito signals Unsupported when the memtable cannot take bulk
            // writes; fall back to a row-based Put built from the ORIGINAL
            // logical batch (not the transformed one).
            Err(err) if err.status_code() == StatusCode::Unsupported => {
                // todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it
                let rows = record_batch_to_rows(&batch, region_id)?;
                self.put_region(
                    region_id,
                    RegionPutRequest {
                        rows,
                        hint: None,
                        partition_expr_version,
                    },
                )
                .await
            }
            Err(err) => Err(err),
        }
    }

    /// Classifies the columns of `batch` against the logical region's metadata.
    ///
    /// Returns the tag columns (name, batch index, physical column id), sorted
    /// by name, plus the batch indices of all non-tag columns. Fails with
    /// `ColumnNotFound` when a batch column is unknown to the physical region,
    /// or `PhysicalRegionNotFound` when the data region state is missing.
    fn resolve_tag_columns_from_metadata(
        &self,
        logical_region_id: RegionId,
        data_region_id: RegionId,
        batch: &RecordBatch,
        logical_metadata: &RegionMetadataRef,
    ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
        // Names of all tag columns declared on the logical region.
        let tag_names: HashSet<&str> = logical_metadata
            .column_metadatas
            .iter()
            .filter_map(|column| {
                if column.semantic_type == SemanticType::Tag {
                    Some(column.column_schema.name.as_str())
                } else {
                    None
                }
            })
            .collect();

        let mut tag_columns = Vec::new();
        let mut non_tag_indices = Vec::new();
        {
            // Scope the state read lock to the name -> column-id mapping only.
            let state = self.state.read().unwrap();
            let physical_columns = state
                .physical_region_states()
                .get(&data_region_id)
                .context(error::PhysicalRegionNotFoundSnafu {
                    region_id: data_region_id,
                })?
                .physical_columns();

            for (index, field) in batch.schema().fields().iter().enumerate() {
                let name = field.name();
                let column_id =
                    *physical_columns
                        .get(name)
                        .with_context(|| error::ColumnNotFoundSnafu {
                            name: name.clone(),
                            region_id: logical_region_id,
                        })?;
                if tag_names.contains(name.as_str()) {
                    tag_columns.push(TagColumnInfo {
                        name: name.clone(),
                        index,
                        column_id,
                    });
                } else {
                    non_tag_indices.push(index);
                }
            }
        }

        // Sort tags by name so downstream encoding sees a stable order.
        tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
        Ok((tag_columns, non_tag_indices))
    }
}
216
217fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
218    let schema_ref = batch.schema();
219    let fields = schema_ref.fields();
220
221    let mut ts_idx = None;
222    let mut val_idx = None;
223    let mut tag_indices = Vec::new();
224
225    for (idx, field) in fields.iter().enumerate() {
226        if field.name() == greptime_timestamp() {
227            ts_idx = Some(idx);
228            if !matches!(
229                field.data_type(),
230                datatypes::arrow::datatypes::DataType::Timestamp(
231                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
232                    _
233                )
234            ) {
235                return error::UnexpectedRequestSnafu {
236                    reason: format!(
237                        "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
238                        field.name(),
239                        logical_region_id,
240                        field.data_type()
241                    ),
242                }
243                .fail();
244            }
245        } else if field.name() == greptime_value() {
246            val_idx = Some(idx);
247            if !matches!(
248                field.data_type(),
249                datatypes::arrow::datatypes::DataType::Float64
250            ) {
251                return error::UnexpectedRequestSnafu {
252                    reason: format!(
253                        "Value column '{}' in region {:?} has incompatible type: {:?}",
254                        field.name(),
255                        logical_region_id,
256                        field.data_type()
257                    ),
258                }
259                .fail();
260            }
261        } else {
262            if !matches!(
263                field.data_type(),
264                datatypes::arrow::datatypes::DataType::Utf8
265            ) {
266                return error::UnexpectedRequestSnafu {
267                    reason: format!(
268                        "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
269                        field.name(),
270                        logical_region_id,
271                        field.data_type()
272                    ),
273                }
274                .fail();
275            }
276            tag_indices.push(idx);
277        }
278    }
279
280    let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
281        reason: format!(
282            "Timestamp column '{}' not found in RecordBatch for region {:?}",
283            greptime_timestamp(),
284            logical_region_id
285        ),
286    })?;
287    let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
288        reason: format!(
289            "Value column '{}' not found in RecordBatch for region {:?}",
290            greptime_value(),
291            logical_region_id
292        ),
293    })?;
294
295    let mut schema = Vec::with_capacity(2 + tag_indices.len());
296    schema.push(api::v1::ColumnSchema {
297        column_name: greptime_timestamp().to_string(),
298        datatype: ColumnDataType::TimestampMillisecond as i32,
299        semantic_type: SemanticType::Timestamp as i32,
300        datatype_extension: None,
301        options: None,
302    });
303    schema.push(api::v1::ColumnSchema {
304        column_name: greptime_value().to_string(),
305        datatype: ColumnDataType::Float64 as i32,
306        semantic_type: SemanticType::Field as i32,
307        datatype_extension: None,
308        options: None,
309    });
310    for &idx in &tag_indices {
311        let field = &fields[idx];
312        schema.push(api::v1::ColumnSchema {
313            column_name: field.name().clone(),
314            datatype: ColumnDataType::String as i32,
315            semantic_type: SemanticType::Tag as i32,
316            datatype_extension: None,
317            options: None,
318        });
319    }
320
321    let ts_array = batch
322        .column(ts_idx)
323        .as_any()
324        .downcast_ref::<TimestampMillisecondArray>()
325        .expect("validated as TimestampMillisecond");
326    let val_array = batch
327        .column(val_idx)
328        .as_any()
329        .downcast_ref::<Float64Array>()
330        .expect("validated as Float64");
331    let tag_arrays: Vec<&StringArray> = tag_indices
332        .iter()
333        .map(|&idx| {
334            batch
335                .column(idx)
336                .as_any()
337                .downcast_ref::<StringArray>()
338                .expect("validated as Utf8")
339        })
340        .collect();
341
342    let num_rows = batch.num_rows();
343    let mut rows = Vec::with_capacity(num_rows);
344    for row_idx in 0..num_rows {
345        let mut values = Vec::with_capacity(2 + tag_arrays.len());
346
347        if ts_array.is_null(row_idx) {
348            values.push(api::v1::Value { value_data: None });
349        } else {
350            values.push(api::v1::Value {
351                value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
352                    ts_array.value(row_idx),
353                )),
354            });
355        }
356
357        if val_array.is_null(row_idx) {
358            values.push(api::v1::Value { value_data: None });
359        } else {
360            values.push(api::v1::Value {
361                value_data: Some(api::v1::value::ValueData::F64Value(
362                    val_array.value(row_idx),
363                )),
364            });
365        }
366
367        for arr in &tag_arrays {
368            if arr.is_null(row_idx) {
369                values.push(api::v1::Value { value_data: None });
370            } else {
371                values.push(api::v1::Value {
372                    value_data: Some(api::v1::value::ValueData::StringValue(
373                        arr.value(row_idx).to_string(),
374                    )),
375                });
376            }
377        }
378
379        rows.push(api::v1::Row { values });
380    }
381
382    Ok(api::v1::Rows { schema, rows })
383}
384
385fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
386    let mut encoder = FlightEncoder::default();
387    let schema = encoder.encode_schema(record_batch.schema().as_ref());
388    let mut iter = encoder
389        .encode(FlightMessage::RecordBatch(record_batch.clone()))
390        .into_iter();
391
392    let Some(flight_data) = iter.next() else {
393        return error::UnexpectedRequestSnafu {
394            reason: "Failed to encode empty flight data",
395        }
396        .fail();
397    };
398    ensure!(
399        iter.next().is_none(),
400        error::UnexpectedRequestSnafu {
401            reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
402        }
403    );
404
405    Ok((
406        schema.data_header,
407        flight_data.data_header,
408        flight_data.data_body,
409    ))
410}
411
#[cfg(test)]
mod tests {
    // Fix: `use std::assert_matches;` only imported the *module*; the
    // `assert_matches!` macro used below requires importing the macro itself.
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use api::v1::ArrowIpc;
    use common_error::ext::ErrorExt;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_recordbatch::RecordBatches;
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
    use mito2::config::MitoConfig;
    use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
    use store_api::storage::{RegionId, ScanRequest};

    use super::{record_batch_to_ipc, record_batch_to_rows};
    use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
    use crate::error::Error;
    use crate::test_util::{self, TestEnv};

    /// Builds a logical batch of `rows` rows starting at timestamp/value `start`,
    /// with a single constant "job" tag.
    fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
        ]));

        let mut ts = Vec::with_capacity(rows);
        let mut values = Vec::with_capacity(rows);
        let mut tags = Vec::with_capacity(rows);
        for i in start..start + rows {
            ts.push(i as i64);
            values.push(i as f64);
            tags.push("tag_0".to_string());
        }

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(ts)),
                Arc::new(Float64Array::from(values)),
                Arc::new(StringArray::from(tags)),
            ],
        )
        .unwrap()
    }

    /// Wraps `batch` in a `BulkInserts` request with matching Arrow IPC bytes.
    fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
        let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version: None,
        })
    }

    /// Creates a physical region with dense primary-key encoding plus a
    /// logical region on top, returning the logical region id.
    async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(
            physical_region_id,
            &TestEnv::default_table_dir(),
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "dense".to_string(),
            )],
        )
        .await;

        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["job"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();
        logical_region_id
    }

    #[tokio::test]
    async fn test_bulk_insert_empty_batch_returns_zero() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let batch = build_logical_batch(0, 0);
        let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc::default(),
            partition_expr_version: None,
        });
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_passthrough() {
        // Use flat format so that BulkMemtable is used (supports write_bulk).
        let mito_config = MitoConfig {
            default_flat_format: true,
            ..Default::default()
        };
        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();
        let logical_region_id = env.default_logical_region_id();

        // First, do a normal logical bulk insert so we can compare results.
        let logical_batch = build_logical_batch(0, 3);
        let logical_request = build_bulk_request(logical_region_id, logical_batch.clone());
        let response = env
            .metric()
            .handle_request(logical_region_id, logical_request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        // Now build a physical-format batch using modify_batch_sparse (simulating
        // what the batcher's flush_batch_physical does) and send it directly to
        // the physical region.
        let tag_columns = vec![TagColumnInfo {
            name: "job".to_string(),
            index: 2,
            column_id: 2, // column_id for "job" in the physical table
        }];
        let non_tag_indices = vec![0, 1]; // timestamp, value
        let second_batch = build_logical_batch(3, 3);
        let physical_batch = modify_batch_sparse(
            second_batch,
            logical_region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )
        .unwrap();
        let request = build_bulk_request(physical_region_id, physical_batch);
        let response = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        // Verify all 6 rows are readable from the logical region.
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 6);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_empty_batch() {
        // Use flat format so that BulkMemtable is used (supports write_bulk).
        let mito_config = MitoConfig {
            default_flat_format: true,
            ..Default::default()
        };
        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();

        let batch = build_logical_batch(0, 0);
        let request = build_bulk_request(physical_region_id, batch);
        let response = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_unknown_column_errors() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("nonexistent_column", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64])),
                Arc::new(Float64Array::from(vec![1.0])),
                Arc::new(StringArray::from(vec!["val"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::ColumnNotFound { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_multiple_tag_columns() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
            .await;
        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["host", "region"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("host", DataType::Utf8, true),
            Field::new("region", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
                Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
                Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_bulk_insert_accumulates_rows() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
    }

    #[tokio::test]
    async fn test_bulk_insert_sparse_encoding() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 4);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 4);
    }

    #[tokio::test]
    async fn test_bulk_insert_dense_encoding_rejected() {
        let env = TestEnv::new().await;
        let logical_region_id = init_dense_metric_region(&env).await;

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_matches_put() {
        let env_put = TestEnv::new().await;
        env_put.init_metric_region().await;
        let logical_region_id = env_put.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        env_put
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: api::v1::Rows { schema, rows },
                    hint: None,
                    partition_expr_version: None,
                }),
            )
            .await
            .unwrap();
        let put_stream = env_put
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
        let put_output = put_batches.pretty_print().unwrap();

        let env_bulk = TestEnv::new().await;
        env_bulk.init_metric_region().await;
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
        env_bulk
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        let bulk_stream = env_bulk
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
        let bulk_output = bulk_batches.pretty_print().unwrap();

        assert_eq!(put_output, bulk_output);
    }

    #[test]
    fn test_record_batch_to_rows_with_null_values() {
        // Note: all names used here are already imported at module level;
        // the previous function-local duplicate `use` lines were removed.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));

        let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
        let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
        let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
        let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(ts_array),
                Arc::new(val_array),
                Arc::new(job_array),
                Arc::new(host_array),
            ],
        )
        .unwrap();

        let region_id = RegionId::new(1, 1);
        let rows = record_batch_to_rows(&batch, region_id).unwrap();

        assert_eq!(rows.rows.len(), 3);
        assert_eq!(rows.schema.len(), 4);

        // Row 0: all non-null except host
        assert!(rows.rows[0].values[0].value_data.is_some());
        assert!(rows.rows[0].values[1].value_data.is_some());
        assert!(rows.rows[0].values[2].value_data.is_some());
        assert!(rows.rows[0].values[3].value_data.is_none());

        // Row 1: null timestamp, null job
        assert!(rows.rows[1].values[0].value_data.is_none());
        assert!(rows.rows[1].values[1].value_data.is_some());
        assert!(rows.rows[1].values[2].value_data.is_none());
        assert!(rows.rows[1].values[3].value_data.is_some());

        // Row 2: null value
        assert!(rows.rows[2].values[0].value_data.is_some());
        assert!(rows.rows[2].values[1].value_data.is_none());
        assert!(rows.rows[2].values[2].value_data.is_some());
        assert!(rows.rows[2].values[3].value_data.is_some());
    }
}