// metric_engine/engine/bulk_insert.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::HashSet;
16
17use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
18use bytes::Bytes;
19use common_error::ext::ErrorExt;
20use common_error::status_code::StatusCode;
21use common_grpc::flight::{FlightEncoder, FlightMessage};
22use common_query::prelude::{greptime_timestamp, greptime_value};
23use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
24use datatypes::arrow::record_batch::RecordBatch;
25use snafu::{OptionExt, ensure};
26use store_api::codec::PrimaryKeyEncoding;
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_request::{
29    AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
30};
31use store_api::storage::RegionId;
32
33use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
34use crate::engine::MetricEngineInner;
35use crate::error;
36use crate::error::Result;
37
impl MetricEngineInner {
    /// Bulk-inserts logical rows into a metric region.
    ///
    /// This method accepts a `RegionBulkInsertsRequest` whose payload is a logical
    /// `RecordBatch` (timestamp, value and tag columns) for the given logical `region_id`.
    ///
    /// The transformed batch is encoded to Arrow IPC and forwarded as a `BulkInserts`
    /// request to the data region, along with the original `partition_expr_version`.
    /// If the data region reports `StatusCode::Unsupported` for bulk inserts, the request
    /// is transparently retried as a `Put` by converting the original logical batch into
    /// `api::v1::Rows`, so callers observe the same semantics as `put_region`.
    ///
    /// Returns the number of affected rows, or `0` if the input batch is empty.
    pub async fn bulk_insert_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        // Bulk inserts target logical regions only; writing a physical region
        // directly is rejected.
        ensure!(
            !self.is_physical_region(region_id),
            error::UnsupportedRegionRequestSnafu {
                request: RegionRequest::BulkInserts(request),
            }
        );

        let (physical_region_id, data_region_id, primary_key_encoding) =
            self.find_data_region_meta(region_id)?;

        // Only sparse primary-key encoding is supported on this path; other
        // encodings must go through the regular put path.
        if primary_key_encoding != PrimaryKeyEncoding::Sparse {
            return error::UnsupportedRegionRequestSnafu {
                request: RegionRequest::BulkInserts(request),
            }
            .fail();
        }

        let batch = request.payload;
        if batch.num_rows() == 0 {
            // Nothing to write; report zero affected rows without touching the
            // data region.
            return Ok(0);
        }

