1use std::collections::VecDeque;
18use std::fmt;
19use std::pin::Pin;
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll};
22use std::time::{Duration, Instant};
23
24use async_stream::try_stream;
25use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::timestamp::timestamp_array_to_primitive;
28use futures::Stream;
29use prometheus::IntGauge;
30use smallvec::SmallVec;
31use snafu::OptionExt;
32use store_api::storage::RegionId;
33
34use crate::error::{Result, UnexpectedSnafu};
35use crate::memtable::MemScanMetrics;
36use crate::metrics::{
37 IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
38 READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
39};
40use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
41use crate::read::scan_region::StreamContext;
42use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
43use crate::sst::file::FileTimeRange;
44use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
45use crate::sst::parquet::file_range::FileRange;
46use crate::sst::parquet::flat_format::time_index_column_index;
47use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
48
/// Accumulated metrics for a single partition scan.
///
/// Values are merged in from [`ScannerMetrics`], [`ReaderMetrics`],
/// [`SeriesDistributorMetrics`] and memtable scan data, then reported to
/// Prometheus and logged when the owning [`PartitionMetricsInner`] is dropped.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Time spent preparing the scan before reading starts.
    prepare_scan_cost: Duration,
    /// Time spent building readers for ranges.
    build_reader_cost: Duration,
    /// Time spent scanning (reading batches).
    scan_cost: Duration,
    /// Time spent yielding batches to the caller.
    yield_cost: Duration,
    /// Total elapsed time of the scan; set once on finish/drop.
    total_cost: Duration,
    /// Number of rows returned by the scan.
    num_rows: usize,
    /// Number of batches returned by the scan.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Time spent scanning memtables (reported via `report_mem_scan_metrics`).
    mem_scan_cost: Duration,
    /// Rows read from memtables.
    mem_rows: usize,
    /// Batches read from memtables.
    mem_batches: usize,
    /// Series observed while scanning memtables.
    mem_series: usize,

    /// Time spent building file parts (from `ReaderMetrics::build_cost`).
    build_parts_cost: Duration,
    /// Row groups considered before any filtering.
    rg_total: usize,
    /// Row groups pruned by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Row groups pruned by the inverted index.
    rg_inverted_filtered: usize,
    /// Row groups pruned by min-max statistics.
    rg_minmax_filtered: usize,
    /// Row groups pruned by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Rows in row groups before filtering.
    rows_before_filter: usize,
    /// Rows pruned by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Rows pruned by the inverted index.
    rows_inverted_filtered: usize,
    /// Rows pruned by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Rows removed by precise (exact predicate) filtering.
    rows_precise_filtered: usize,
    /// Arrow record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Batches produced from SSTs.
    num_sst_batches: usize,
    /// Rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time from query start until the first poll of the stream.
    first_poll: Duration,

    /// Times the series distributor timed out sending a series.
    num_series_send_timeout: usize,
    /// Times the series distributor hit a full channel.
    num_series_send_full: usize,
    /// Rows processed by the series distributor.
    num_distributor_rows: usize,
    /// Batches processed by the series distributor.
    num_distributor_batches: usize,
    /// Scan time spent inside the series distributor.
    distributor_scan_cost: Duration,
    /// Yield time spent inside the series distributor.
    distributor_yield_cost: Duration,

    /// Whether the stream reached EOF (set by `on_finish(true)`).
    stream_eof: bool,
}
130
impl fmt::Debug for ScanMetricsSet {
    /// Formats the metrics as a single JSON-like object.
    ///
    /// Always emits the core fields; optional counters are appended only when
    /// non-zero so quiet scans produce compact output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to ScanMetricsSet forces
        // this formatter to be updated.
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        // Core fields: always printed.
        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        // Filtering counters: printed only when non-zero.
        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        // Series distributor counters: printed only when the distributor ran.
        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        // Memtable scan counters: printed only when memtables were scanned.
        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        // Close the JSON object.
        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}
