1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48 scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49 should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
58pub struct SeqScan {
63 properties: ScannerProperties,
65 stream_ctx: Arc<StreamContext>,
67 metrics_list: PartitionMetricsList,
70}
71
72impl SeqScan {
73 pub(crate) fn new(input: ScanInput) -> Self {
76 let mut properties = ScannerProperties::default()
77 .with_append_mode(input.append_mode)
78 .with_total_rows(input.total_rows());
79 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80 properties.partitions = vec![stream_ctx.partition_ranges()];
81
82 Self {
83 properties,
84 stream_ctx,
85 metrics_list: PartitionMetricsList::default(),
86 }
87 }
88
89 #[tracing::instrument(
94 skip_all,
95 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
96 )]
97 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
98 let metrics_set = ExecutionPlanMetricsSet::new();
99 let streams = (0..self.properties.partitions.len())
100 .map(|partition: usize| {
101 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
102 })
103 .collect::<Result<Vec<_>, _>>()?;
104
105 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
106 Ok(Box::pin(aggr_stream))
107 }
108
109 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
111 let metrics_set = ExecutionPlanMetricsSet::new();
112
113 let streams = (0..self.properties.partitions.len())
114 .map(|partition| {
115 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
116 self.scan_batch_in_partition(partition, metrics)
117 })
118 .collect::<Result<Vec<_>>>()?;
119
120 Ok(Box::pin(futures::stream::iter(streams).flatten()))
121 }
122
123 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
128 assert!(self.stream_ctx.input.compaction);
129
130 let metrics_set = ExecutionPlanMetricsSet::new();
131 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
132 debug_assert_eq!(1, self.properties.partitions.len());
133 let partition_ranges = &self.properties.partitions[0];
134
135 let reader = Self::merge_all_ranges_for_compaction(
136 &self.stream_ctx,
137 partition_ranges,
138 &part_metrics,
139 )
140 .await?;
141 Ok(Box::new(reader))
142 }
143
144 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
149 assert!(self.stream_ctx.input.compaction);
150
151 let metrics_set = ExecutionPlanMetricsSet::new();
152 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
153 debug_assert_eq!(1, self.properties.partitions.len());
154 let partition_ranges = &self.properties.partitions[0];
155
156 let reader = Self::merge_all_flat_ranges_for_compaction(
157 &self.stream_ctx,
158 partition_ranges,
159 &part_metrics,
160 )
161 .await?;
162 Ok(reader)
163 }
164
165 async fn merge_all_ranges_for_compaction(
168 stream_ctx: &Arc<StreamContext>,
169 partition_ranges: &[PartitionRange],
170 part_metrics: &PartitionMetrics,
171 ) -> Result<BoxedBatchReader> {
172 let mut sources = Vec::new();
173 let range_builder_list = Arc::new(RangeBuilderList::new(
174 stream_ctx.input.num_memtables(),
175 stream_ctx.input.num_files(),
176 ));
177 for part_range in partition_ranges {
178 build_sources(
179 stream_ctx,
180 part_range,
181 true,
182 part_metrics,
183 range_builder_list.clone(),
184 &mut sources,
185 None,
186 )
187 .await?;
188 }
189
190 common_telemetry::debug!(
191 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
192 stream_ctx.input.mapper.metadata().region_id,
193 partition_ranges.len(),
194 sources.len()
195 );
196 Self::build_reader_from_sources(stream_ctx, sources, None, None).await
197 }
198
199 async fn merge_all_flat_ranges_for_compaction(
202 stream_ctx: &Arc<StreamContext>,
203 partition_ranges: &[PartitionRange],
204 part_metrics: &PartitionMetrics,
205 ) -> Result<BoxedRecordBatchStream> {
206 let mut sources = Vec::new();
207 let range_builder_list = Arc::new(RangeBuilderList::new(
208 stream_ctx.input.num_memtables(),
209 stream_ctx.input.num_files(),
210 ));
211 for part_range in partition_ranges {
212 build_flat_sources(
213 stream_ctx,
214 part_range,
215 true,
216 part_metrics,
217 range_builder_list.clone(),
218 &mut sources,
219 None,
220 )
221 .await?;
222 }
223
224 common_telemetry::debug!(
225 "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
226 stream_ctx.input.mapper.metadata().region_id,
227 partition_ranges.len(),
228 sources.len()
229 );
230 Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
231 }
232
233 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
236 pub(crate) async fn build_reader_from_sources(
237 stream_ctx: &StreamContext,
238 mut sources: Vec<Source>,
239 semaphore: Option<Arc<Semaphore>>,
240 part_metrics: Option<&PartitionMetrics>,
241 ) -> Result<BoxedBatchReader> {
242 if let Some(semaphore) = semaphore.as_ref() {
243 if sources.len() > 1 {
245 sources = stream_ctx
246 .input
247 .create_parallel_sources(sources, semaphore.clone())?;
248 }
249 }
250
251 let mut builder = MergeReaderBuilder::from_sources(sources);
252 if let Some(metrics) = part_metrics {
253 builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
254 }
255 let reader = builder.build().await?;
256
257 let dedup = !stream_ctx.input.append_mode;
258 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
259 let reader = if dedup {
260 match stream_ctx.input.merge_mode {
261 MergeMode::LastRow => Box::new(DedupReader::new(
262 reader,
263 LastRow::new(stream_ctx.input.filter_deleted),
264 dedup_metrics_reporter,
265 )) as _,
266 MergeMode::LastNonNull => Box::new(DedupReader::new(
267 reader,
268 LastNonNull::new(stream_ctx.input.filter_deleted),
269 dedup_metrics_reporter,
270 )) as _,
271 }
272 } else {
273 Box::new(reader) as _
274 };
275
276 let reader = match &stream_ctx.input.series_row_selector {
277 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
278 None => reader,
279 };
280
281 Ok(reader)
282 }
283
284 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
287 pub(crate) async fn build_flat_reader_from_sources(
288 stream_ctx: &StreamContext,
289 mut sources: Vec<BoxedRecordBatchStream>,
290 semaphore: Option<Arc<Semaphore>>,
291 part_metrics: Option<&PartitionMetrics>,
292 ) -> Result<BoxedRecordBatchStream> {
293 if let Some(semaphore) = semaphore.as_ref() {
294 if sources.len() > 1 {
296 sources = stream_ctx
297 .input
298 .create_parallel_flat_sources(sources, semaphore.clone())?;
299 }
300 }
301
302 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
303 let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
304
305 let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
306 let reader =
307 FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
308 .await?;
309
310 let dedup = !stream_ctx.input.append_mode;
311 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
312 let reader = if dedup {
313 match stream_ctx.input.merge_mode {
314 MergeMode::LastRow => Box::pin(
315 FlatDedupReader::new(
316 reader.into_stream().boxed(),
317 FlatLastRow::new(stream_ctx.input.filter_deleted),
318 dedup_metrics_reporter,
319 )
320 .into_stream(),
321 ) as _,
322 MergeMode::LastNonNull => Box::pin(
323 FlatDedupReader::new(
324 reader.into_stream().boxed(),
325 FlatLastNonNull::new(
326 mapper.field_column_start(),
327 stream_ctx.input.filter_deleted,
328 ),
329 dedup_metrics_reporter,
330 )
331 .into_stream(),
332 ) as _,
333 }
334 } else {
335 Box::pin(reader.into_stream()) as _
336 };
337
338 Ok(reader)
339 }
340
341 fn scan_partition_impl(
344 &self,
345 ctx: &QueryScanContext,
346 metrics_set: &ExecutionPlanMetricsSet,
347 partition: usize,
348 ) -> Result<SendableRecordBatchStream> {
349 if ctx.explain_verbose {
350 common_telemetry::info!(
351 "SeqScan partition {}, region_id: {}",
352 partition,
353 self.stream_ctx.input.region_metadata().region_id
354 );
355 }
356
357 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
358 let input = &self.stream_ctx.input;
359
360 let batch_stream = if input.flat_format {
361 self.scan_flat_batch_in_partition(partition, metrics.clone())?
363 } else {
364 self.scan_batch_in_partition(partition, metrics.clone())?
366 };
367 let record_batch_stream = ConvertBatchStream::new(
368 batch_stream,
369 input.mapper.clone(),
370 input.cache_strategy.clone(),
371 metrics,
372 );
373
374 Ok(Box::pin(RecordBatchStreamWrapper::new(
375 input.mapper.output_schema(),
376 Box::pin(record_batch_stream),
377 )))
378 }
379
    /// Builds a stream of [`ScanBatch::Normal`] batches for one partition
    /// (primary-key format).
    ///
    /// The partition ranges are read one after another. Each range gets its own
    /// merged (and possibly deduplicated) reader. Returns an empty stream if the
    /// partition has no ranges.
    ///
    /// # Errors
    /// Returns `PartitionOutOfRange` if `partition` is not a valid index. Errors
    /// raised while polling (including an `Unexpected` error when the mapper is
    /// not a primary-key mapper) are yielded by the returned stream.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        // Compaction builds file sources without the semaphore, so `build_sources`
        // scans its files inline rather than as semaphore-gated tasks.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Rejects non-primary-key mappers. The mapper itself is only used by
            // the debug-only range checker below, hence the leading underscore.
            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;
                // Debug builds verify every batch falls within this part range.
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader.next_batch().await? {
                    // Time spent waiting for the reader counts as scan cost.
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    // Time spent suspended in `yield` (consumer side) counts as yield cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // When requested, an empty batch marks the end of this part range
                // for downstream consumers.
                // NOTE(review): `fetch_start` is not reset after this yield, so its
                // duration is also added to `scan_cost` below — confirm intended.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
486
    /// Builds a stream of [`ScanBatch::RecordBatch`] items for one partition
    /// (flat format).
    ///
    /// The partition ranges are read one after another. Each range gets its own
    /// merged (and possibly deduplicated) flat reader. Returns an empty stream
    /// if the partition has no ranges.
    ///
    /// # Errors
    /// Returns `PartitionOutOfRange` if `partition` is not a valid index. Errors
    /// raised while polling are yielded by the returned stream.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction builds file sources without the semaphore, so
        // `build_flat_sources` scans its files inline.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        // NOTE(review): unlike `scan_batch_in_partition`, no end-of-range marker
        // is yielded when `distinguish_partition_range` is set — confirm intended.
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;

                while let Some(record_batch) = reader.try_next().await? {
                    // Time spent waiting for the reader counts as scan cost.
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    // Time spent suspended in `yield` counts as yield cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
568
569 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
570 if self.properties.target_partitions() > self.properties.num_partitions() {
571 Some(Arc::new(Semaphore::new(
577 self.properties.target_partitions() - self.properties.num_partitions() + 1,
578 )))
579 } else {
580 None
581 }
582 }
583
584 fn new_partition_metrics(
587 &self,
588 explain_verbose: bool,
589 metrics_set: &ExecutionPlanMetricsSet,
590 partition: usize,
591 ) -> PartitionMetrics {
592 let metrics = PartitionMetrics::new(
593 self.stream_ctx.input.mapper.metadata().region_id,
594 partition,
595 get_scanner_type(self.stream_ctx.input.compaction),
596 self.stream_ctx.query_start,
597 explain_verbose,
598 metrics_set,
599 );
600
601 if !self.stream_ctx.input.compaction {
602 self.metrics_list.set(partition, metrics.clone());
603 }
604
605 metrics
606 }
607
608 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
610 partition_ranges
611 .iter()
612 .map(|part_range| {
613 let range_meta = &ranges[part_range.identifier];
614 range_meta.indices.len()
615 })
616 .max()
617 .unwrap_or(0)
618 }
619
620 pub(crate) fn check_scan_limit(&self) -> Result<()> {
622 let total_max_files: usize = self
624 .properties
625 .partitions
626 .iter()
627 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
628 .sum();
629
630 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
631 if total_max_files > max_concurrent_files {
632 return TooManyFilesToReadSnafu {
633 actual: total_max_files,
634 max: max_concurrent_files,
635 }
636 .fail();
637 }
638
639 Ok(())
640 }
641}
642
643impl RegionScanner for SeqScan {
644 fn name(&self) -> &str {
645 "SeqScan"
646 }
647
648 fn properties(&self) -> &ScannerProperties {
649 &self.properties
650 }
651
652 fn schema(&self) -> SchemaRef {
653 self.stream_ctx.input.mapper.output_schema()
654 }
655
656 fn metadata(&self) -> RegionMetadataRef {
657 self.stream_ctx.input.mapper.metadata().clone()
658 }
659
660 fn scan_partition(
661 &self,
662 ctx: &QueryScanContext,
663 metrics_set: &ExecutionPlanMetricsSet,
664 partition: usize,
665 ) -> Result<SendableRecordBatchStream, BoxedError> {
666 self.scan_partition_impl(ctx, metrics_set, partition)
667 .map_err(BoxedError::new)
668 }
669
670 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
671 self.properties.prepare(request);
672
673 self.check_scan_limit().map_err(BoxedError::new)?;
674
675 Ok(())
676 }
677
678 fn has_predicate_without_region(&self) -> bool {
679 let predicate = self
680 .stream_ctx
681 .input
682 .predicate_group()
683 .predicate_without_region();
684 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
685 }
686
687 fn set_logical_region(&mut self, logical_region: bool) {
688 self.properties.set_logical_region(logical_region);
689 }
690}
691
692impl DisplayAs for SeqScan {
693 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
694 write!(
695 f,
696 "SeqScan: region={}, ",
697 self.stream_ctx.input.mapper.metadata().region_id
698 )?;
699 match t {
700 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
702 self.stream_ctx.format_for_explain(false, f)
703 }
704 DisplayFormatType::Verbose => {
705 self.stream_ctx.format_for_explain(true, f)?;
706 self.metrics_list.format_verbose_metrics(f)
707 }
708 }
709 }
710}
711
712impl fmt::Debug for SeqScan {
713 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
714 f.debug_struct("SeqScan")
715 .field("num_ranges", &self.stream_ctx.ranges.len())
716 .finish()
717 }
718}
719
/// Builds the sources of one partition range and appends them to `sources`.
///
/// A source is created for each entry of the range's `row_group_indices`:
/// memtable ranges, file ranges, and other ranges are handled separately.
/// With a `semaphore`, file ranges are scanned as concurrent tasks, each
/// holding one permit. Otherwise they are scanned inline. Either way, the
/// sources are appended in the original index order.
///
/// `compaction` selects the `"compaction"` read type instead of
/// `"seq_scan_files"`.
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // Debug builds check that a compaction range scans all row groups
    // (row_group_index == -1) of every file.
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    sources.reserve(num_indices);
    // One slot per index, so concurrently built file sources can be placed
    // back in their original position.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Clone everything the task needs so it can be awaited later
                // alongside the other file tasks.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // `acquire` only fails on a closed semaphore. No `close()`
                    // call is visible here — presumably never closed.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Source::Stream(Box::pin(stream) as _)))
                });
            } else {
                let stream = scan_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
            }
        } else {
            let stream =
                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(Source::Stream(stream));
        }
    }

    // Run the file tasks concurrently; the first error aborts the rest.
    if !file_scan_tasks.is_empty() {
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, source) in results {
            ordered_sources[position] = Some(source);
        }
    }

    for source in ordered_sources.into_iter().flatten() {
        sources.push(source);
    }
    Ok(())
}
821
/// Builds the flat record batch sources of one partition range and appends
/// them to `sources`.
///
/// Works like `build_sources` but produces `BoxedRecordBatchStream`s. When
/// `should_split_flat_batches_for_merge` says so, each source is wrapped in a
/// `SplitRecordBatchStream` before being appended.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // Debug builds check that a compaction range scans all row groups
    // (row_group_index == -1) of every file.
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    sources.reserve(num_indices);
    // One slot per index, so concurrently built file sources keep their order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Clone everything the task needs so it can be awaited later.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // `acquire` only fails on a closed semaphore. No `close()`
                    // call is visible here — presumably never closed.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    // Run the file tasks concurrently; the first error aborts the rest.
    if !file_scan_tasks.is_empty() {
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(())
}
933
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input of this scanner (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
941
/// Returns the scanner type label recorded in the partition metrics.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}
949}