use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROW_GROUPS_TOTAL,
    READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_RETURN, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Metrics accumulated while scanning a partition.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the whole scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    /// Duration to build file ranges (parts).
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filtering.
    rows_precise_filtered: usize,
    /// Number of record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SSTs.
    num_sst_batches: usize,
    /// Number of rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time before the first poll of the stream.
    first_poll: Duration,

    /// Number of send timeouts in the series distributor.
    num_series_send_timeout: usize,
    /// Number of sends that hit a full channel in the series distributor.
    num_series_send_full: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Duration the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    distributor_yield_cost: Duration,

    /// Whether the stream reached EOF.
    stream_eof: bool,
}

impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the scanner metrics collected by a stream.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the reader metrics collected while reading SSTs.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Sets the metrics of the series distributor.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the accumulated metrics to the global Prometheus metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
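
// Illustrative flow (a sketch; `prepare_cost`, `scanner_metrics` and
// `reader_metrics` are assumed local variables, not items in this module):
// per-range readers fill `ScannerMetrics`/`ReaderMetrics`, which are folded
// into this set, and the totals are exported once when the owning
// `PartitionMetricsInner` is dropped.
//
//     let mut set = ScanMetricsSet::default().with_prepare_scan_cost(prepare_cost);
//     set.merge_scanner_metrics(&scanner_metrics);
//     set.merge_reader_metrics(&reader_metrics);
//     // ...on drop of `PartitionMetricsInner`:
//     set.observe_metrics();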

/// Inner state of [PartitionMetrics], shared by all streams of the partition.
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scanners.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Whether to log verbose metrics when the scan finishes.
    explain_verbose: bool,
    /// Accumulated metrics of the partition.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans for this scanner type and partition.
    in_progress_scan: IntGauge,

    /// Per-partition DataFusion timer for building file ranges (parts).
    build_parts_cost: Time,
    /// Per-partition DataFusion timer for building readers.
    build_reader_cost: Time,
    /// Per-partition DataFusion timer for scanning.
    scan_cost: Time,
    /// Per-partition DataFusion timer for yielding.
    yield_cost: Time,
    /// Per-partition DataFusion timer for converting batches.
    convert_cost: Time,
}

impl PartitionMetricsInner {
    /// Records the total cost and the EOF flag if they are not set yet.
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type,
                self.region_id,
                self.partition,
                metrics,
                self.convert_cost,
            );
        }
    }
}

/// List of [PartitionMetrics], indexed by partition.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets the [PartitionMetrics] of `partition`, growing the list if needed.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats the per-partition metrics as a `metrics_per_partition` list and
    /// closes the outer JSON-like object.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}
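
// Usage sketch (assumed call sites; `metrics_list`, `partition`, `part_metrics`
// and the formatter `f` are illustrative): scanners register each partition's
// metrics while building streams, and a verbose EXPLAIN output appends the list
// after it has written its own object body.
//
//     metrics_list.set(partition, part_metrics.clone());
//     // inside a `fmt` impl that has already written an opening `{...`:
//     metrics_list.format_verbose_metrics(f)?;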

/// Metrics while reading a partition, shared by the streams of that partition.
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    /// Creates a new [PartitionMetrics] and increases the in-progress scan gauge.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the elapsed time of the first poll.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Adds `num` scanned memtable ranges.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Adds `num` scanned file ranges.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `cost` to the reader build cost.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Adds `cost` to the batch conversion cost.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
    }

    /// Merges metrics collected while scanning a memtable range.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [ScannerMetrics] into this partition's metrics.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] into this partition's metrics.
    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Marks the stream as finished (reached EOF) and records the total cost.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the metrics of the series distributor.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}
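
// Typical lifecycle (a sketch under assumed call sites; the `"SeqScan"` label
// and the local variables are illustrative):
//
//     let part_metrics = PartitionMetrics::new(
//         region_id, partition, "SeqScan", query_start, explain_verbose, &metrics_set);
//     part_metrics.on_first_poll();                       // first poll latency
//     part_metrics.merge_metrics(&scanner_metrics);       // per-range scan costs
//     part_metrics.merge_reader_metrics(&reader_metrics); // SST filter/row counts
//     part_metrics.on_finish();                           // stream reached EOF
//     drop(part_metrics); // dropping the last clone reports to Prometheus and logs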

/// Metrics of the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts.
    pub(crate) num_series_send_timeout: usize,
    /// Number of sends that hit a full channel.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the distributor scanned.
    pub(crate) num_batches: usize,
    /// Duration of the distributor to scan.
    pub(crate) scan_cost: Duration,
    /// Duration of the distributor to yield.
    pub(crate) yield_cost: Duration,
}

/// Scans memtable ranges at `index`, yielding [Batch]es.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            // Reports the memtable scan metrics after the range is exhausted.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans memtable ranges at `index`, yielding flat [RecordBatch]es.
#[allow(dead_code)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            // Reports the memtable scan metrics after the range is exhausted.
            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Builds file ranges at `index` and returns a stream that scans them.
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    // Reports the cost of building the ranges before scanning them.
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}
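
// Consumption sketch (illustrative; assumes `futures::TryStreamExt` is in scope
// and the arguments are built by the caller; `"seq_scan"` is just an example
// `read_type` label):
//
//     let stream =
//         scan_file_ranges(stream_ctx, part_metrics, index, "seq_scan", range_builder).await?;
//     futures::pin_mut!(stream);
//     while let Some(batch) = stream.try_next().await? {
//         // merge-sort or forward the batch
//     }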

/// Builds file ranges at `index` and returns a stream that scans them, yielding
/// flat [RecordBatch]es.
#[allow(dead_code)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Builds a stream that scans the given [FileRange]s and yields [Batch]es.
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compact_batch) = compat_batch {
                    batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports the accumulated reader metrics once all ranges are scanned.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds a stream that scans the given [FileRange]s and yields flat [RecordBatch]es.
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let mut reader = range.flat_reader().await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;
            while let Some(record_batch) = reader.next_batch()? {
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();
            reader_metrics.merge_from(&prune_metrics);
        }

        // Reports the accumulated reader metrics once all ranges are scanned.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Scans the extension range at `index`.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

/// Scans ranges other than memtable and file ranges. Only supported with the
/// `enterprise` feature; otherwise it returns an error.
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

/// Flat-format counterpart of [maybe_scan_other_ranges]. No other ranges are
/// scannable in the flat format yet, so this always returns an error.
#[allow(dead_code)]
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    let _ = context;
    let _ = index;
    let _ = metrics;

    crate::error::UnexpectedSnafu {
        reason: "no other ranges scannable in flat format",
    }
    .fail()
}
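
#[cfg(test)]
mod scan_metrics_sketch_tests {
    // Minimal sketch: checks that `ScanMetricsSet`'s `Debug` output is the
    // JSON-like object built by the formatter above. The module name and the
    // assertions here are illustrative.
    use super::*;

    #[test]
    fn scan_metrics_set_debug_is_json_like() {
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(Duration::from_millis(5));
        let text = format!("{:?}", metrics);
        assert!(text.starts_with('{'));
        assert!(text.contains("\"prepare_scan_cost\""));
        assert!(text.ends_with('}'));
    }
}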