1use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{smallvec, SmallVec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
29use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
30use crate::sst::parquet::format::parquet_row_group_time_range;
31use crate::sst::parquet::reader::ReaderMetrics;
32use crate::sst::parquet::row_selection::RowGroupSelection;
33use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
34
35const ALL_ROW_GROUPS: i64 = -1;
36
37#[derive(Debug, Clone, Copy, PartialEq)]
39pub(crate) struct SourceIndex {
40 pub(crate) index: usize,
42 pub(crate) num_row_groups: u64,
45}
46
47#[derive(Debug, Clone, Copy, PartialEq)]
49pub struct RowGroupIndex {
50 pub(crate) index: usize,
52 pub row_group_index: i64,
55}
56
57#[derive(Debug, PartialEq)]
61pub(crate) struct RangeMeta {
62 pub(crate) time_range: FileTimeRange,
64 pub(crate) indices: SmallVec<[SourceIndex; 2]>,
66 pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
68 pub(crate) num_rows: usize,
70}
71
72impl RangeMeta {
73 pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76 PartitionRange {
77 start: self.time_range.0,
78 end: Timestamp::new(
79 self.time_range
82 .1
83 .value()
84 .checked_add(1)
85 .unwrap_or(self.time_range.1.value()),
86 self.time_range.1.unit(),
87 ),
88 num_rows: self.num_rows,
89 identifier,
90 }
91 }
92
93 pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
96 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97 Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98 Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100 #[cfg(feature = "enterprise")]
101 Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
102
103 let ranges = group_ranges_for_seq_scan(ranges);
104 if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
105 return ranges;
107 }
108 maybe_split_ranges_for_seq_scan(ranges)
109 }
110
111 pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
113 let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
114 Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
115 Self::push_unordered_file_ranges(
116 input.memtables.len(),
117 &input.files,
118 &input.cache_strategy,
119 &mut ranges,
120 );
121
122 #[cfg(feature = "enterprise")]
123 Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
124
125 ranges
126 }
127
128 fn overlaps(&self, meta: &RangeMeta) -> bool {
130 overlaps(&self.time_range, &meta.time_range)
131 }
132
133 fn merge(&mut self, mut other: RangeMeta) {
136 debug_assert!(self.overlaps(&other));
137 debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
138 debug_assert!(self
139 .row_group_indices
140 .iter()
141 .all(|idx| !other.row_group_indices.contains(idx)));
142
143 self.time_range = (
144 self.time_range.0.min(other.time_range.0),
145 self.time_range.1.max(other.time_range.1),
146 );
147 self.indices.append(&mut other.indices);
148 self.row_group_indices.append(&mut other.row_group_indices);
149 self.num_rows += other.num_rows;
150 }
151
152 fn can_split_preserve_order(&self) -> bool {
155 self.indices.len() == 1 && self.indices[0].num_row_groups > 1
156 }
157
158 fn maybe_split(self, output: &mut Vec<RangeMeta>) {
160 if self.can_split_preserve_order() {
161 let num_row_groups = self.indices[0].num_row_groups;
162 debug_assert_eq!(1, self.row_group_indices.len());
163 debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
164
165 output.reserve(self.row_group_indices.len());
166 let num_rows = self.num_rows / num_row_groups as usize;
167 for row_group_index in 0..num_row_groups {
169 output.push(RangeMeta {
170 time_range: self.time_range,
171 indices: self.indices.clone(),
172 row_group_indices: smallvec![RowGroupIndex {
173 index: self.indices[0].index,
174 row_group_index: row_group_index as i64,
175 }],
176 num_rows,
177 });
178 }
179 } else {
180 output.push(self);
181 }
182 }
183
184 fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
185 for (memtable_index, memtable) in memtables.iter().enumerate() {
187 let stats = memtable.stats();
188 let Some(time_range) = stats.time_range() else {
189 continue;
190 };
191 for row_group_index in 0..stats.num_ranges() {
192 let num_rows = stats.num_rows() / stats.num_ranges();
193 ranges.push(RangeMeta {
194 time_range,
195 indices: smallvec![SourceIndex {
196 index: memtable_index,
197 num_row_groups: stats.num_ranges() as u64,
198 }],
199 row_group_indices: smallvec![RowGroupIndex {
200 index: memtable_index,
201 row_group_index: row_group_index as i64,
202 }],
203 num_rows,
204 });
205 }
206 }
207 }
208
209 fn push_unordered_file_ranges(
210 num_memtables: usize,
211 files: &[FileHandle],
212 cache: &CacheStrategy,
213 ranges: &mut Vec<RangeMeta>,
214 ) {
215 for (i, file) in files.iter().enumerate() {
217 let file_index = num_memtables + i;
218 let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
220 if let Some(parquet_meta) = parquet_meta {
221 for row_group_index in 0..file.meta_ref().num_row_groups {
223 let time_range = parquet_row_group_time_range(
224 file.meta_ref(),
225 &parquet_meta,
226 row_group_index as usize,
227 );
228 let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
229 ranges.push(RangeMeta {
230 time_range: time_range.unwrap_or_else(|| file.time_range()),
231 indices: smallvec![SourceIndex {
232 index: file_index,
233 num_row_groups: file.meta_ref().num_row_groups,
234 }],
235 row_group_indices: smallvec![RowGroupIndex {
236 index: file_index,
237 row_group_index: row_group_index as i64,
238 }],
239 num_rows: num_rows as usize,
240 });
241 }
242 } else if file.meta_ref().num_row_groups > 0 {
243 for row_group_index in 0..file.meta_ref().num_row_groups {
245 ranges.push(RangeMeta {
246 time_range: file.time_range(),
247 indices: smallvec![SourceIndex {
248 index: file_index,
249 num_row_groups: file.meta_ref().num_row_groups,
250 }],
251 row_group_indices: smallvec![RowGroupIndex {
252 index: file_index,
253 row_group_index: row_group_index as i64,
254 }],
255 num_rows: DEFAULT_ROW_GROUP_SIZE,
256 });
257 }
258 } else {
259 ranges.push(RangeMeta {
261 time_range: file.time_range(),
262 indices: smallvec![SourceIndex {
263 index: file_index,
264 num_row_groups: 0,
265 }],
266 row_group_indices: smallvec![RowGroupIndex {
267 index: file_index,
268 row_group_index: ALL_ROW_GROUPS,
269 }],
270 num_rows: file.meta_ref().num_rows as usize,
272 });
273 }
274 }
275 }
276
277 fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
278 for (i, memtable) in memtables.iter().enumerate() {
280 let stats = memtable.stats();
281 let Some(time_range) = stats.time_range() else {
282 continue;
283 };
284 ranges.push(RangeMeta {
285 time_range,
286 indices: smallvec![SourceIndex {
287 index: i,
288 num_row_groups: stats.num_ranges() as u64,
289 }],
290 row_group_indices: smallvec![RowGroupIndex {
291 index: i,
292 row_group_index: ALL_ROW_GROUPS,
293 }],
294 num_rows: stats.num_rows(),
295 });
296 }
297 }
298
299 fn push_seq_file_ranges(
300 num_memtables: usize,
301 files: &[FileHandle],
302 ranges: &mut Vec<RangeMeta>,
303 ) {
304 for (i, file) in files.iter().enumerate() {
306 let file_index = num_memtables + i;
307 ranges.push(RangeMeta {
308 time_range: file.time_range(),
309 indices: smallvec![SourceIndex {
310 index: file_index,
311 num_row_groups: file.meta_ref().num_row_groups,
312 }],
313 row_group_indices: smallvec![RowGroupIndex {
314 index: file_index,
315 row_group_index: ALL_ROW_GROUPS,
316 }],
317 num_rows: file.meta_ref().num_rows as usize,
318 });
319 }
320 }
321
322 #[cfg(feature = "enterprise")]
323 fn push_extension_ranges(
324 ranges: &[crate::extension::BoxedExtensionRange],
325 metas: &mut Vec<RangeMeta>,
326 ) {
327 for range in ranges.iter() {
328 let index = metas.len();
329 metas.push(RangeMeta {
330 time_range: range.time_range(),
331 indices: smallvec![SourceIndex {
332 index,
333 num_row_groups: range.num_row_groups(),
334 }],
335 row_group_indices: smallvec![RowGroupIndex {
336 index,
337 row_group_index: ALL_ROW_GROUPS,
338 }],
339 num_rows: range.num_rows() as usize,
340 });
341 }
342 }
343}
344
345fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
348 if ranges.is_empty() {
349 return ranges;
350 }
351
352 ranges.sort_unstable_by(|a, b| {
354 let l = a.time_range;
355 let r = b.time_range;
356 l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
357 });
358 let mut range_in_progress = None;
359 let mut exclusive_ranges = Vec::with_capacity(ranges.len());
361 for range in ranges {
362 let Some(mut prev_range) = range_in_progress.take() else {
363 range_in_progress = Some(range);
365 continue;
366 };
367
368 if prev_range.overlaps(&range) {
369 prev_range.merge(range);
370 range_in_progress = Some(prev_range);
371 } else {
372 exclusive_ranges.push(prev_range);
373 range_in_progress = Some(range);
374 }
375 }
376 if let Some(range) = range_in_progress {
377 exclusive_ranges.push(range);
378 }
379
380 exclusive_ranges
381}
382
383fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
386 let mut new_ranges = Vec::with_capacity(ranges.len());
387 for range in ranges {
388 range.maybe_split(&mut new_ranges);
389 }
390
391 new_ranges
392}
393
394#[derive(Default)]
396pub struct FileRangeBuilder {
397 context: Option<FileRangeContextRef>,
400 selection: RowGroupSelection,
402}
403
404impl FileRangeBuilder {
405 pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
407 Self {
408 context: Some(context),
409 selection,
410 }
411 }
412
413 pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
416 let Some(context) = self.context.clone() else {
417 return;
418 };
419 if row_group_index >= 0 {
420 let row_group_index = row_group_index as usize;
421 let Some(row_selection) = self.selection.get(row_group_index) else {
423 return;
424 };
425 ranges.push(FileRange::new(
426 context,
427 row_group_index,
428 Some(row_selection.clone()),
429 ));
430 } else {
431 ranges.extend(
433 self.selection
434 .iter()
435 .map(|(row_group_index, row_selection)| {
436 FileRange::new(
437 context.clone(),
438 *row_group_index,
439 Some(row_selection.clone()),
440 )
441 }),
442 );
443 }
444 }
445}
446
447pub(crate) struct MemRangeBuilder {
449 range: MemtableRange,
451 stats: MemtableStats,
453}
454
455impl MemRangeBuilder {
456 pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
458 Self { range, stats }
459 }
460
461 pub(crate) fn build_ranges(
464 &self,
465 _row_group_index: i64,
466 ranges: &mut SmallVec<[MemtableRange; 2]>,
467 ) {
468 ranges.push(self.range.clone())
469 }
470
471 pub(crate) fn stats(&self) -> &MemtableStats {
473 &self.stats
474 }
475}
476
477pub(crate) struct RangeBuilderList {
481 num_memtables: usize,
482 file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
483}
484
485impl RangeBuilderList {
486 pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
488 let file_builders = (0..num_files).map(|_| None).collect();
489 Self {
490 num_memtables,
491 file_builders: Mutex::new(file_builders),
492 }
493 }
494
495 pub(crate) async fn build_file_ranges(
497 &self,
498 input: &ScanInput,
499 index: RowGroupIndex,
500 reader_metrics: &mut ReaderMetrics,
501 ) -> Result<SmallVec<[FileRange; 2]>> {
502 let mut ranges = SmallVec::new();
503 let file_index = index.index - self.num_memtables;
504 let builder_opt = self.get_file_builder(file_index);
505 match builder_opt {
506 Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
507 None => {
508 let file = &input.files[file_index];
509 let builder = input.prune_file(file, reader_metrics).await?;
510 builder.build_ranges(index.row_group_index, &mut ranges);
511 self.set_file_builder(file_index, Arc::new(builder));
512 }
513 }
514 Ok(ranges)
515 }
516
517 fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
518 let file_builders = self.file_builders.lock().unwrap();
519 file_builders[index].clone()
520 }
521
522 fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
523 let mut file_builders = self.file_builders.lock().unwrap();
524 file_builders[index] = Some(builder);
525 }
526}
527
528#[cfg(test)]
529mod tests {
530 use common_time::timestamp::TimeUnit;
531 use common_time::Timestamp;
532
533 use super::*;
534
535 type Output = (Vec<usize>, i64, i64);
536
537 fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
538 let ranges = input
539 .iter()
540 .map(|(idx, start, end)| {
541 let time_range = (
542 Timestamp::new(*start, TimeUnit::Second),
543 Timestamp::new(*end, TimeUnit::Second),
544 );
545 RangeMeta {
546 time_range,
547 indices: smallvec![SourceIndex {
548 index: *idx,
549 num_row_groups: 0,
550 }],
551 row_group_indices: smallvec![RowGroupIndex {
552 index: *idx,
553 row_group_index: 0
554 }],
555 num_rows: 1,
556 }
557 })
558 .collect();
559 let output = group_ranges_for_seq_scan(ranges);
560 let actual: Vec<_> = output
561 .iter()
562 .map(|range| {
563 let indices = range.indices.iter().map(|index| index.index).collect();
564 let group_indices: Vec<_> = range
565 .row_group_indices
566 .iter()
567 .map(|idx| idx.index)
568 .collect();
569 assert_eq!(indices, group_indices);
570 let range = range.time_range;
571 (indices, range.0.value(), range.1.value())
572 })
573 .collect();
574 assert_eq!(expect, actual);
575 }
576
577 #[test]
578 fn test_group_ranges() {
579 run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);
581
582 run_group_ranges_test(
584 &[
585 (1, 1000, 2000),
586 (2, 6000, 7000),
587 (3, 0, 1500),
588 (4, 1500, 3000),
589 ],
590 &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
591 );
592
593 run_group_ranges_test(
595 &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
596 &[
597 (vec![3], 0, 1000),
598 (vec![1], 3000, 4000),
599 (vec![2], 4001, 6000),
600 ],
601 );
602
603 run_group_ranges_test(
605 &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
606 &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
607 );
608 }
609
610 #[test]
611 fn test_merge_range() {
612 let mut left = RangeMeta {
613 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
614 indices: smallvec![SourceIndex {
615 index: 1,
616 num_row_groups: 2,
617 }],
618 row_group_indices: smallvec![
619 RowGroupIndex {
620 index: 1,
621 row_group_index: 1
622 },
623 RowGroupIndex {
624 index: 1,
625 row_group_index: 2
626 }
627 ],
628 num_rows: 5,
629 };
630 let right = RangeMeta {
631 time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
632 indices: smallvec![SourceIndex {
633 index: 2,
634 num_row_groups: 2,
635 }],
636 row_group_indices: smallvec![
637 RowGroupIndex {
638 index: 2,
639 row_group_index: 1
640 },
641 RowGroupIndex {
642 index: 2,
643 row_group_index: 2
644 }
645 ],
646 num_rows: 4,
647 };
648 left.merge(right);
649
650 assert_eq!(
651 left,
652 RangeMeta {
653 time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
654 indices: smallvec![
655 SourceIndex {
656 index: 1,
657 num_row_groups: 2
658 },
659 SourceIndex {
660 index: 2,
661 num_row_groups: 2
662 }
663 ],
664 row_group_indices: smallvec![
665 RowGroupIndex {
666 index: 1,
667 row_group_index: 1
668 },
669 RowGroupIndex {
670 index: 1,
671 row_group_index: 2
672 },
673 RowGroupIndex {
674 index: 2,
675 row_group_index: 1
676 },
677 RowGroupIndex {
678 index: 2,
679 row_group_index: 2
680 },
681 ],
682 num_rows: 9,
683 }
684 );
685 }
686
687 #[test]
688 fn test_split_range() {
689 let range = RangeMeta {
690 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
691 indices: smallvec![SourceIndex {
692 index: 1,
693 num_row_groups: 2,
694 }],
695 row_group_indices: smallvec![RowGroupIndex {
696 index: 1,
697 row_group_index: ALL_ROW_GROUPS,
698 }],
699 num_rows: 5,
700 };
701
702 assert!(range.can_split_preserve_order());
703 let mut output = Vec::new();
704 range.maybe_split(&mut output);
705
706 assert_eq!(
707 output,
708 &[
709 RangeMeta {
710 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
711 indices: smallvec![SourceIndex {
712 index: 1,
713 num_row_groups: 2,
714 }],
715 row_group_indices: smallvec![RowGroupIndex {
716 index: 1,
717 row_group_index: 0
718 },],
719 num_rows: 2,
720 },
721 RangeMeta {
722 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
723 indices: smallvec![SourceIndex {
724 index: 1,
725 num_row_groups: 2,
726 }],
727 row_group_indices: smallvec![RowGroupIndex {
728 index: 1,
729 row_group_index: 1
730 }],
731 num_rows: 2,
732 }
733 ]
734 );
735 }
736
737 #[test]
738 fn test_not_split_range() {
739 let range = RangeMeta {
740 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
741 indices: smallvec![
742 SourceIndex {
743 index: 1,
744 num_row_groups: 1,
745 },
746 SourceIndex {
747 index: 2,
748 num_row_groups: 1,
749 }
750 ],
751 row_group_indices: smallvec![
752 RowGroupIndex {
753 index: 1,
754 row_group_index: 1
755 },
756 RowGroupIndex {
757 index: 2,
758 row_group_index: 1
759 }
760 ],
761 num_rows: 5,
762 };
763
764 assert!(!range.can_split_preserve_order());
765 let mut output = Vec::new();
766 range.maybe_split(&mut output);
767 assert_eq!(1, output.len());
768 }
769
770 #[test]
771 fn test_maybe_split_ranges() {
772 let ranges = vec![
773 RangeMeta {
774 time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
775 indices: smallvec![SourceIndex {
776 index: 0,
777 num_row_groups: 1,
778 }],
779 row_group_indices: smallvec![RowGroupIndex {
780 index: 0,
781 row_group_index: 0,
782 },],
783 num_rows: 4,
784 },
785 RangeMeta {
786 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
787 indices: smallvec![SourceIndex {
788 index: 1,
789 num_row_groups: 2,
790 }],
791 row_group_indices: smallvec![RowGroupIndex {
792 index: 1,
793 row_group_index: ALL_ROW_GROUPS,
794 },],
795 num_rows: 4,
796 },
797 RangeMeta {
798 time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
799 indices: smallvec![
800 SourceIndex {
801 index: 2,
802 num_row_groups: 2,
803 },
804 SourceIndex {
805 index: 3,
806 num_row_groups: 0,
807 }
808 ],
809 row_group_indices: smallvec![
810 RowGroupIndex {
811 index: 2,
812 row_group_index: ALL_ROW_GROUPS,
813 },
814 RowGroupIndex {
815 index: 3,
816 row_group_index: ALL_ROW_GROUPS,
817 }
818 ],
819 num_rows: 5,
820 },
821 ];
822 let output = maybe_split_ranges_for_seq_scan(ranges);
823 assert_eq!(
824 output,
825 vec![
826 RangeMeta {
827 time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
828 indices: smallvec![SourceIndex {
829 index: 0,
830 num_row_groups: 1,
831 }],
832 row_group_indices: smallvec![RowGroupIndex {
833 index: 0,
834 row_group_index: 0
835 },],
836 num_rows: 4,
837 },
838 RangeMeta {
839 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
840 indices: smallvec![SourceIndex {
841 index: 1,
842 num_row_groups: 2,
843 }],
844 row_group_indices: smallvec![RowGroupIndex {
845 index: 1,
846 row_group_index: 0
847 },],
848 num_rows: 2,
849 },
850 RangeMeta {
851 time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
852 indices: smallvec![SourceIndex {
853 index: 1,
854 num_row_groups: 2,
855 }],
856 row_group_indices: smallvec![RowGroupIndex {
857 index: 1,
858 row_group_index: 1
859 }],
860 num_rows: 2,
861 },
862 RangeMeta {
863 time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
864 indices: smallvec![
865 SourceIndex {
866 index: 2,
867 num_row_groups: 2
868 },
869 SourceIndex {
870 index: 3,
871 num_row_groups: 0,
872 }
873 ],
874 row_group_indices: smallvec![
875 RowGroupIndex {
876 index: 2,
877 row_group_index: ALL_ROW_GROUPS,
878 },
879 RowGroupIndex {
880 index: 3,
881 row_group_index: ALL_ROW_GROUPS,
882 }
883 ],
884 num_rows: 5,
885 },
886 ]
887 )
888 }
889}