1use std::collections::HashSet;
16
17use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
18use bytes::Bytes;
19use common_error::ext::ErrorExt;
20use common_error::status_code::StatusCode;
21use common_grpc::flight::{FlightEncoder, FlightMessage};
22use common_query::prelude::{greptime_timestamp, greptime_value};
23use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
24use datatypes::arrow::record_batch::RecordBatch;
25use snafu::{OptionExt, ensure};
26use store_api::codec::PrimaryKeyEncoding;
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_request::{
29 AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
30};
31use store_api::storage::RegionId;
32
33use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
34use crate::engine::MetricEngineInner;
35use crate::error;
36use crate::error::Result;
37
38impl MetricEngineInner {
39 pub async fn bulk_insert_region(
52 &self,
53 region_id: RegionId,
54 request: RegionBulkInsertsRequest,
55 ) -> Result<AffectedRows> {
56 ensure!(
57 !self.is_physical_region(region_id),
58 error::UnsupportedRegionRequestSnafu {
59 request: RegionRequest::BulkInserts(request),
60 }
61 );
62
63 let (physical_region_id, data_region_id, primary_key_encoding) =
64 self.find_data_region_meta(region_id)?;
65
66 if primary_key_encoding != PrimaryKeyEncoding::Sparse {
67 return error::UnsupportedRegionRequestSnafu {
68 request: RegionRequest::BulkInserts(request),
69 }
70 .fail();
71 }
72
73 let batch = request.payload;
74 if batch.num_rows() == 0 {
75 return Ok(0);
76 }
77
78 let logical_metadata = self
79 .logical_region_metadata(physical_region_id, region_id)
80 .await?;
81 let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
82 region_id,
83 data_region_id,
84 &batch,
85 &logical_metadata,
86 )?;
87 let modified_batch = modify_batch_sparse(
88 batch.clone(),
89 region_id.table_id(),
90 &tag_columns,
91 &non_tag_indices,
92 )?;
93 let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;
94
95 let partition_expr_version = request.partition_expr_version;
96 let request = RegionBulkInsertsRequest {
97 region_id: data_region_id,
98 payload: modified_batch,
99 raw_data: ArrowIpc {
100 schema,
101 data_header,
102 payload,
103 },
104 partition_expr_version,
105 };
106 match self
107 .data_region
108 .write_data(data_region_id, RegionRequest::BulkInserts(request))
109 .await
110 {
111 Ok(affected_rows) => Ok(affected_rows),
112 Err(err) if err.status_code() == StatusCode::Unsupported => {
113 let rows = record_batch_to_rows(&batch, region_id)?;
115 self.put_region(
116 region_id,
117 RegionPutRequest {
118 rows,
119 hint: None,
120 partition_expr_version,
121 },
122 )
123 .await
124 }
125 Err(err) => Err(err),
126 }
127 }
128
129 fn resolve_tag_columns_from_metadata(
130 &self,
131 logical_region_id: RegionId,
132 data_region_id: RegionId,
133 batch: &RecordBatch,
134 logical_metadata: &RegionMetadataRef,
135 ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
136 let tag_names: HashSet<&str> = logical_metadata
137 .column_metadatas
138 .iter()
139 .filter_map(|column| {
140 if column.semantic_type == SemanticType::Tag {
141 Some(column.column_schema.name.as_str())
142 } else {
143 None
144 }
145 })
146 .collect();
147
148 let mut tag_columns = Vec::new();
149 let mut non_tag_indices = Vec::new();
150 {
151 let state = self.state.read().unwrap();
152 let physical_columns = state
153 .physical_region_states()
154 .get(&data_region_id)
155 .context(error::PhysicalRegionNotFoundSnafu {
156 region_id: data_region_id,
157 })?
158 .physical_columns();
159
160 for (index, field) in batch.schema().fields().iter().enumerate() {
161 let name = field.name();
162 let column_id =
163 *physical_columns
164 .get(name)
165 .with_context(|| error::ColumnNotFoundSnafu {
166 name: name.clone(),
167 region_id: logical_region_id,
168 })?;
169 if tag_names.contains(name.as_str()) {
170 tag_columns.push(TagColumnInfo {
171 name: name.clone(),
172 index,
173 column_id,
174 });
175 } else {
176 non_tag_indices.push(index);
177 }
178 }
179 }
180
181 tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
182 Ok((tag_columns, non_tag_indices))
183 }
184}
185
186fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
187 let schema_ref = batch.schema();
188 let fields = schema_ref.fields();
189
190 let mut ts_idx = None;
191 let mut val_idx = None;
192 let mut tag_indices = Vec::new();
193
194 for (idx, field) in fields.iter().enumerate() {
195 if field.name() == greptime_timestamp() {
196 ts_idx = Some(idx);
197 if !matches!(
198 field.data_type(),
199 datatypes::arrow::datatypes::DataType::Timestamp(
200 datatypes::arrow::datatypes::TimeUnit::Millisecond,
201 _
202 )
203 ) {
204 return error::UnexpectedRequestSnafu {
205 reason: format!(
206 "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
207 field.name(),
208 logical_region_id,
209 field.data_type()
210 ),
211 }
212 .fail();
213 }
214 } else if field.name() == greptime_value() {
215 val_idx = Some(idx);
216 if !matches!(
217 field.data_type(),
218 datatypes::arrow::datatypes::DataType::Float64
219 ) {
220 return error::UnexpectedRequestSnafu {
221 reason: format!(
222 "Value column '{}' in region {:?} has incompatible type: {:?}",
223 field.name(),
224 logical_region_id,
225 field.data_type()
226 ),
227 }
228 .fail();
229 }
230 } else {
231 if !matches!(
232 field.data_type(),
233 datatypes::arrow::datatypes::DataType::Utf8
234 ) {
235 return error::UnexpectedRequestSnafu {
236 reason: format!(
237 "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
238 field.name(),
239 logical_region_id,
240 field.data_type()
241 ),
242 }
243 .fail();
244 }
245 tag_indices.push(idx);
246 }
247 }
248
249 let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
250 reason: format!(
251 "Timestamp column '{}' not found in RecordBatch for region {:?}",
252 greptime_timestamp(),
253 logical_region_id
254 ),
255 })?;
256 let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
257 reason: format!(
258 "Value column '{}' not found in RecordBatch for region {:?}",
259 greptime_value(),
260 logical_region_id
261 ),
262 })?;
263
264 let mut schema = Vec::with_capacity(2 + tag_indices.len());
265 schema.push(api::v1::ColumnSchema {
266 column_name: greptime_timestamp().to_string(),
267 datatype: ColumnDataType::TimestampMillisecond as i32,
268 semantic_type: SemanticType::Timestamp as i32,
269 datatype_extension: None,
270 options: None,
271 });
272 schema.push(api::v1::ColumnSchema {
273 column_name: greptime_value().to_string(),
274 datatype: ColumnDataType::Float64 as i32,
275 semantic_type: SemanticType::Field as i32,
276 datatype_extension: None,
277 options: None,
278 });
279 for &idx in &tag_indices {
280 let field = &fields[idx];
281 schema.push(api::v1::ColumnSchema {
282 column_name: field.name().clone(),
283 datatype: ColumnDataType::String as i32,
284 semantic_type: SemanticType::Tag as i32,
285 datatype_extension: None,
286 options: None,
287 });
288 }
289
290 let ts_array = batch
291 .column(ts_idx)
292 .as_any()
293 .downcast_ref::<TimestampMillisecondArray>()
294 .expect("validated as TimestampMillisecond");
295 let val_array = batch
296 .column(val_idx)
297 .as_any()
298 .downcast_ref::<Float64Array>()
299 .expect("validated as Float64");
300 let tag_arrays: Vec<&StringArray> = tag_indices
301 .iter()
302 .map(|&idx| {
303 batch
304 .column(idx)
305 .as_any()
306 .downcast_ref::<StringArray>()
307 .expect("validated as Utf8")
308 })
309 .collect();
310
311 let num_rows = batch.num_rows();
312 let mut rows = Vec::with_capacity(num_rows);
313 for row_idx in 0..num_rows {
314 let mut values = Vec::with_capacity(2 + tag_arrays.len());
315
316 if ts_array.is_null(row_idx) {
317 values.push(api::v1::Value { value_data: None });
318 } else {
319 values.push(api::v1::Value {
320 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
321 ts_array.value(row_idx),
322 )),
323 });
324 }
325
326 if val_array.is_null(row_idx) {
327 values.push(api::v1::Value { value_data: None });
328 } else {
329 values.push(api::v1::Value {
330 value_data: Some(api::v1::value::ValueData::F64Value(
331 val_array.value(row_idx),
332 )),
333 });
334 }
335
336 for arr in &tag_arrays {
337 if arr.is_null(row_idx) {
338 values.push(api::v1::Value { value_data: None });
339 } else {
340 values.push(api::v1::Value {
341 value_data: Some(api::v1::value::ValueData::StringValue(
342 arr.value(row_idx).to_string(),
343 )),
344 });
345 }
346 }
347
348 rows.push(api::v1::Row { values });
349 }
350
351 Ok(api::v1::Rows { schema, rows })
352}
353
354fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
355 let mut encoder = FlightEncoder::default();
356 let schema = encoder.encode_schema(record_batch.schema().as_ref());
357 let mut iter = encoder
358 .encode(FlightMessage::RecordBatch(record_batch.clone()))
359 .into_iter();
360
361 let Some(flight_data) = iter.next() else {
362 return error::UnexpectedRequestSnafu {
363 reason: "Failed to encode empty flight data",
364 }
365 .fail();
366 };
367 ensure!(
368 iter.next().is_none(),
369 error::UnexpectedRequestSnafu {
370 reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
371 }
372 );
373
374 Ok((
375 schema.data_header,
376 flight_data.data_header,
377 flight_data.data_body,
378 ))
379}
380
381#[cfg(test)]
382mod tests {
383 use std::assert_matches::assert_matches;
384 use std::sync::Arc;
385
386 use api::v1::ArrowIpc;
387 use common_error::ext::ErrorExt;
388 use common_query::prelude::{greptime_timestamp, greptime_value};
389 use common_recordbatch::RecordBatches;
390 use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
391 use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
392 use datatypes::arrow::record_batch::RecordBatch;
393 use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
394 use store_api::path_utils::table_dir;
395 use store_api::region_engine::RegionEngine;
396 use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
397 use store_api::storage::{RegionId, ScanRequest};
398
399 use super::record_batch_to_ipc;
400 use crate::error::Error;
401 use crate::test_util::{self, TestEnv};
402
403 fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
404 let schema = Arc::new(ArrowSchema::new(vec![
405 Field::new(
406 greptime_timestamp(),
407 DataType::Timestamp(TimeUnit::Millisecond, None),
408 false,
409 ),
410 Field::new(greptime_value(), DataType::Float64, true),
411 Field::new("job", DataType::Utf8, true),
412 ]));
413
414 let mut ts = Vec::with_capacity(rows);
415 let mut values = Vec::with_capacity(rows);
416 let mut tags = Vec::with_capacity(rows);
417 for i in start..start + rows {
418 ts.push(i as i64);
419 values.push(i as f64);
420 tags.push("tag_0".to_string());
421 }
422
423 RecordBatch::try_new(
424 schema,
425 vec![
426 Arc::new(TimestampMillisecondArray::from(ts)),
427 Arc::new(Float64Array::from(values)),
428 Arc::new(StringArray::from(tags)),
429 ],
430 )
431 .unwrap()
432 }
433
434 fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
435 let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
436 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
437 region_id: logical_region_id,
438 payload: batch,
439 raw_data: ArrowIpc {
440 schema,
441 data_header,
442 payload,
443 },
444 partition_expr_version: None,
445 })
446 }
447
448 async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
449 let physical_region_id = env.default_physical_region_id();
450 env.create_physical_region(
451 physical_region_id,
452 &TestEnv::default_table_dir(),
453 vec![(
454 MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
455 "dense".to_string(),
456 )],
457 )
458 .await;
459
460 let logical_region_id = env.default_logical_region_id();
461 let request = test_util::create_logical_region_request(
462 &["job"],
463 physical_region_id,
464 &table_dir("test", logical_region_id.table_id()),
465 );
466 env.metric()
467 .handle_request(logical_region_id, RegionRequest::Create(request))
468 .await
469 .unwrap();
470 logical_region_id
471 }
472
473 #[tokio::test]
474 async fn test_bulk_insert_empty_batch_returns_zero() {
475 let env = TestEnv::new().await;
476 env.init_metric_region().await;
477 let logical_region_id = env.default_logical_region_id();
478
479 let batch = build_logical_batch(0, 0);
480 let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
481 region_id: logical_region_id,
482 payload: batch,
483 raw_data: ArrowIpc::default(),
484 partition_expr_version: None,
485 });
486 let response = env
487 .metric()
488 .handle_request(logical_region_id, request)
489 .await
490 .unwrap();
491 assert_eq!(response.affected_rows, 0);
492 }
493
494 #[tokio::test]
495 async fn test_bulk_insert_physical_region_rejected() {
496 let env = TestEnv::new().await;
497 env.init_metric_region().await;
498
499 let physical_region_id = env.default_physical_region_id();
500 let batch = build_logical_batch(0, 2);
501 let request = build_bulk_request(physical_region_id, batch);
502
503 let err = env
504 .metric()
505 .handle_request(physical_region_id, request)
506 .await
507 .unwrap_err();
508 let Some(err) = err.as_any().downcast_ref::<Error>() else {
509 panic!("unexpected error type");
510 };
511 assert_matches!(err, Error::UnsupportedRegionRequest { .. });
512 }
513
514 #[tokio::test]
515 async fn test_bulk_insert_unknown_column_errors() {
516 let env = TestEnv::new().await;
517 env.init_metric_region().await;
518 let logical_region_id = env.default_logical_region_id();
519
520 let schema = Arc::new(ArrowSchema::new(vec![
521 Field::new(
522 greptime_timestamp(),
523 DataType::Timestamp(TimeUnit::Millisecond, None),
524 false,
525 ),
526 Field::new(greptime_value(), DataType::Float64, true),
527 Field::new("nonexistent_column", DataType::Utf8, true),
528 ]));
529 let batch = RecordBatch::try_new(
530 schema,
531 vec![
532 Arc::new(TimestampMillisecondArray::from(vec![0i64])),
533 Arc::new(Float64Array::from(vec![1.0])),
534 Arc::new(StringArray::from(vec!["val"])),
535 ],
536 )
537 .unwrap();
538
539 let request = build_bulk_request(logical_region_id, batch);
540 let err = env
541 .metric()
542 .handle_request(logical_region_id, request)
543 .await
544 .unwrap_err();
545 let Some(err) = err.as_any().downcast_ref::<Error>() else {
546 panic!("unexpected error type");
547 };
548 assert_matches!(err, Error::ColumnNotFound { .. });
549 }
550
551 #[tokio::test]
552 async fn test_bulk_insert_multiple_tag_columns() {
553 let env = TestEnv::new().await;
554 let physical_region_id = env.default_physical_region_id();
555 env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
556 .await;
557 let logical_region_id = env.default_logical_region_id();
558 let request = test_util::create_logical_region_request(
559 &["host", "region"],
560 physical_region_id,
561 &table_dir("test", logical_region_id.table_id()),
562 );
563 env.metric()
564 .handle_request(logical_region_id, RegionRequest::Create(request))
565 .await
566 .unwrap();
567
568 let schema = Arc::new(ArrowSchema::new(vec![
569 Field::new(
570 greptime_timestamp(),
571 DataType::Timestamp(TimeUnit::Millisecond, None),
572 false,
573 ),
574 Field::new(greptime_value(), DataType::Float64, true),
575 Field::new("host", DataType::Utf8, true),
576 Field::new("region", DataType::Utf8, true),
577 ]));
578 let batch = RecordBatch::try_new(
579 schema,
580 vec![
581 Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
582 Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
583 Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
584 Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
585 ],
586 )
587 .unwrap();
588
589 let request = build_bulk_request(logical_region_id, batch);
590 let response = env
591 .metric()
592 .handle_request(logical_region_id, request)
593 .await
594 .unwrap();
595 assert_eq!(response.affected_rows, 3);
596
597 let stream = env
598 .metric()
599 .scan_to_stream(logical_region_id, ScanRequest::default())
600 .await
601 .unwrap();
602 let batches = RecordBatches::try_collect(stream).await.unwrap();
603 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
604 }
605
606 #[tokio::test]
607 async fn test_bulk_insert_accumulates_rows() {
608 let env = TestEnv::new().await;
609 env.init_metric_region().await;
610 let logical_region_id = env.default_logical_region_id();
611
612 let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
613 let response = env
614 .metric()
615 .handle_request(logical_region_id, request)
616 .await
617 .unwrap();
618 assert_eq!(response.affected_rows, 3);
619
620 let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
621 let response = env
622 .metric()
623 .handle_request(logical_region_id, request)
624 .await
625 .unwrap();
626 assert_eq!(response.affected_rows, 5);
627
628 let stream = env
629 .metric()
630 .scan_to_stream(logical_region_id, ScanRequest::default())
631 .await
632 .unwrap();
633 let batches = RecordBatches::try_collect(stream).await.unwrap();
634 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
635 }
636
637 #[tokio::test]
638 async fn test_bulk_insert_sparse_encoding() {
639 let env = TestEnv::new().await;
640 env.init_metric_region().await;
641 let logical_region_id = env.default_logical_region_id();
642
643 let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
644 let response = env
645 .metric()
646 .handle_request(logical_region_id, request)
647 .await
648 .unwrap();
649 assert_eq!(response.affected_rows, 4);
650
651 let stream = env
652 .metric()
653 .scan_to_stream(logical_region_id, ScanRequest::default())
654 .await
655 .unwrap();
656 let batches = RecordBatches::try_collect(stream).await.unwrap();
657 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 4);
658 }
659
660 #[tokio::test]
661 async fn test_bulk_insert_dense_encoding_rejected() {
662 let env = TestEnv::new().await;
663 let logical_region_id = init_dense_metric_region(&env).await;
664
665 let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
666 let err = env
667 .metric()
668 .handle_request(logical_region_id, request)
669 .await
670 .unwrap_err();
671 let Some(err) = err.as_any().downcast_ref::<Error>() else {
672 panic!("unexpected error type");
673 };
674 assert_matches!(err, Error::UnsupportedRegionRequest { .. });
675 }
676
677 #[tokio::test]
678 async fn test_bulk_insert_matches_put() {
679 let env_put = TestEnv::new().await;
680 env_put.init_metric_region().await;
681 let logical_region_id = env_put.default_logical_region_id();
682 let schema = test_util::row_schema_with_tags(&["job"]);
683 let rows = test_util::build_rows(1, 5);
684 env_put
685 .metric()
686 .handle_request(
687 logical_region_id,
688 RegionRequest::Put(RegionPutRequest {
689 rows: api::v1::Rows { schema, rows },
690 hint: None,
691 partition_expr_version: None,
692 }),
693 )
694 .await
695 .unwrap();
696 let put_stream = env_put
697 .metric()
698 .scan_to_stream(logical_region_id, ScanRequest::default())
699 .await
700 .unwrap();
701 let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
702 let put_output = put_batches.pretty_print().unwrap();
703
704 let env_bulk = TestEnv::new().await;
705 env_bulk.init_metric_region().await;
706 let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
707 env_bulk
708 .metric()
709 .handle_request(logical_region_id, request)
710 .await
711 .unwrap();
712 let bulk_stream = env_bulk
713 .metric()
714 .scan_to_stream(logical_region_id, ScanRequest::default())
715 .await
716 .unwrap();
717 let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
718 let bulk_output = bulk_batches.pretty_print().unwrap();
719
720 assert_eq!(put_output, bulk_output);
721 }
722
723 #[test]
724 fn test_record_batch_to_rows_with_null_values() {
725 use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
726 use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
727 use datatypes::arrow::record_batch::RecordBatch;
728 use store_api::storage::RegionId;
729
730 use crate::engine::bulk_insert::record_batch_to_rows;
731
732 let schema = Arc::new(ArrowSchema::new(vec![
733 Field::new(
734 greptime_timestamp(),
735 DataType::Timestamp(TimeUnit::Millisecond, None),
736 true,
737 ),
738 Field::new(greptime_value(), DataType::Float64, true),
739 Field::new("job", DataType::Utf8, true),
740 Field::new("host", DataType::Utf8, true),
741 ]));
742
743 let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
744 let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
745 let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
746 let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);
747
748 let batch = RecordBatch::try_new(
749 schema,
750 vec![
751 Arc::new(ts_array),
752 Arc::new(val_array),
753 Arc::new(job_array),
754 Arc::new(host_array),
755 ],
756 )
757 .unwrap();
758
759 let region_id = RegionId::new(1, 1);
760 let rows = record_batch_to_rows(&batch, region_id).unwrap();
761
762 assert_eq!(rows.rows.len(), 3);
763 assert_eq!(rows.schema.len(), 4);
764
765 assert!(rows.rows[0].values[0].value_data.is_some());
767 assert!(rows.rows[0].values[1].value_data.is_some());
768 assert!(rows.rows[0].values[2].value_data.is_some());
769 assert!(rows.rows[0].values[3].value_data.is_none());
770
771 assert!(rows.rows[1].values[0].value_data.is_none());
773 assert!(rows.rows[1].values[1].value_data.is_some());
774 assert!(rows.rows[1].values[2].value_data.is_none());
775 assert!(rows.rows[1].values[3].value_data.is_some());
776
777 assert!(rows.rows[2].values[0].value_data.is_some());
779 assert!(rows.rows[2].values[1].value_data.is_none());
780 assert!(rows.rows[2].values[2].value_data.is_some());
781 assert!(rows.rows[2].values[3].value_data.is_some());
782 }
783}