1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::{FlatLastRowReader, LastRowReader};
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::pruner::{PartitionPruner, Pruner};
45use crate::read::range::RangeMeta;
46use crate::read::scan_region::{ScanInput, StreamContext};
47use crate::read::scan_util::{
48 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
49 scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
50 should_split_flat_batches_for_merge,
51};
52use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
53use crate::read::{
54 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
55};
56use crate::region::options::MergeMode;
57use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
58
/// Scanner that reads and merges ranges of a region sequentially inside each
/// partition, deduplicating rows according to the region's merge mode.
pub struct SeqScan {
    /// Properties of the scanner, including its partition ranges.
    properties: ScannerProperties,
    /// Context shared by all partition streams.
    stream_ctx: Arc<StreamContext>,
    /// Pruner shared by all partitions; partition ranges are registered to it
    /// before scanning.
    pruner: Arc<Pruner>,
    /// Per-partition metrics used by verbose `EXPLAIN` output.
    /// Only populated for queries (not for compaction scans).
    metrics_list: PartitionMetricsList,
}
74
impl SeqScan {
    /// Creates a new [SeqScan] from the scan input.
    pub(crate) fn new(input: ScanInput) -> Self {
        // Start with a single partition containing all ranges; `prepare()` may
        // repartition the scanner later.
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // Use all CPU cores (at least one) as pruner workers.
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Builds a single record batch stream that chains the streams of all
    /// partitions one after another.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| {
                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
            })
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

    /// Scans all partitions and returns one flattened [ScanBatchStream] that
    /// yields the partitions' batches sequentially.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Builds a flat-format reader that merges all ranges for compaction.
    ///
    /// # Panics
    /// Panics if the scan input is not for compaction. Expects a single
    /// partition (checked by a debug assertion).
    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
        assert!(self.stream_ctx.input.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_flat_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
            self.pruner.clone(),
        )
        .await?;
        Ok(reader)
    }

    /// Builds the flat sources of every partition range and merges them into
    /// one reader. Only used by compaction (sources are built with
    /// `compaction == true` and without a semaphore).
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        // Register the ranges to the shared pruner before building sources.
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
    }

    /// Builds a merge reader over `sources`, then wraps it with dedup and
    /// last-row readers according to the input options.
    ///
    /// When `semaphore` is given and there is more than one source, the
    /// sources are converted into parallel sources first.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<Source>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
    ) -> Result<BoxedBatchReader> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Parallelize only when there is more than one source.
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_sources(sources, semaphore.clone())?;
            }
        }

        let mut builder = MergeReaderBuilder::from_sources(sources);
        if let Some(metrics) = part_metrics {
            builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
        }
        let reader = builder.build().await?;

        // Append mode skips deduplication.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                    dedup_metrics_reporter,
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                    dedup_metrics_reporter,
                )) as _,
            }
        } else {
            Box::new(reader) as _
        };

        // Optionally keep only the last row of each time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
            None => reader,
        };

        Ok(reader)
    }

    /// Flat-format counterpart of [Self::build_reader_from_sources]: merges
    /// record batch streams and applies the flat dedup / last-row readers.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
    ) -> Result<BoxedRecordBatchStream> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Parallelize only when there is more than one source.
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_flat_sources(sources, semaphore.clone())?;
            }
        }

        // The flat mapper provides the input arrow schema for the merge reader.
        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);

        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
        let reader =
            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                .await?;

        // Append mode skips deduplication.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            Box::pin(reader.into_stream()) as _
        };

        // Optionally keep only the last row of each time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => {
                Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
            }
            None => reader,
        };

        Ok(reader)
    }

    /// Scans one partition and converts its batch stream into a record batch
    /// stream via the input's mapper.
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeqScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
        let input = &self.stream_ctx.input;

        // Choose the scan implementation by the input format.
        let batch_stream = if input.flat_format {
            self.scan_flat_batch_in_partition(partition, metrics.clone())?
        } else {
            self.scan_batch_in_partition(partition, metrics.clone())?
        };
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    /// Returns a [ScanBatchStream] that scans the primary-key format batches
    /// of the given partition, merging the sources of each partition range
    /// sequentially.
    ///
    /// # Errors
    /// Returns an error if `partition` is out of range or the mapper is not
    /// in primary-key format.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        // Compaction scans files inline, so it does not use the file scan
        // semaphore.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Register this partition's ranges to the shared pruner and build a
        // pruner scoped to these ranges.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Tracks the time spent fetching batches (scan cost).
            let mut fetch_start = Instant::now();

            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            // Scans each part range sequentially and merges its sources.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;
                // Debug builds validate that batches stay inside the range.
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    // Empty batches are unexpected; skip them in release builds.
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        fetch_start = Instant::now();
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty batch as a range separator when the caller
                // needs to distinguish partition ranges.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Flat-format counterpart of [Self::scan_batch_in_partition]: yields
    /// [ScanBatch::RecordBatch] items for the given partition.
    ///
    /// # Errors
    /// Returns an error if `partition` is out of range.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction scans files inline, so it does not use the file scan
        // semaphore.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Register this partition's ranges to the shared pruner and build a
        // pruner scoped to these ranges.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Tracks the time spent fetching batches (scan cost).
            let mut fetch_start = Instant::now();

            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are unexpected; skip them in release builds.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Creates a semaphore for parallel scans when the target partition count
    /// exceeds the number of assigned partitions; returns `None` otherwise.
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            // Grants `target - num + 1` permits — the spare target partitions
            // plus one, presumably for the partition's own task (TODO: confirm).
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

    /// Creates the [PartitionMetrics] for a partition and registers it in
    /// `metrics_list` (queries only; compaction scans are not registered).
    fn new_partition_metrics(
        &self,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.stream_ctx.input.compaction),
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );

        if !self.stream_ctx.input.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }

    /// Returns the maximum number of file indices a single partition range in
    /// `partition_ranges` reads.
    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
        partition_ranges
            .iter()
            .map(|part_range| {
                let range_meta = &ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .max()
            .unwrap_or(0)
    }

    /// Checks that the total of each partition's maximum file count does not
    /// exceed the configured `max_concurrent_scan_files` limit.
    ///
    /// # Errors
    /// Returns [TooManyFilesToReadSnafu] when the limit is exceeded.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        let total_max_files: usize = self
            .properties
            .partitions
            .iter()
            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_max_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_max_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}