impl ScanMetricsSet {
    /// Adds `cost` to the prepare-scan stage and returns the updated set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Accumulates stage costs and counters from a scanner's metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        // Exhaustive destructuring so new ScannerMetrics fields can't be
        // silently dropped from the merge.
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Accumulates SST read and index-filtering stats from reader metrics.
    ///
    /// `scan_cost` is intentionally ignored here; scan time is tracked via
    /// [`Self::merge_scanner_metrics`].
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Accumulates metrics reported by the series distributor.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the accumulated values to the global Prometheus metrics.
    fn observe_metrics(&self) {
        // Per-stage elapsed time histograms.
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        // Row-group counters per filtering stage.
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        // Row counters per filtering stage.
        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
409
/// Shared state behind [`PartitionMetrics`].
///
/// Observes Prometheus metrics and logs the accumulated [`ScanMetricsSet`]
/// on drop.
struct PartitionMetricsInner {
    /// Region this partition scan belongs to.
    region_id: RegionId,
    /// Index of the partition.
    partition: usize,
    /// Static label identifying the scanner implementation.
    scanner_type: &'static str,
    /// Instant the query started; used to compute elapsed costs.
    query_start: Instant,
    /// Whether to log the final metrics at info (verbose explain) level.
    explain_verbose: bool,
    /// Accumulated metrics, shared across reader/distributor tasks.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented on creation, decremented on drop.
    in_progress_scan: IntGauge,

    // DataFusion per-partition timers exposed through the plan's metrics set.
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
    elapsed_compute: Time,
}
438
439impl PartitionMetricsInner {
440 fn on_finish(&self, stream_eof: bool) {
441 let mut metrics = self.metrics.lock().unwrap();
442 if metrics.total_cost.is_zero() {
443 metrics.total_cost = self.query_start.elapsed();
444 }
445 if !metrics.stream_eof {
446 metrics.stream_eof = stream_eof;
447 }
448 }
449}
450
impl Drop for PartitionMetricsInner {
    /// Flushes metrics when the last handle goes away.
    ///
    /// Finalizes totals (without marking EOF), reports Prometheus metrics,
    /// decrements the in-progress gauge, and logs a summary. The log level is
    /// info for verbose explain, debug otherwise.
    fn drop(&mut self) {
        // `false`: dropping does not imply the stream reached EOF.
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}
479
/// Sparse list of per-partition metrics, indexed by partition number.
///
/// Slots are `None` until the corresponding partition registers its metrics.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
483
484impl PartitionMetricsList {
485 pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
487 let mut list = self.0.lock().unwrap();
488 if list.len() <= partition {
489 list.resize(partition + 1, None);
490 }
491 list[partition] = Some(metrics);
492 }
493
494 pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
496 let list = self.0.lock().unwrap();
497 write!(f, ", \"metrics_per_partition\": ")?;
498 f.debug_list()
499 .entries(list.iter().filter_map(|p| p.as_ref()))
500 .finish()?;
501 write!(f, "}}")
502 }
503}
504
/// Cheaply cloneable handle to the metrics of one scan partition.
///
/// All clones share the same inner state; metrics are flushed when the last
/// clone is dropped.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
508
509impl PartitionMetrics {
510 pub(crate) fn new(
511 region_id: RegionId,
512 partition: usize,
513 scanner_type: &'static str,
514 query_start: Instant,
515 explain_verbose: bool,
516 metrics_set: &ExecutionPlanMetricsSet,
517 ) -> Self {
518 let partition_str = partition.to_string();
519 let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
520 in_progress_scan.inc();
521 let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
522 let inner = PartitionMetricsInner {
523 region_id,
524 partition,
525 scanner_type,
526 query_start,
527 explain_verbose,
528 metrics: Mutex::new(metrics),
529 in_progress_scan,
530 build_parts_cost: MetricBuilder::new(metrics_set)
531 .subset_time("build_parts_cost", partition),
532 build_reader_cost: MetricBuilder::new(metrics_set)
533 .subset_time("build_reader_cost", partition),
534 scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
535 yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
536 convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
537 elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
538 };
539 Self(Arc::new(inner))
540 }
541
542 pub(crate) fn on_first_poll(&self) {
543 let mut metrics = self.0.metrics.lock().unwrap();
544 metrics.first_poll = self.0.query_start.elapsed();
545 }
546
547 pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
548 let mut metrics = self.0.metrics.lock().unwrap();
549 metrics.num_mem_ranges += num;
550 }
551
552 pub fn inc_num_file_ranges(&self, num: usize) {
553 let mut metrics = self.0.metrics.lock().unwrap();
554 metrics.num_file_ranges += num;
555 }
556
557 fn record_elapsed_compute(&self, duration: Duration) {
558 if duration.is_zero() {
559 return;
560 }
561 self.0.elapsed_compute.add_duration(duration);
562 }
563
564 pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
566 self.0.build_reader_cost.add_duration(cost);
567
568 let mut metrics = self.0.metrics.lock().unwrap();
569 metrics.build_reader_cost += cost;
570 }
571
572 pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
573 self.0.convert_cost.add_duration(cost);
574 self.record_elapsed_compute(cost);
575 }
576
577 pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
579 let mut metrics = self.0.metrics.lock().unwrap();
580 metrics.mem_scan_cost += data.scan_cost;
581 metrics.mem_rows += data.num_rows;
582 metrics.mem_batches += data.num_batches;
583 metrics.mem_series += data.total_series;
584 }
585
586 pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
588 self.0
589 .build_reader_cost
590 .add_duration(metrics.build_reader_cost);
591 self.0.scan_cost.add_duration(metrics.scan_cost);
592 self.record_elapsed_compute(metrics.scan_cost);
593 self.0.yield_cost.add_duration(metrics.yield_cost);
594 self.record_elapsed_compute(metrics.yield_cost);
595
596 let mut metrics_set = self.0.metrics.lock().unwrap();
597 metrics_set.merge_scanner_metrics(metrics);
598 }
599
600 pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
602 self.0.build_parts_cost.add_duration(metrics.build_cost);
603
604 let mut metrics_set = self.0.metrics.lock().unwrap();
605 metrics_set.merge_reader_metrics(metrics);
606 }
607
608 pub(crate) fn on_finish(&self) {
610 self.0.on_finish(true);
611 }
612
613 pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
615 let mut metrics_set = self.0.metrics.lock().unwrap();
616 metrics_set.set_distributor_metrics(metrics);
617 }
618}
619
620impl fmt::Debug for PartitionMetrics {
621 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
622 let metrics = self.0.metrics.lock().unwrap();
623 write!(
624 f,
625 r#"{{"partition":{}, "metrics":{:?}}}"#,
626 self.0.partition, metrics
627 )
628 }
629}
630
/// Metrics collected by the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Times a series send timed out.
    pub(crate) num_series_send_timeout: usize,
    /// Times the series channel was full.
    pub(crate) num_series_send_full: usize,
    /// Rows processed by the distributor.
    pub(crate) num_rows: usize,
    /// Batches processed by the distributor.
    pub(crate) num_batches: usize,
    /// Time the distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Time the distributor spent yielding.
    pub(crate) yield_cost: Duration,
}
647
648pub(crate) fn scan_mem_ranges(
650 stream_ctx: Arc<StreamContext>,
651 part_metrics: PartitionMetrics,
652 index: RowGroupIndex,
653 time_range: FileTimeRange,
654) -> impl Stream<Item = Result<Batch>> {
655 try_stream! {
656 let ranges = stream_ctx.input.build_mem_ranges(index);
657 part_metrics.inc_num_mem_ranges(ranges.len());
658 for range in ranges {
659 let build_reader_start = Instant::now();
660 let mem_scan_metrics = Some(MemScanMetrics::default());
661 let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
662 part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
663
664 let mut source = Source::Iter(iter);
665 while let Some(batch) = source.next_batch().await? {
666 yield batch;
667 }
668
669 if let Some(ref metrics) = mem_scan_metrics {
671 let data = metrics.data();
672 part_metrics.report_mem_scan_metrics(&data);
673 }
674 }
675 }
676}
677
678pub(crate) fn scan_flat_mem_ranges(
680 stream_ctx: Arc<StreamContext>,
681 part_metrics: PartitionMetrics,
682 index: RowGroupIndex,
683) -> impl Stream<Item = Result<RecordBatch>> {
684 try_stream! {
685 let ranges = stream_ctx.input.build_mem_ranges(index);
686 part_metrics.inc_num_mem_ranges(ranges.len());
687 for range in ranges {
688 let build_reader_start = Instant::now();
689 let mem_scan_metrics = Some(MemScanMetrics::default());
690 let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
691 part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
692
693 while let Some(record_batch) = iter.next().transpose()? {
694 yield record_batch;
695 }
696
697 if let Some(ref metrics) = mem_scan_metrics {
699 let data = metrics.data();
700 part_metrics.report_mem_scan_metrics(&data);
701 }
702 }
703 }
704}
705
/// Minimum number of rows in a file before it is considered for splitting
/// (one default parquet row group).
const SPLIT_ROW_THRESHOLD: u64 = DEFAULT_ROW_GROUP_SIZE as u64;
/// Below this series count, splitting is always considered worthwhile.
const NUM_SERIES_THRESHOLD: u64 = 10240;
/// Minimum average rows-per-series for splitting to pay off when there are
/// many series.
const BATCH_SIZE_THRESHOLD: u64 = 50;
713
714pub(crate) fn should_split_flat_batches_for_merge(
716 stream_ctx: &Arc<StreamContext>,
717 range_meta: &RangeMeta,
718) -> bool {
719 let mut num_files_to_split = 0;
721 let mut num_mem_rows = 0;
722 let mut num_mem_series = 0;
723 for index in &range_meta.row_group_indices {
727 if stream_ctx.is_mem_range_index(*index) {
728 let memtable = &stream_ctx.input.memtables[index.index];
729 let stats = memtable.stats();
731 num_mem_rows += stats.num_rows();
732 num_mem_series += stats.series_count();
733 } else if stream_ctx.is_file_range_index(*index) {
734 let file_index = index.index - stream_ctx.input.num_memtables();
736 let file = &stream_ctx.input.files[file_index];
737 if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
738 continue;
740 }
741 debug_assert!(file.meta_ref().num_rows > 0);
742 if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
743 return false;
745 } else {
746 num_files_to_split += 1;
747 }
748 }
749 }
751
752 if num_files_to_split > 0 {
753 true
755 } else if num_mem_series > 0 && num_mem_rows > 0 {
756 can_split_series(num_mem_rows as u64, num_mem_series as u64)
758 } else {
759 false
760 }
761}
762
763fn can_split_series(num_rows: u64, num_series: u64) -> bool {
764 assert!(num_series > 0);
765 assert!(num_rows > 0);
766
767 num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
769}
770
771pub(crate) async fn scan_file_ranges(
773 stream_ctx: Arc<StreamContext>,
774 part_metrics: PartitionMetrics,
775 index: RowGroupIndex,
776 read_type: &'static str,
777 range_builder: Arc<RangeBuilderList>,
778) -> Result<impl Stream<Item = Result<Batch>>> {
779 let mut reader_metrics = ReaderMetrics::default();
780 let ranges = range_builder
781 .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
782 .await?;
783 part_metrics.inc_num_file_ranges(ranges.len());
784 part_metrics.merge_reader_metrics(&reader_metrics);
785
786 Ok(build_file_range_scan_stream(
787 stream_ctx,
788 part_metrics,
789 read_type,
790 ranges,
791 ))
792}
793
794pub(crate) async fn scan_flat_file_ranges(
796 stream_ctx: Arc<StreamContext>,
797 part_metrics: PartitionMetrics,
798 index: RowGroupIndex,
799 read_type: &'static str,
800 range_builder: Arc<RangeBuilderList>,
801) -> Result<impl Stream<Item = Result<RecordBatch>>> {
802 let mut reader_metrics = ReaderMetrics::default();
803 let ranges = range_builder
804 .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
805 .await?;
806 part_metrics.inc_num_file_ranges(ranges.len());
807 part_metrics.merge_reader_metrics(&reader_metrics);
808
809 Ok(build_flat_file_range_scan_stream(
810 stream_ctx,
811 part_metrics,
812 read_type,
813 ranges,
814 ))
815}
816
/// Builds a stream that scans the given file `ranges` and yields [`Batch`]es.
///
/// Reader metrics are accumulated across all ranges and merged into
/// `part_metrics` when the stream completes; `read_type` labels the row
/// observations.
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                // Adapt batches from older-schema files if needed.
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            // Recover the reader from the source to collect its metrics.
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Report metrics once after all ranges have been scanned.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}
851
/// Builds a stream that scans the given file `ranges` in flat format and
/// yields Arrow [`RecordBatch`]es.
///
/// Reader metrics are accumulated across all ranges and merged into
/// `part_metrics` when the stream completes; `read_type` labels the row
/// observations.
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let mut reader = range.flat_reader().await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            // A compat adapter is only present for files with an older
            // schema; it must be the flat variant here.
            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;
            while let Some(record_batch) = reader.next_batch()? {
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();
            reader_metrics.merge_from(&prune_metrics);
        }

        // Report metrics once after all ranges have been scanned.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}