        let logical_metadata = self
            .logical_region_metadata(physical_region_id, region_id)
            .await?;
        // Split the batch columns into tag columns (with physical column ids)
        // and the remaining column indices, then rewrite the batch for the
        // sparse layout via `modify_batch_sparse`.
        let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
            region_id,
            data_region_id,
            &batch,
            &logical_metadata,
        )?;
        let modified_batch = modify_batch_sparse(
            batch.clone(),
            region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )?;
        let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;

        let partition_expr_version = request.partition_expr_version;
        // Re-target the request at the data region, carrying both the modified
        // batch and its IPC encoding.
        let request = RegionBulkInsertsRequest {
            region_id: data_region_id,
            payload: modified_batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version,
        };
        match self
            .data_region
            .write_data(data_region_id, RegionRequest::BulkInserts(request))
            .await
        {
            Ok(affected_rows) => Ok(affected_rows),
            Err(err) if err.status_code() == StatusCode::Unsupported => {
                // todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it
                // The data region cannot handle bulk inserts; replay the
                // ORIGINAL logical batch through the row-based put path so the
                // caller sees `put_region` semantics.
                let rows = record_batch_to_rows(&batch, region_id)?;
                self.put_region(
                    region_id,
                    RegionPutRequest {
                        rows,
                        hint: None,
                        partition_expr_version,
                    },
                )
                .await
            }
            Err(err) => Err(err),
        }
    }

    /// Classifies the columns of `batch` against the logical region metadata.
    ///
    /// Columns whose names match a `SemanticType::Tag` column of the logical
    /// region are returned as `TagColumnInfo`s (name, index within `batch`, and
    /// physical column id), sorted by name for a deterministic order; all other
    /// columns are returned by index in the second element.
    ///
    /// # Errors
    /// - `PhysicalRegionNotFound` if `data_region_id` has no registered state.
    /// - `ColumnNotFound` if a batch column does not map to a physical column.
    fn resolve_tag_columns_from_metadata(
        &self,
        logical_region_id: RegionId,
        data_region_id: RegionId,
        batch: &RecordBatch,
        logical_metadata: &RegionMetadataRef,
    ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
        // Names of the logical region's tag columns.
        let tag_names: HashSet<&str> = logical_metadata
            .column_metadatas
            .iter()
            .filter_map(|column| {
                if column.semantic_type == SemanticType::Tag {
                    Some(column.column_schema.name.as_str())
                } else {
                    None
                }
            })
            .collect();

        let mut tag_columns = Vec::new();
        let mut non_tag_indices = Vec::new();
        {
            // Scope the engine-state read lock so it is released before the
            // sort below.
            let state = self.state.read().unwrap();
            let physical_columns = state
                .physical_region_states()
                .get(&data_region_id)
                .context(error::PhysicalRegionNotFoundSnafu {
                    region_id: data_region_id,
                })?
                .physical_columns();

            for (index, field) in batch.schema().fields().iter().enumerate() {
                let name = field.name();
                // Every batch column must resolve to a known physical column id.
                let column_id =
                    *physical_columns
                        .get(name)
                        .with_context(|| error::ColumnNotFoundSnafu {
                            name: name.clone(),
                            region_id: logical_region_id,
                        })?;
                if tag_names.contains(name.as_str()) {
                    tag_columns.push(TagColumnInfo {
                        name: name.clone(),
                        index,
                        column_id,
                    });
                } else {
                    non_tag_indices.push(index);
                }
            }
        }

        // Sort tags by name so downstream consumers see a deterministic order.
        tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
        Ok((tag_columns, non_tag_indices))
    }
}
185
186fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
187    let schema_ref = batch.schema();
188    let fields = schema_ref.fields();
189
190    let mut ts_idx = None;
191    let mut val_idx = None;
192    let mut tag_indices = Vec::new();
193
194    for (idx, field) in fields.iter().enumerate() {
195        if field.name() == greptime_timestamp() {
196            ts_idx = Some(idx);
197            if !matches!(
198                field.data_type(),
199                datatypes::arrow::datatypes::DataType::Timestamp(
200                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
201                    _
202                )
203            ) {
204                return error::UnexpectedRequestSnafu {
205                    reason: format!(
206                        "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
207                        field.name(),
208                        logical_region_id,
209                        field.data_type()
210                    ),
211                }
212                .fail();
213            }
214        } else if field.name() == greptime_value() {
215            val_idx = Some(idx);
216            if !matches!(
217                field.data_type(),
218                datatypes::arrow::datatypes::DataType::Float64
219            ) {
220                return error::UnexpectedRequestSnafu {
221                    reason: format!(
222                        "Value column '{}' in region {:?} has incompatible type: {:?}",
223                        field.name(),
224                        logical_region_id,
225                        field.data_type()
226                    ),
227                }
228                .fail();
229            }
230        } else {
231            if !matches!(
232                field.data_type(),
233                datatypes::arrow::datatypes::DataType::Utf8
234            ) {
235                return error::UnexpectedRequestSnafu {
236                    reason: format!(
237                        "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
238                        field.name(),
239                        logical_region_id,
240                        field.data_type()
241                    ),
242                }
243                .fail();
244            }
245            tag_indices.push(idx);
246        }
247    }
248
249    let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
250        reason: format!(
251            "Timestamp column '{}' not found in RecordBatch for region {:?}",
252            greptime_timestamp(),
253            logical_region_id
254        ),
255    })?;
256    let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
257        reason: format!(
258            "Value column '{}' not found in RecordBatch for region {:?}",
259            greptime_value(),
260            logical_region_id
261        ),
262    })?;
263
264    let mut schema = Vec::with_capacity(2 + tag_indices.len());
265    schema.push(api::v1::ColumnSchema {
266        column_name: greptime_timestamp().to_string(),
267        datatype: ColumnDataType::TimestampMillisecond as i32,
268        semantic_type: SemanticType::Timestamp as i32,
269        datatype_extension: None,
270        options: None,
271    });
272    schema.push(api::v1::ColumnSchema {
273        column_name: greptime_value().to_string(),
274        datatype: ColumnDataType::Float64 as i32,
275        semantic_type: SemanticType::Field as i32,
276        datatype_extension: None,
277        options: None,
278    });
279    for &idx in &tag_indices {
280        let field = &fields[idx];
281        schema.push(api::v1::ColumnSchema {
282            column_name: field.name().clone(),
283            datatype: ColumnDataType::String as i32,
284            semantic_type: SemanticType::Tag as i32,
285            datatype_extension: None,
286            options: None,
287        });
288    }
289
290    let ts_array = batch
291        .column(ts_idx)
292        .as_any()
293        .downcast_ref::<TimestampMillisecondArray>()
294        .expect("validated as TimestampMillisecond");
295    let val_array = batch
296        .column(val_idx)
297        .as_any()
298        .downcast_ref::<Float64Array>()
299        .expect("validated as Float64");
300    let tag_arrays: Vec<&StringArray> = tag_indices
301        .iter()
302        .map(|&idx| {
303            batch
304                .column(idx)
305                .as_any()
306                .downcast_ref::<StringArray>()
307                .expect("validated as Utf8")
308        })
309        .collect();
310
311    let num_rows = batch.num_rows();
312    let mut rows = Vec::with_capacity(num_rows);
313    for row_idx in 0..num_rows {
314        let mut values = Vec::with_capacity(2 + tag_arrays.len());
315
316        if ts_array.is_null(row_idx) {
317            values.push(api::v1::Value { value_data: None });
318        } else {
319            values.push(api::v1::Value {
320                value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
321                    ts_array.value(row_idx),
322                )),
323            });
324        }
325
326        if val_array.is_null(row_idx) {
327            values.push(api::v1::Value { value_data: None });
328        } else {
329            values.push(api::v1::Value {
330                value_data: Some(api::v1::value::ValueData::F64Value(
331                    val_array.value(row_idx),
332                )),
333            });
334        }
335
336        for arr in &tag_arrays {
337            if arr.is_null(row_idx) {
338                values.push(api::v1::Value { value_data: None });
339            } else {
340                values.push(api::v1::Value {
341                    value_data: Some(api::v1::value::ValueData::StringValue(
342                        arr.value(row_idx).to_string(),
343                    )),
344                });
345            }
346        }
347
348        rows.push(api::v1::Row { values });
349    }
350
351    Ok(api::v1::Rows { schema, rows })
352}
353
354fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
355    let mut encoder = FlightEncoder::default();
356    let schema = encoder.encode_schema(record_batch.schema().as_ref());
357    let mut iter = encoder
358        .encode(FlightMessage::RecordBatch(record_batch.clone()))
359        .into_iter();
360
361    let Some(flight_data) = iter.next() else {
362        return error::UnexpectedRequestSnafu {
363            reason: "Failed to encode empty flight data",
364        }
365        .fail();
366    };
367    ensure!(
368        iter.next().is_none(),
369        error::UnexpectedRequestSnafu {
370            reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
371        }
372    );
373
374    Ok((
375        schema.data_header,
376        flight_data.data_header,
377        flight_data.data_body,
378    ))
379}
380
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use api::v1::ArrowIpc;
    use common_error::ext::ErrorExt;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_recordbatch::RecordBatches;
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
    use store_api::storage::{RegionId, ScanRequest};

    use super::record_batch_to_ipc;
    use crate::error::Error;
    use crate::test_util::{self, TestEnv};

    /// Builds a logical batch (timestamp, value, "job" tag) with `rows` rows:
    /// timestamps/values run over `start..start + rows`, all tagged `"tag_0"`.
    fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
        ]));

        let mut ts = Vec::with_capacity(rows);
        let mut values = Vec::with_capacity(rows);
        let mut tags = Vec::with_capacity(rows);
        for i in start..start + rows {
            ts.push(i as i64);
            values.push(i as f64);
            tags.push("tag_0".to_string());
        }

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(ts)),
                Arc::new(Float64Array::from(values)),
                Arc::new(StringArray::from(tags)),
            ],
        )
        .unwrap()
    }

    /// Wraps `batch` in a `BulkInserts` region request, including its Arrow IPC
    /// encoding produced by `record_batch_to_ipc`.
    fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
        let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version: None,
        })
    }

    /// Creates a physical region configured with DENSE primary-key encoding plus
    /// a logical region (with a "job" tag) on top of it; returns the logical
    /// region id.
    async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(
            physical_region_id,
            &TestEnv::default_table_dir(),
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "dense".to_string(),
            )],
        )
        .await;

        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["job"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();
        logical_region_id
    }

    /// An empty payload short-circuits and reports zero affected rows.
    #[tokio::test]
    async fn test_bulk_insert_empty_batch_returns_zero() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let batch = build_logical_batch(0, 0);
        let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc::default(),
            partition_expr_version: None,
        });
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    /// Bulk inserts addressed to a physical region are rejected with
    /// `UnsupportedRegionRequest`.
    #[tokio::test]
    async fn test_bulk_insert_physical_region_rejected() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let physical_region_id = env.default_physical_region_id();
        let batch = build_logical_batch(0, 2);
        let request = build_bulk_request(physical_region_id, batch);

        let err = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    /// A batch column that does not map to any physical column fails with
    /// `ColumnNotFound`.
    #[tokio::test]
    async fn test_bulk_insert_unknown_column_errors() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("nonexistent_column", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64])),
                Arc::new(Float64Array::from(vec![1.0])),
                Arc::new(StringArray::from(vec!["val"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::ColumnNotFound { .. });
    }

    /// A logical region with two tag columns ("host", "region") accepts bulk
    /// inserts and scans back all written rows.
    #[tokio::test]
    async fn test_bulk_insert_multiple_tag_columns() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
            .await;
        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["host", "region"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("host", DataType::Utf8, true),
            Field::new("region", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
                Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
                Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
    }

    /// Two consecutive bulk inserts each report their own affected-row count,
    /// and a scan sees the union of both writes.
    #[tokio::test]
    async fn test_bulk_insert_accumulates_rows() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
    }

    /// Happy path on the default (sparse-encoded) metric region: rows written
    /// via bulk insert are readable through a scan.
    #[tokio::test]
    async fn test_bulk_insert_sparse_encoding() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 4);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 4);
    }

    /// Bulk insert into a dense-encoded region is rejected with
    /// `UnsupportedRegionRequest` (only sparse encoding is supported).
    #[tokio::test]
    async fn test_bulk_insert_dense_encoding_rejected() {
        let env = TestEnv::new().await;
        let logical_region_id = init_dense_metric_region(&env).await;

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    /// Writing the same logical rows via `Put` and via bulk insert produces
    /// identical scan output (compared via pretty-printed batches).
    #[tokio::test]
    async fn test_bulk_insert_matches_put() {
        let env_put = TestEnv::new().await;
        env_put.init_metric_region().await;
        let logical_region_id = env_put.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        env_put
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: api::v1::Rows { schema, rows },
                    hint: None,
                    partition_expr_version: None,
                }),
            )
            .await
            .unwrap();
        let put_stream = env_put
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
        let put_output = put_batches.pretty_print().unwrap();

        let env_bulk = TestEnv::new().await;
        env_bulk.init_metric_region().await;
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
        env_bulk
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        let bulk_stream = env_bulk
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
        let bulk_output = bulk_batches.pretty_print().unwrap();

        assert_eq!(put_output, bulk_output);
    }

    /// `record_batch_to_rows` maps null array slots in every column to
    /// `value_data: None`, preserving non-null slots.
    #[test]
    fn test_record_batch_to_rows_with_null_values() {
        use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
        use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
        use datatypes::arrow::record_batch::RecordBatch;
        use store_api::storage::RegionId;

        use crate::engine::bulk_insert::record_batch_to_rows;

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));

        let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
        let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
        let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
        let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(ts_array),
                Arc::new(val_array),
                Arc::new(job_array),
                Arc::new(host_array),
            ],
        )
        .unwrap();

        let region_id = RegionId::new(1, 1);
        let rows = record_batch_to_rows(&batch, region_id).unwrap();

        assert_eq!(rows.rows.len(), 3);
        assert_eq!(rows.schema.len(), 4);

        // Row 0: all non-null except host
        assert!(rows.rows[0].values[0].value_data.is_some());
        assert!(rows.rows[0].values[1].value_data.is_some());
        assert!(rows.rows[0].values[2].value_data.is_some());
        assert!(rows.rows[0].values[3].value_data.is_none());

        // Row 1: null timestamp, null job
        assert!(rows.rows[1].values[0].value_data.is_none());
        assert!(rows.rows[1].values[1].value_data.is_some());
        assert!(rows.rows[1].values[2].value_data.is_none());
        assert!(rows.rows[1].values[3].value_data.is_some());

        // Row 2: null value
        assert!(rows.rows[2].values[0].value_data.is_some());
        assert!(rows.rows[2].values[1].value_data.is_none());
        assert!(rows.rows[2].values[2].value_data.is_some());
        assert!(rows.rows[2].values[3].value_data.is_some());
    }
}
783}