627
628impl RegionScanner for SeqScan {
629 fn name(&self) -> &str {
630 "SeqScan"
631 }
632
633 fn properties(&self) -> &ScannerProperties {
634 &self.properties
635 }
636
637 fn schema(&self) -> SchemaRef {
638 self.stream_ctx.input.mapper.output_schema()
639 }
640
641 fn metadata(&self) -> RegionMetadataRef {
642 self.stream_ctx.input.mapper.metadata().clone()
643 }
644
645 fn scan_partition(
646 &self,
647 ctx: &QueryScanContext,
648 metrics_set: &ExecutionPlanMetricsSet,
649 partition: usize,
650 ) -> Result<SendableRecordBatchStream, BoxedError> {
651 self.scan_partition_impl(ctx, metrics_set, partition)
652 .map_err(BoxedError::new)
653 }
654
655 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
656 self.properties.prepare(request);
657
658 self.check_scan_limit().map_err(BoxedError::new)?;
659
660 Ok(())
661 }
662
663 fn has_predicate_without_region(&self) -> bool {
664 let predicate = self
665 .stream_ctx
666 .input
667 .predicate_group()
668 .predicate_without_region();
669 predicate.is_some()
670 }
671
672 fn add_dyn_filter_to_predicate(
673 &mut self,
674 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
675 ) -> Vec<bool> {
676 self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
677 }
678
679 fn set_logical_region(&mut self, logical_region: bool) {
680 self.properties.set_logical_region(logical_region);
681 }
682}
683
684impl DisplayAs for SeqScan {
685 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
686 write!(
687 f,
688 "SeqScan: region={}, ",
689 self.stream_ctx.input.mapper.metadata().region_id
690 )?;
691 match t {
692 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
694 self.stream_ctx.format_for_explain(false, f)
695 }
696 DisplayFormatType::Verbose => {
697 self.stream_ctx.format_for_explain(true, f)?;
698 self.metrics_list.format_verbose_metrics(f)
699 }
700 }
701 }
702}
703
704impl fmt::Debug for SeqScan {
705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706 f.debug_struct("SeqScan")
707 .field("num_ranges", &self.stream_ctx.ranges.len())
708 .finish()
709 }
710}
711
/// Builds the [Source]s of a partition range and appends them to `sources`,
/// preserving the order of `range_meta.row_group_indices`.
///
/// When `semaphore` is provided, file ranges are scanned concurrently
/// (bounded by the semaphore's permits) and each result is written back at
/// its original position so the output order stays stable.
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<Source>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    // Gets the metadata of the range this partition range points to.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects each file range to scan all row groups
        // (row_group_index == -1).
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label used by the file scan metrics.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    sources.reserve(num_indices);
    // One slot per index so concurrent scans can fill positions out of order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable ranges are scanned inline (no I/O wait).
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Scans file ranges concurrently; each task acquires a permit
                // first so the concurrency stays bounded.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    Ok((position, Source::Stream(Box::pin(stream) as _)))
                });
            } else {
                // No semaphore: scans the file range inline.
                let stream = scan_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
            }
        } else {
            // Neither a memtable nor a file range index.
            let stream =
                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(Source::Stream(stream));
        }
    }

    // Awaits all concurrent file scans and places them back in order.
    if !file_scan_tasks.is_empty() {
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, source) in results {
            ordered_sources[position] = Some(source);
        }
    }

    for source in ordered_sources.into_iter().flatten() {
        sources.push(source);
    }
    Ok(())
}
813
/// Flat-format counterpart of [build_sources]: builds record batch streams
/// for a partition range and appends them to `sources` in index order.
///
/// When `semaphore` is provided, file ranges are scanned concurrently
/// (bounded by the semaphore's permits). Streams may additionally be wrapped
/// in [SplitRecordBatchStream] when batch splitting is required for merging.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    // Gets the metadata of the range this partition range points to.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects each file range to scan all row groups
        // (row_group_index == -1).
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label used by the file scan metrics.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    // Whether the batches need splitting before merging.
    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    sources.reserve(num_indices);
    // One slot per index so concurrent scans can fill positions out of order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable ranges are scanned inline (no I/O wait).
            let stream = scan_flat_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Scans file ranges concurrently; each task acquires a permit
                // first so the concurrency stays bounded.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // No semaphore: scans the file range inline.
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            // Neither a memtable nor a file range index.
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    // Awaits all concurrent file scans and places them back in order.
    if !file_scan_tasks.is_empty() {
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(())
}
930
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input. Test-only accessor.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
938
/// Returns the scanner type label used in partition metrics, distinguishing
/// compaction scans from query scans.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}