894
/// Scans an extension (enterprise-only) range at `index`.
///
/// Delegates to the range's own reader implementation; errors are wrapped as
/// external-range scan errors.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}
912
913pub(crate) async fn maybe_scan_other_ranges(
914 context: &Arc<StreamContext>,
915 index: RowGroupIndex,
916 metrics: &PartitionMetrics,
917) -> Result<BoxedBatchStream> {
918 #[cfg(feature = "enterprise")]
919 {
920 scan_extension_range(context.clone(), index, metrics.clone()).await
921 }
922
923 #[cfg(not(feature = "enterprise"))]
924 {
925 let _ = context;
926 let _ = index;
927 let _ = metrics;
928
929 crate::error::UnexpectedSnafu {
930 reason: "no other ranges scannable",
931 }
932 .fail()
933 }
934}
935
936pub(crate) async fn maybe_scan_flat_other_ranges(
937 context: &Arc<StreamContext>,
938 index: RowGroupIndex,
939 metrics: &PartitionMetrics,
940) -> Result<BoxedRecordBatchStream> {
941 let _ = context;
942 let _ = index;
943 let _ = metrics;
944
945 crate::error::UnexpectedSnafu {
946 reason: "no other ranges scannable in flat format",
947 }
948 .fail()
949}
950
/// Stream adapter that splits each incoming [`RecordBatch`] at points where
/// the time index decreases (see [`split_record_batch`]).
pub(crate) struct SplitRecordBatchStream<S> {
    /// The underlying record batch stream.
    inner: S,
    /// Buffer of split batches not yet returned to the caller.
    batches: VecDeque<RecordBatch>,
}
958
959impl<S> SplitRecordBatchStream<S> {
960 pub(crate) fn new(inner: S) -> Self {
962 Self {
963 inner,
964 batches: VecDeque::new(),
965 }
966 }
967}
968
969impl<S> Stream for SplitRecordBatchStream<S>
970where
971 S: Stream<Item = Result<RecordBatch>> + Unpin,
972{
973 type Item = Result<RecordBatch>;
974
975 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
976 loop {
977 if let Some(batch) = self.batches.pop_front() {
979 return Poll::Ready(Some(Ok(batch)));
980 }
981
982 let record_batch = match futures::ready!(Pin::new(&mut self.inner).poll_next(cx)) {
984 Some(Ok(batch)) => batch,
985 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
986 None => return Poll::Ready(None),
987 };
988
989 split_record_batch(record_batch, &mut self.batches);
991 }
993 }
994}
995
996pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeque<RecordBatch>) {
1001 let batch_rows = record_batch.num_rows();
1002 if batch_rows == 0 {
1003 return;
1004 }
1005 if batch_rows < 2 {
1006 batches.push_back(record_batch);
1007 return;
1008 }
1009
1010 let time_index_pos = time_index_column_index(record_batch.num_columns());
1011 let timestamps = record_batch.column(time_index_pos);
1012 let (ts_values, _unit) = timestamp_array_to_primitive(timestamps).unwrap();
1013 let mut offsets = Vec::with_capacity(16);
1014 offsets.push(0);
1015 let values = ts_values.values();
1016 for (i, &value) in values.iter().take(batch_rows - 1).enumerate() {
1017 if value > values[i + 1] {
1018 offsets.push(i + 1);
1019 }
1020 }
1021 offsets.push(values.len());
1022
1023 for (i, &start) in offsets[..offsets.len() - 1].iter().enumerate() {
1025 let end = offsets[i + 1];
1026 let rows_in_batch = end - start;
1027 batches.push_back(record_batch.slice(start, rows_in_batch));
1028 }
1029}