use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use datatypes::arrow::record_batch::RecordBatch;
use futures::Stream;
use prometheus::IntGauge;
use smallvec::SmallVec;
use snafu::OptionExt;
use store_api::storage::RegionId;

use crate::error::{Result, UnexpectedSnafu};
use crate::memtable::MemScanMetrics;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Accumulated metrics of a partition scan.
#[derive(Default)]
pub(crate) struct ScanMetricsSet {
    /// Duration to prepare the scan.
    prepare_scan_cost: Duration,
    /// Duration to build readers.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration spent yielding batches to the caller.
    yield_cost: Duration,
    /// Total duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of memtable ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    /// Duration to scan memtables.
    mem_scan_cost: Duration,
    /// Number of rows read from memtables.
    mem_rows: usize,
    /// Number of batches read from memtables.
    mem_batches: usize,
    /// Number of series read from memtables.
    mem_series: usize,

    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by the fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by the inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max statistics.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by the bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows filtered by the fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows filtered by the inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows filtered by the bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filtering.
    rows_precise_filtered: usize,
    /// Number of record batches read from SSTs.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SSTs.
    num_sst_batches: usize,
    /// Number of rows read from SSTs.
    num_sst_rows: usize,

    /// Elapsed time before the first poll of the stream.
    first_poll: Duration,

    /// Number of send timeouts in the series distributor.
    num_series_send_timeout: usize,
    /// Number of times the series distributor found the send channel full.
    num_series_send_full: usize,
    /// Number of rows sent by the series distributor.
    num_distributor_rows: usize,
    /// Number of batches sent by the series distributor.
    num_distributor_batches: usize,
    /// Duration the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Duration the series distributor spent yielding.
    distributor_yield_cost: Duration,

    /// Whether the stream reached EOF.
    stream_eof: bool,
}

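/// Formats the metrics as a single-line, JSON-like object for logs and verbose
/// explain output; counters that stayed at zero are omitted.
///
/// Illustrative shape of the output (values below are placeholders, not real
/// measurements):
///
/// ```text
/// {"prepare_scan_cost":"1.2ms", "build_reader_cost":"800µs", ..., "stream_eof":true}
/// ```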
impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_series_send_full,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
            stream_eof,
            mem_scan_cost,
            mem_rows,
            mem_batches,
            mem_series,
        } = self;

        write!(
            f,
            "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
            \"build_reader_cost\":\"{build_reader_cost:?}\", \
            \"scan_cost\":\"{scan_cost:?}\", \
            \"yield_cost\":\"{yield_cost:?}\", \
            \"total_cost\":\"{total_cost:?}\", \
            \"num_rows\":{num_rows}, \
            \"num_batches\":{num_batches}, \
            \"num_mem_ranges\":{num_mem_ranges}, \
            \"num_file_ranges\":{num_file_ranges}, \
            \"build_parts_cost\":\"{build_parts_cost:?}\", \
            \"rg_total\":{rg_total}, \
            \"rows_before_filter\":{rows_before_filter}, \
            \"num_sst_record_batches\":{num_sst_record_batches}, \
            \"num_sst_batches\":{num_sst_batches}, \
            \"num_sst_rows\":{num_sst_rows}, \
            \"first_poll\":\"{first_poll:?}\""
        )?;

        if *rg_fulltext_filtered > 0 {
            write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
        }
        if *rg_inverted_filtered > 0 {
            write!(f, ", \"rg_inverted_filtered\":{rg_inverted_filtered}")?;
        }
        if *rg_minmax_filtered > 0 {
            write!(f, ", \"rg_minmax_filtered\":{rg_minmax_filtered}")?;
        }
        if *rg_bloom_filtered > 0 {
            write!(f, ", \"rg_bloom_filtered\":{rg_bloom_filtered}")?;
        }
        if *rows_fulltext_filtered > 0 {
            write!(f, ", \"rows_fulltext_filtered\":{rows_fulltext_filtered}")?;
        }
        if *rows_inverted_filtered > 0 {
            write!(f, ", \"rows_inverted_filtered\":{rows_inverted_filtered}")?;
        }
        if *rows_bloom_filtered > 0 {
            write!(f, ", \"rows_bloom_filtered\":{rows_bloom_filtered}")?;
        }
        if *rows_precise_filtered > 0 {
            write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
        }

        if *num_series_send_timeout > 0 {
            write!(f, ", \"num_series_send_timeout\":{num_series_send_timeout}")?;
        }
        if *num_series_send_full > 0 {
            write!(f, ", \"num_series_send_full\":{num_series_send_full}")?;
        }
        if *num_distributor_rows > 0 {
            write!(f, ", \"num_distributor_rows\":{num_distributor_rows}")?;
        }
        if *num_distributor_batches > 0 {
            write!(f, ", \"num_distributor_batches\":{num_distributor_batches}")?;
        }
        if !distributor_scan_cost.is_zero() {
            write!(
                f,
                ", \"distributor_scan_cost\":\"{distributor_scan_cost:?}\""
            )?;
        }
        if !distributor_yield_cost.is_zero() {
            write!(
                f,
                ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
            )?;
        }

        if *mem_rows > 0 {
            write!(f, ", \"mem_rows\":{mem_rows}")?;
        }
        if *mem_batches > 0 {
            write!(f, ", \"mem_batches\":{mem_batches}")?;
        }
        if *mem_series > 0 {
            write!(f, ", \"mem_series\":{mem_series}")?;
        }
        if !mem_scan_cost.is_zero() {
            write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
        }

        write!(f, ", \"stream_eof\":{stream_eof}}}")
    }
}

impl ScanMetricsSet {
    /// Attaches the given prepare scan cost to this metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges [`ScannerMetrics`] into this set.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges [`ReaderMetrics`] into this set.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Sets the [`SeriesDistributorMetrics`].
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_series_send_full,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_series_send_full += *num_series_send_full;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Reports the accumulated metrics to the global Prometheus metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}

/// Inner state of [`PartitionMetrics`], shared behind an [`Arc`].
struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Type of the scanner, used as a label for logs and Prometheus metrics.
    scanner_type: &'static str,
    /// Time when the query starts.
    query_start: Instant,
    /// Whether to log the metrics at `info` level when the scan finishes.
    explain_verbose: bool,
    /// Accumulated metrics of the scan.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans.
    in_progress_scan: IntGauge,

    // Per-partition timers registered in the DataFusion `ExecutionPlanMetricsSet`.
    build_parts_cost: Time,
    build_reader_cost: Time,
    scan_cost: Time,
    yield_cost: Time,
    convert_cost: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self, stream_eof: bool) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
        if !metrics.stream_eof {
            metrics.stream_eof = stream_eof;
        }
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish(false);
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        if self.explain_verbose {
            common_telemetry::info!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        } else {
            common_telemetry::debug!(
                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
                self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
            );
        }
    }
}

/// List of [`PartitionMetrics`] for all partitions, used to format verbose explain output.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets the metrics of the given `partition`, growing the list if necessary.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats the metrics of each partition for verbose explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", \"metrics_per_partition\": ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()?;
        write!(f, "}}")
    }
}

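/// Metrics of a single partition of a scan. Cloning is cheap; all clones share
/// the same underlying [`PartitionMetricsInner`], and the metrics are reported
/// to Prometheus and logged when the last clone is dropped.
///
/// A rough lifecycle sketch (illustrative only; `region_id`, `metrics_set`,
/// `ranges`, `build_cost` and `reader_metrics` are placeholders, not items from
/// this crate):
///
/// ```ignore
/// let metrics = PartitionMetrics::new(
///     region_id,
///     /* partition */ 0,
///     /* scanner_type */ "ExampleScan",
///     Instant::now(),
///     /* explain_verbose */ false,
///     &metrics_set,
/// );
/// metrics.on_first_poll();
/// // While scanning ranges:
/// metrics.inc_num_file_ranges(ranges.len());
/// metrics.inc_build_reader_cost(build_cost);
/// metrics.merge_reader_metrics(&reader_metrics);
/// // When the stream reaches EOF:
/// metrics.on_finish();
/// ```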
#[derive(Clone)]
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    /// Creates metrics for a partition and increases the in-progress scan gauge.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            explain_verbose,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
        };
        Self(Arc::new(inner))
    }

    /// Records the elapsed time of the first poll.
    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    /// Increases the number of scanned memtable ranges.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    /// Increases the number of scanned file ranges.
    pub fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Adds `cost` to the duration spent building readers.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Adds `cost` to the duration spent converting batches.
    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
        self.0.convert_cost.add_duration(cost);
    }

    /// Reports the memtable scan metrics collected by an iterator.
    pub(crate) fn report_mem_scan_metrics(&self, data: &crate::memtable::MemScanMetricsData) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.mem_scan_cost += data.scan_cost;
        metrics.mem_rows += data.num_rows;
        metrics.mem_batches += data.num_batches;
        metrics.mem_series += data.total_series;
    }

    /// Merges [`ScannerMetrics`] into the partition metrics.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [`ReaderMetrics`] into the partition metrics.
    pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Marks the scan as finished with the stream reaching EOF.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish(true);
    }

    /// Sets the [`SeriesDistributorMetrics`].
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(
            f,
            r#"{{"partition":{}, "metrics":{:?}}}"#,
            self.0.partition, metrics
        )
    }
}

/// Metrics collected by the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of timeouts when sending series.
    pub(crate) num_series_send_timeout: usize,
    /// Number of times the send channel was full.
    pub(crate) num_series_send_full: usize,
    /// Number of rows the distributor has sent.
    pub(crate) num_rows: usize,
    /// Number of batches the distributor has sent.
    pub(crate) num_batches: usize,
    /// Duration the distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Duration the distributor spent yielding.
    pub(crate) yield_cost: Duration,
}

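/// Scans the memtable ranges at `index` and yields [`Batch`]es.
///
/// A rough consumption sketch (illustrative only; `stream_ctx`, `part_metrics`,
/// `index` and `time_range` are placeholders):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = scan_mem_ranges(stream_ctx, part_metrics, index, time_range);
/// futures::pin_mut!(stream);
/// while let Some(batch) = stream.try_next().await? {
///     // Process the batch.
/// }
/// ```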
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }

            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Scans the memtable ranges at `index` and yields [`RecordBatch`]es in the flat format.
#[allow(dead_code)]
pub(crate) fn scan_flat_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let mem_scan_metrics = Some(MemScanMetrics::default());
            let mut iter = range.build_record_batch_iter(mem_scan_metrics.clone())?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            while let Some(record_batch) = iter.next().transpose()? {
                yield record_batch;
            }

            if let Some(ref metrics) = mem_scan_metrics {
                let data = metrics.data();
                part_metrics.report_mem_scan_metrics(&data);
            }
        }
    }
}

/// Builds the file ranges at `index` and returns a stream that scans them.
pub(crate) async fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Builds the file ranges at `index` and returns a stream that scans them in the flat format.
#[allow(dead_code)]
pub(crate) async fn scan_flat_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
    let mut reader_metrics = ReaderMetrics::default();
    let ranges = range_builder
        .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
        .await?;
    part_metrics.inc_num_file_ranges(ranges.len());
    part_metrics.merge_reader_metrics(&reader_metrics);

    Ok(build_flat_file_range_scan_stream(
        stream_ctx,
        part_metrics,
        read_type,
        ranges,
    ))
}

/// Builds a stream that scans the given file `ranges` and yields [`Batch`]es,
/// merging the reader metrics into `part_metrics` once all ranges are scanned.
pub fn build_file_range_scan_stream(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.as_primary_key().unwrap().compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Builds a stream that scans the given file `ranges` and yields [`RecordBatch`]es
/// in the flat format, merging the reader metrics into `part_metrics` once all
/// ranges are scanned.
pub fn build_flat_file_range_scan_stream(
    _stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    read_type: &'static str,
    ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
    try_stream! {
        let reader_metrics = &mut ReaderMetrics::default();
        for range in ranges {
            let build_reader_start = Instant::now();
            let mut reader = range.flat_reader().await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);

            let may_compat = range
                .compat_batch()
                .map(|compat| {
                    compat.as_flat().context(UnexpectedSnafu {
                        reason: "Invalid compat for flat format",
                    })
                })
                .transpose()?;
            while let Some(record_batch) = reader.next_batch()? {
                if let Some(flat_compat) = may_compat {
                    let batch = flat_compat.compat(record_batch)?;
                    yield batch;
                } else {
                    yield record_batch;
                }
            }

            let prune_metrics = reader.metrics();
            reader_metrics.merge_from(&prune_metrics);
        }

        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(reader_metrics);
    }
}

/// Scans the extension range at `index` and returns the resulting batch stream.
#[cfg(feature = "enterprise")]
pub(crate) async fn scan_extension_range(
    context: Arc<StreamContext>,
    index: RowGroupIndex,
    partition_metrics: PartitionMetrics,
) -> Result<BoxedBatchStream> {
    use snafu::ResultExt;

    let range = context.input.extension_range(index.index);
    let reader = range.reader(context.as_ref());
    let stream = reader
        .read(context, partition_metrics, index)
        .await
        .context(crate::error::ScanExternalRangeSnafu)?;
    Ok(stream)
}

/// Scans ranges that are neither memtable nor file ranges. With the `enterprise`
/// feature this scans extension ranges; otherwise it returns an error.
pub(crate) async fn maybe_scan_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedBatchStream> {
    #[cfg(feature = "enterprise")]
    {
        scan_extension_range(context.clone(), index, metrics.clone()).await
    }

    #[cfg(not(feature = "enterprise"))]
    {
        let _ = context;
        let _ = index;
        let _ = metrics;

        crate::error::UnexpectedSnafu {
            reason: "no other ranges scannable",
        }
        .fail()
    }
}

/// Flat-format counterpart of [`maybe_scan_other_ranges`]; currently always returns an error.
#[allow(dead_code)]
pub(crate) async fn maybe_scan_flat_other_ranges(
    context: &Arc<StreamContext>,
    index: RowGroupIndex,
    metrics: &PartitionMetrics,
) -> Result<BoxedRecordBatchStream> {
    let _ = context;
    let _ = index;
    let _ = metrics;

    crate::error::UnexpectedSnafu {
        reason: "no other ranges scannable in flat format",
    }
    .fail()
}
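
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal test sketch added for illustration; it assumes `ScannerMetrics`
    // can be constructed field-by-field from this module (its fields are already
    // destructured above). The values are arbitrary.
    #[test]
    fn merge_scanner_metrics_accumulates() {
        let mut set = ScanMetricsSet::default();
        let scanner = ScannerMetrics {
            prepare_scan_cost: Duration::from_millis(1),
            build_reader_cost: Duration::from_millis(2),
            scan_cost: Duration::from_millis(3),
            yield_cost: Duration::from_millis(4),
            num_batches: 5,
            num_rows: 100,
            num_mem_ranges: 2,
            num_file_ranges: 3,
        };

        // Merging twice doubles every counter and duration.
        set.merge_scanner_metrics(&scanner);
        set.merge_scanner_metrics(&scanner);

        assert_eq!(set.num_rows, 200);
        assert_eq!(set.num_batches, 10);
        assert_eq!(set.scan_cost, Duration::from_millis(6));
        assert!(!set.stream_eof);
    }
}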