1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta, file_range_counts};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48 scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49 should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
58pub struct SeqScan {
63 properties: ScannerProperties,
65 stream_ctx: Arc<StreamContext>,
67 metrics_list: PartitionMetricsList,
70}
71
72impl SeqScan {
73 pub(crate) fn new(input: ScanInput) -> Self {
76 let mut properties = ScannerProperties::default()
77 .with_append_mode(input.append_mode)
78 .with_total_rows(input.total_rows());
79 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80 properties.partitions = vec![stream_ctx.partition_ranges()];
81
82 Self {
83 properties,
84 stream_ctx,
85 metrics_list: PartitionMetricsList::default(),
86 }
87 }
88
89 #[tracing::instrument(
94 skip_all,
95 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
96 )]
97 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
98 let metrics_set = ExecutionPlanMetricsSet::new();
99 let streams = (0..self.properties.partitions.len())
100 .map(|partition: usize| {
101 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
102 })
103 .collect::<Result<Vec<_>, _>>()?;
104
105 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
106 Ok(Box::pin(aggr_stream))
107 }
108
109 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
111 let metrics_set = ExecutionPlanMetricsSet::new();
112
113 let streams = (0..self.properties.partitions.len())
114 .map(|partition| {
115 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
116 self.scan_batch_in_partition(partition, metrics)
117 })
118 .collect::<Result<Vec<_>>>()?;
119
120 Ok(Box::pin(futures::stream::iter(streams).flatten()))
121 }
122
123 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
128 assert!(self.stream_ctx.input.compaction);
129
130 let metrics_set = ExecutionPlanMetricsSet::new();
131 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
132 debug_assert_eq!(1, self.properties.partitions.len());
133 let partition_ranges = &self.properties.partitions[0];
134
135 let reader = Self::merge_all_ranges_for_compaction(
136 &self.stream_ctx,
137 partition_ranges,
138 &part_metrics,
139 )
140 .await?;
141 Ok(Box::new(reader))
142 }
143
144 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
149 assert!(self.stream_ctx.input.compaction);
150
151 let metrics_set = ExecutionPlanMetricsSet::new();
152 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
153 debug_assert_eq!(1, self.properties.partitions.len());
154 let partition_ranges = &self.properties.partitions[0];
155
156 let reader = Self::merge_all_flat_ranges_for_compaction(
157 &self.stream_ctx,
158 partition_ranges,
159 &part_metrics,
160 )
161 .await?;
162 Ok(reader)
163 }
164
165 async fn merge_all_ranges_for_compaction(
168 stream_ctx: &Arc<StreamContext>,
169 partition_ranges: &[PartitionRange],
170 part_metrics: &PartitionMetrics,
171 ) -> Result<BoxedBatchReader> {
172 let mut sources = Vec::new();
173 let counts = file_range_counts(
174 stream_ctx.input.num_memtables(),
175 stream_ctx.input.num_files(),
176 &stream_ctx.ranges,
177 partition_ranges.iter(),
178 );
179 let range_builder_list = Arc::new(RangeBuilderList::new(
180 stream_ctx.input.num_memtables(),
181 counts,
182 ));
183 for part_range in partition_ranges {
184 build_sources(
185 stream_ctx,
186 part_range,
187 true,
188 part_metrics,
189 range_builder_list.clone(),
190 &mut sources,
191 None,
192 )
193 .await?;
194 }
195
196 common_telemetry::debug!(
197 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
198 stream_ctx.input.mapper.metadata().region_id,
199 partition_ranges.len(),
200 sources.len()
201 );
202 Self::build_reader_from_sources(stream_ctx, sources, None, None).await
203 }
204
205 async fn merge_all_flat_ranges_for_compaction(
208 stream_ctx: &Arc<StreamContext>,
209 partition_ranges: &[PartitionRange],
210 part_metrics: &PartitionMetrics,
211 ) -> Result<BoxedRecordBatchStream> {
212 let mut sources = Vec::new();
213 let counts = file_range_counts(
214 stream_ctx.input.num_memtables(),
215 stream_ctx.input.num_files(),
216 &stream_ctx.ranges,
217 partition_ranges.iter(),
218 );
219 let range_builder_list = Arc::new(RangeBuilderList::new(
220 stream_ctx.input.num_memtables(),
221 counts,
222 ));
223 for part_range in partition_ranges {
224 build_flat_sources(
225 stream_ctx,
226 part_range,
227 true,
228 part_metrics,
229 range_builder_list.clone(),
230 &mut sources,
231 None,
232 )
233 .await?;
234 }
235
236 common_telemetry::debug!(
237 "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
238 stream_ctx.input.mapper.metadata().region_id,
239 partition_ranges.len(),
240 sources.len()
241 );
242 Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
243 }
244
245 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
248 pub(crate) async fn build_reader_from_sources(
249 stream_ctx: &StreamContext,
250 mut sources: Vec<Source>,
251 semaphore: Option<Arc<Semaphore>>,
252 part_metrics: Option<&PartitionMetrics>,
253 ) -> Result<BoxedBatchReader> {
254 if let Some(semaphore) = semaphore.as_ref() {
255 if sources.len() > 1 {
257 sources = stream_ctx
258 .input
259 .create_parallel_sources(sources, semaphore.clone())?;
260 }
261 }
262
263 let mut builder = MergeReaderBuilder::from_sources(sources);
264 if let Some(metrics) = part_metrics {
265 builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
266 }
267 let reader = builder.build().await?;
268
269 let dedup = !stream_ctx.input.append_mode;
270 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
271 let reader = if dedup {
272 match stream_ctx.input.merge_mode {
273 MergeMode::LastRow => Box::new(DedupReader::new(
274 reader,
275 LastRow::new(stream_ctx.input.filter_deleted),
276 dedup_metrics_reporter,
277 )) as _,
278 MergeMode::LastNonNull => Box::new(DedupReader::new(
279 reader,
280 LastNonNull::new(stream_ctx.input.filter_deleted),
281 dedup_metrics_reporter,
282 )) as _,
283 }
284 } else {
285 Box::new(reader) as _
286 };
287
288 let reader = match &stream_ctx.input.series_row_selector {
289 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
290 None => reader,
291 };
292
293 Ok(reader)
294 }
295
296 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
299 pub(crate) async fn build_flat_reader_from_sources(
300 stream_ctx: &StreamContext,
301 mut sources: Vec<BoxedRecordBatchStream>,
302 semaphore: Option<Arc<Semaphore>>,
303 part_metrics: Option<&PartitionMetrics>,
304 ) -> Result<BoxedRecordBatchStream> {
305 if let Some(semaphore) = semaphore.as_ref() {
306 if sources.len() > 1 {
308 sources = stream_ctx
309 .input
310 .create_parallel_flat_sources(sources, semaphore.clone())?;
311 }
312 }
313
314 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
315 let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
316
317 let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
318 let reader =
319 FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
320 .await?;
321
322 let dedup = !stream_ctx.input.append_mode;
323 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
324 let reader = if dedup {
325 match stream_ctx.input.merge_mode {
326 MergeMode::LastRow => Box::pin(
327 FlatDedupReader::new(
328 reader.into_stream().boxed(),
329 FlatLastRow::new(stream_ctx.input.filter_deleted),
330 dedup_metrics_reporter,
331 )
332 .into_stream(),
333 ) as _,
334 MergeMode::LastNonNull => Box::pin(
335 FlatDedupReader::new(
336 reader.into_stream().boxed(),
337 FlatLastNonNull::new(
338 mapper.field_column_start(),
339 stream_ctx.input.filter_deleted,
340 ),
341 dedup_metrics_reporter,
342 )
343 .into_stream(),
344 ) as _,
345 }
346 } else {
347 Box::pin(reader.into_stream()) as _
348 };
349
350 Ok(reader)
351 }
352
353 fn scan_partition_impl(
356 &self,
357 ctx: &QueryScanContext,
358 metrics_set: &ExecutionPlanMetricsSet,
359 partition: usize,
360 ) -> Result<SendableRecordBatchStream> {
361 if ctx.explain_verbose {
362 common_telemetry::info!(
363 "SeqScan partition {}, region_id: {}",
364 partition,
365 self.stream_ctx.input.region_metadata().region_id
366 );
367 }
368
369 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
370 let input = &self.stream_ctx.input;
371
372 let batch_stream = if input.flat_format {
373 self.scan_flat_batch_in_partition(partition, metrics.clone())?
375 } else {
376 self.scan_batch_in_partition(partition, metrics.clone())?
378 };
379 let record_batch_stream = ConvertBatchStream::new(
380 batch_stream,
381 input.mapper.clone(),
382 input.cache_strategy.clone(),
383 metrics,
384 );
385
386 Ok(Box::pin(RecordBatchStreamWrapper::new(
387 input.mapper.output_schema(),
388 Box::pin(record_batch_stream),
389 )))
390 }
391
392 #[tracing::instrument(
393 skip_all,
394 fields(
395 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
396 partition = partition
397 )
398 )]
399 fn scan_batch_in_partition(
400 &self,
401 partition: usize,
402 part_metrics: PartitionMetrics,
403 ) -> Result<ScanBatchStream> {
404 ensure!(
405 partition < self.properties.partitions.len(),
406 PartitionOutOfRangeSnafu {
407 given: partition,
408 all: self.properties.partitions.len(),
409 }
410 );
411
412 if self.properties.partitions[partition].is_empty() {
413 return Ok(Box::pin(futures::stream::empty()));
414 }
415
416 let stream_ctx = self.stream_ctx.clone();
417 let semaphore = self.new_semaphore();
418 let partition_ranges = self.properties.partitions[partition].clone();
419 let compaction = self.stream_ctx.input.compaction;
420 let distinguish_range = self.properties.distinguish_partition_range;
421 let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
422
423 let stream = try_stream! {
424 part_metrics.on_first_poll();
425 let mut fetch_start = Instant::now();
428
429 let counts = file_range_counts(
430 stream_ctx.input.num_memtables(),
431 stream_ctx.input.num_files(),
432 &stream_ctx.ranges,
433 partition_ranges.iter(),
434 );
435 let range_builder_list = Arc::new(RangeBuilderList::new(
436 stream_ctx.input.num_memtables(),
437 counts,
438 ));
439 let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
440 reason: "Unexpected format",
441 })?;
442 for part_range in partition_ranges {
444 let mut sources = Vec::new();
445 build_sources(
446 &stream_ctx,
447 &part_range,
448 compaction,
449 &part_metrics,
450 range_builder_list.clone(),
451 &mut sources,
452 file_scan_semaphore.clone(),
453 ).await?;
454
455 let mut reader =
456 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
457 .await?;
458 #[cfg(debug_assertions)]
459 let mut checker = crate::read::BatchChecker::default()
460 .with_start(Some(part_range.start))
461 .with_end(Some(part_range.end));
462
463 let mut metrics = ScannerMetrics {
464 scan_cost: fetch_start.elapsed(),
465 ..Default::default()
466 };
467 fetch_start = Instant::now();
468
469 while let Some(batch) = reader.next_batch().await? {
470 metrics.scan_cost += fetch_start.elapsed();
471 metrics.num_batches += 1;
472 metrics.num_rows += batch.num_rows();
473
474 debug_assert!(!batch.is_empty());
475 if batch.is_empty() {
476 fetch_start = Instant::now();
477 continue;
478 }
479
480 #[cfg(debug_assertions)]
481 checker.ensure_part_range_batch(
482 "SeqScan",
483 _mapper.metadata().region_id,
484 partition,
485 part_range,
486 &batch,
487 );
488
489 let yield_start = Instant::now();
490 yield ScanBatch::Normal(batch);
491 metrics.yield_cost += yield_start.elapsed();
492
493 fetch_start = Instant::now();
494 }
495
496 if distinguish_range {
499 let yield_start = Instant::now();
500 yield ScanBatch::Normal(Batch::empty());
501 metrics.yield_cost += yield_start.elapsed();
502 }
503
504 metrics.scan_cost += fetch_start.elapsed();
505 fetch_start = Instant::now();
506 part_metrics.merge_metrics(&metrics);
507 }
508
509 part_metrics.on_finish();
510 };
511 Ok(Box::pin(stream))
512 }
513
514 #[tracing::instrument(
515 skip_all,
516 fields(
517 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
518 partition = partition
519 )
520 )]
521 fn scan_flat_batch_in_partition(
522 &self,
523 partition: usize,
524 part_metrics: PartitionMetrics,
525 ) -> Result<ScanBatchStream> {
526 ensure!(
527 partition < self.properties.partitions.len(),
528 PartitionOutOfRangeSnafu {
529 given: partition,
530 all: self.properties.partitions.len(),
531 }
532 );
533
534 if self.properties.partitions[partition].is_empty() {
535 return Ok(Box::pin(futures::stream::empty()));
536 }
537
538 let stream_ctx = self.stream_ctx.clone();
539 let semaphore = self.new_semaphore();
540 let partition_ranges = self.properties.partitions[partition].clone();
541 let compaction = self.stream_ctx.input.compaction;
542 let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
543
544 let stream = try_stream! {
545 part_metrics.on_first_poll();
546 let mut fetch_start = Instant::now();
549
550 let counts = file_range_counts(
551 stream_ctx.input.num_memtables(),
552 stream_ctx.input.num_files(),
553 &stream_ctx.ranges,
554 partition_ranges.iter(),
555 );
556 let range_builder_list = Arc::new(RangeBuilderList::new(
557 stream_ctx.input.num_memtables(),
558 counts,
559 ));
560 for part_range in partition_ranges {
562 let mut sources = Vec::new();
563 build_flat_sources(
564 &stream_ctx,
565 &part_range,
566 compaction,
567 &part_metrics,
568 range_builder_list.clone(),
569 &mut sources,
570 file_scan_semaphore.clone(),
571 ).await?;
572
573 let mut reader =
574 Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
575 .await?;
576
577 let mut metrics = ScannerMetrics {
578 scan_cost: fetch_start.elapsed(),
579 ..Default::default()
580 };
581 fetch_start = Instant::now();
582
583 while let Some(record_batch) = reader.try_next().await? {
584 metrics.scan_cost += fetch_start.elapsed();
585 metrics.num_batches += 1;
586 metrics.num_rows += record_batch.num_rows();
587
588 debug_assert!(record_batch.num_rows() > 0);
589 if record_batch.num_rows() == 0 {
590 fetch_start = Instant::now();
591 continue;
592 }
593
594 let yield_start = Instant::now();
595 yield ScanBatch::RecordBatch(record_batch);
596 metrics.yield_cost += yield_start.elapsed();
597
598 fetch_start = Instant::now();
599 }
600
601 metrics.scan_cost += fetch_start.elapsed();
602 fetch_start = Instant::now();
603 part_metrics.merge_metrics(&metrics);
604 }
605
606 part_metrics.on_finish();
607 };
608 Ok(Box::pin(stream))
609 }
610
611 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
612 if self.properties.target_partitions() > self.properties.num_partitions() {
613 Some(Arc::new(Semaphore::new(
619 self.properties.target_partitions() - self.properties.num_partitions() + 1,
620 )))
621 } else {
622 None
623 }
624 }
625
626 fn new_partition_metrics(
629 &self,
630 explain_verbose: bool,
631 metrics_set: &ExecutionPlanMetricsSet,
632 partition: usize,
633 ) -> PartitionMetrics {
634 let metrics = PartitionMetrics::new(
635 self.stream_ctx.input.mapper.metadata().region_id,
636 partition,
637 get_scanner_type(self.stream_ctx.input.compaction),
638 self.stream_ctx.query_start,
639 explain_verbose,
640 metrics_set,
641 );
642
643 if !self.stream_ctx.input.compaction {
644 self.metrics_list.set(partition, metrics.clone());
645 }
646
647 metrics
648 }
649
650 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
652 partition_ranges
653 .iter()
654 .map(|part_range| {
655 let range_meta = &ranges[part_range.identifier];
656 range_meta.indices.len()
657 })
658 .max()
659 .unwrap_or(0)
660 }
661
662 pub(crate) fn check_scan_limit(&self) -> Result<()> {
664 let total_max_files: usize = self
666 .properties
667 .partitions
668 .iter()
669 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
670 .sum();
671
672 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
673 if total_max_files > max_concurrent_files {
674 return TooManyFilesToReadSnafu {
675 actual: total_max_files,
676 max: max_concurrent_files,
677 }
678 .fail();
679 }
680
681 Ok(())
682 }
683}
684
685impl RegionScanner for SeqScan {
686 fn name(&self) -> &str {
687 "SeqScan"
688 }
689
690 fn properties(&self) -> &ScannerProperties {
691 &self.properties
692 }
693
694 fn schema(&self) -> SchemaRef {
695 self.stream_ctx.input.mapper.output_schema()
696 }
697
698 fn metadata(&self) -> RegionMetadataRef {
699 self.stream_ctx.input.mapper.metadata().clone()
700 }
701
702 fn scan_partition(
703 &self,
704 ctx: &QueryScanContext,
705 metrics_set: &ExecutionPlanMetricsSet,
706 partition: usize,
707 ) -> Result<SendableRecordBatchStream, BoxedError> {
708 self.scan_partition_impl(ctx, metrics_set, partition)
709 .map_err(BoxedError::new)
710 }
711
712 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
713 self.properties.prepare(request);
714
715 self.check_scan_limit().map_err(BoxedError::new)?;
716
717 Ok(())
718 }
719
720 fn has_predicate_without_region(&self) -> bool {
721 let predicate = self
722 .stream_ctx
723 .input
724 .predicate_group()
725 .predicate_without_region();
726 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
727 }
728
729 fn set_logical_region(&mut self, logical_region: bool) {
730 self.properties.set_logical_region(logical_region);
731 }
732}
733
734impl DisplayAs for SeqScan {
735 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
736 write!(
737 f,
738 "SeqScan: region={}, ",
739 self.stream_ctx.input.mapper.metadata().region_id
740 )?;
741 match t {
742 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
744 self.stream_ctx.format_for_explain(false, f)
745 }
746 DisplayFormatType::Verbose => {
747 self.stream_ctx.format_for_explain(true, f)?;
748 self.metrics_list.format_verbose_metrics(f)
749 }
750 }
751 }
752}
753
754impl fmt::Debug for SeqScan {
755 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
756 f.debug_struct("SeqScan")
757 .field("num_ranges", &self.stream_ctx.ranges.len())
758 .finish()
759 }
760}
761
762pub(crate) async fn build_sources(
764 stream_ctx: &Arc<StreamContext>,
765 part_range: &PartitionRange,
766 compaction: bool,
767 part_metrics: &PartitionMetrics,
768 range_builder_list: Arc<RangeBuilderList>,
769 sources: &mut Vec<Source>,
770 semaphore: Option<Arc<Semaphore>>,
771) -> Result<()> {
772 let range_meta = &stream_ctx.ranges[part_range.identifier];
774 #[cfg(debug_assertions)]
775 if compaction {
776 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
778 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
779 debug_assert_eq!(
781 -1, row_group_idx.row_group_index,
782 "Expect {} range scan all row groups, given: {}",
783 i, row_group_idx.row_group_index,
784 );
785 }
786 }
787
788 let read_type = if compaction {
789 "compaction"
790 } else {
791 "seq_scan_files"
792 };
793 let num_indices = range_meta.row_group_indices.len();
794 if num_indices == 0 {
795 return Ok(());
796 }
797
798 sources.reserve(num_indices);
799 let mut ordered_sources = Vec::with_capacity(num_indices);
800 ordered_sources.resize_with(num_indices, || None);
801 let mut file_scan_tasks = Vec::new();
802
803 for (position, index) in range_meta.row_group_indices.iter().enumerate() {
804 if stream_ctx.is_mem_range_index(*index) {
805 let stream = scan_mem_ranges(
806 stream_ctx.clone(),
807 part_metrics.clone(),
808 *index,
809 range_meta.time_range,
810 );
811 ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
812 } else if stream_ctx.is_file_range_index(*index) {
813 if let Some(semaphore_ref) = semaphore.as_ref() {
814 let stream_ctx = stream_ctx.clone();
816 let part_metrics = part_metrics.clone();
817 let range_builder_list = range_builder_list.clone();
818 let semaphore = Arc::clone(semaphore_ref);
819 let row_group_index = *index;
820 file_scan_tasks.push(async move {
821 let _permit = semaphore.acquire().await.unwrap();
822 let stream = scan_file_ranges(
823 stream_ctx,
824 part_metrics,
825 row_group_index,
826 read_type,
827 range_builder_list,
828 )
829 .await?;
830 Ok((position, Source::Stream(Box::pin(stream) as _)))
831 });
832 } else {
833 let stream = scan_file_ranges(
835 stream_ctx.clone(),
836 part_metrics.clone(),
837 *index,
838 read_type,
839 range_builder_list.clone(),
840 )
841 .await?;
842 ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
843 }
844 } else {
845 let stream =
846 scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
847 ordered_sources[position] = Some(Source::Stream(stream));
848 }
849 }
850
851 if !file_scan_tasks.is_empty() {
852 let results = futures::future::try_join_all(file_scan_tasks).await?;
853 for (position, source) in results {
854 ordered_sources[position] = Some(source);
855 }
856 }
857
858 for source in ordered_sources.into_iter().flatten() {
859 sources.push(source);
860 }
861 Ok(())
862}
863
864pub(crate) async fn build_flat_sources(
866 stream_ctx: &Arc<StreamContext>,
867 part_range: &PartitionRange,
868 compaction: bool,
869 part_metrics: &PartitionMetrics,
870 range_builder_list: Arc<RangeBuilderList>,
871 sources: &mut Vec<BoxedRecordBatchStream>,
872 semaphore: Option<Arc<Semaphore>>,
873) -> Result<()> {
874 let range_meta = &stream_ctx.ranges[part_range.identifier];
876 #[cfg(debug_assertions)]
877 if compaction {
878 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
880 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
881 debug_assert_eq!(
883 -1, row_group_idx.row_group_index,
884 "Expect {} range scan all row groups, given: {}",
885 i, row_group_idx.row_group_index,
886 );
887 }
888 }
889
890 let read_type = if compaction {
891 "compaction"
892 } else {
893 "seq_scan_files"
894 };
895 let num_indices = range_meta.row_group_indices.len();
896 if num_indices == 0 {
897 return Ok(());
898 }
899
900 let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
901 sources.reserve(num_indices);
902 let mut ordered_sources = Vec::with_capacity(num_indices);
903 ordered_sources.resize_with(num_indices, || None);
904 let mut file_scan_tasks = Vec::new();
905
906 for (position, index) in range_meta.row_group_indices.iter().enumerate() {
907 if stream_ctx.is_mem_range_index(*index) {
908 let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
909 ordered_sources[position] = Some(Box::pin(stream) as _);
910 } else if stream_ctx.is_file_range_index(*index) {
911 if let Some(semaphore_ref) = semaphore.as_ref() {
912 let stream_ctx = stream_ctx.clone();
914 let part_metrics = part_metrics.clone();
915 let range_builder_list = range_builder_list.clone();
916 let semaphore = Arc::clone(semaphore_ref);
917 let row_group_index = *index;
918 file_scan_tasks.push(async move {
919 let _permit = semaphore.acquire().await.unwrap();
920 let stream = scan_flat_file_ranges(
921 stream_ctx,
922 part_metrics,
923 row_group_index,
924 read_type,
925 range_builder_list,
926 )
927 .await?;
928 Ok((position, Box::pin(stream) as _))
929 });
930 } else {
931 let stream = scan_flat_file_ranges(
933 stream_ctx.clone(),
934 part_metrics.clone(),
935 *index,
936 read_type,
937 range_builder_list.clone(),
938 )
939 .await?;
940 ordered_sources[position] = Some(Box::pin(stream) as _);
941 }
942 } else {
943 let stream =
944 scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
945 ordered_sources[position] = Some(stream);
946 }
947 }
948
949 if !file_scan_tasks.is_empty() {
950 let results = futures::future::try_join_all(file_scan_tasks).await?;
951 for (position, stream) in results {
952 ordered_sources[position] = Some(stream);
953 }
954 }
955
956 for stream in ordered_sources.into_iter().flatten() {
957 if should_split {
958 sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
959 } else {
960 sources.push(stream);
961 }
962 }
963
964 if should_split {
965 common_telemetry::debug!(
966 "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
967 stream_ctx.input.region_metadata().region_id,
968 sources.len(),
969 part_range,
970 );
971 }
972
973 Ok(())
974}
975
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input held by the scan context. Test-only accessor.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
983
/// Returns the scanner type label, which differs for compaction scans.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}