1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48 scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49 should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
/// Scanner that reads the partition ranges of a region in order.
///
/// For each partition range it merges all sources (memtables, files and other ranges)
/// and, unless the scan is in append mode, deduplicates rows according to the merge mode.
pub struct SeqScan {
    /// Properties of the scanner, including the partition ranges assigned to each partition.
    properties: ScannerProperties,
    /// Context shared by every stream created by this scanner.
    stream_ctx: Arc<StreamContext>,
    /// Per-partition metrics, rendered in verbose explain output.
    /// Not populated for compaction scans.
    metrics_list: PartitionMetricsList,
}
71
72impl SeqScan {
73 pub(crate) fn new(input: ScanInput) -> Self {
76 let mut properties = ScannerProperties::default()
77 .with_append_mode(input.append_mode)
78 .with_total_rows(input.total_rows());
79 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80 properties.partitions = vec![stream_ctx.partition_ranges()];
81
82 Self {
83 properties,
84 stream_ctx,
85 metrics_list: PartitionMetricsList::default(),
86 }
87 }
88
89 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
94 let metrics_set = ExecutionPlanMetricsSet::new();
95 let streams = (0..self.properties.partitions.len())
96 .map(|partition: usize| {
97 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
98 })
99 .collect::<Result<Vec<_>, _>>()?;
100
101 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
102 Ok(Box::pin(aggr_stream))
103 }
104
105 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
107 let metrics_set = ExecutionPlanMetricsSet::new();
108
109 let streams = (0..self.properties.partitions.len())
110 .map(|partition| {
111 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
112 self.scan_batch_in_partition(partition, metrics)
113 })
114 .collect::<Result<Vec<_>>>()?;
115
116 Ok(Box::pin(futures::stream::iter(streams).flatten()))
117 }
118
119 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
124 assert!(self.stream_ctx.input.compaction);
125
126 let metrics_set = ExecutionPlanMetricsSet::new();
127 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
128 debug_assert_eq!(1, self.properties.partitions.len());
129 let partition_ranges = &self.properties.partitions[0];
130
131 let reader = Self::merge_all_ranges_for_compaction(
132 &self.stream_ctx,
133 partition_ranges,
134 &part_metrics,
135 )
136 .await?;
137 Ok(Box::new(reader))
138 }
139
140 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
145 assert!(self.stream_ctx.input.compaction);
146
147 let metrics_set = ExecutionPlanMetricsSet::new();
148 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
149 debug_assert_eq!(1, self.properties.partitions.len());
150 let partition_ranges = &self.properties.partitions[0];
151
152 let reader = Self::merge_all_flat_ranges_for_compaction(
153 &self.stream_ctx,
154 partition_ranges,
155 &part_metrics,
156 )
157 .await?;
158 Ok(reader)
159 }
160
161 async fn merge_all_ranges_for_compaction(
164 stream_ctx: &Arc<StreamContext>,
165 partition_ranges: &[PartitionRange],
166 part_metrics: &PartitionMetrics,
167 ) -> Result<BoxedBatchReader> {
168 let mut sources = Vec::new();
169 let range_builder_list = Arc::new(RangeBuilderList::new(
170 stream_ctx.input.num_memtables(),
171 stream_ctx.input.num_files(),
172 ));
173 for part_range in partition_ranges {
174 build_sources(
175 stream_ctx,
176 part_range,
177 true,
178 part_metrics,
179 range_builder_list.clone(),
180 &mut sources,
181 None,
182 )
183 .await?;
184 }
185
186 common_telemetry::debug!(
187 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
188 stream_ctx.input.mapper.metadata().region_id,
189 partition_ranges.len(),
190 sources.len()
191 );
192 Self::build_reader_from_sources(stream_ctx, sources, None, None).await
193 }
194
195 async fn merge_all_flat_ranges_for_compaction(
198 stream_ctx: &Arc<StreamContext>,
199 partition_ranges: &[PartitionRange],
200 part_metrics: &PartitionMetrics,
201 ) -> Result<BoxedRecordBatchStream> {
202 let mut sources = Vec::new();
203 let range_builder_list = Arc::new(RangeBuilderList::new(
204 stream_ctx.input.num_memtables(),
205 stream_ctx.input.num_files(),
206 ));
207 for part_range in partition_ranges {
208 build_flat_sources(
209 stream_ctx,
210 part_range,
211 true,
212 part_metrics,
213 range_builder_list.clone(),
214 &mut sources,
215 None,
216 )
217 .await?;
218 }
219
220 common_telemetry::debug!(
221 "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
222 stream_ctx.input.mapper.metadata().region_id,
223 partition_ranges.len(),
224 sources.len()
225 );
226 Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
227 }
228
229 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
232 pub(crate) async fn build_reader_from_sources(
233 stream_ctx: &StreamContext,
234 mut sources: Vec<Source>,
235 semaphore: Option<Arc<Semaphore>>,
236 part_metrics: Option<&PartitionMetrics>,
237 ) -> Result<BoxedBatchReader> {
238 if let Some(semaphore) = semaphore.as_ref() {
239 if sources.len() > 1 {
241 sources = stream_ctx
242 .input
243 .create_parallel_sources(sources, semaphore.clone())?;
244 }
245 }
246
247 let mut builder = MergeReaderBuilder::from_sources(sources);
248 if let Some(metrics) = part_metrics {
249 builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
250 }
251 let reader = builder.build().await?;
252
253 let dedup = !stream_ctx.input.append_mode;
254 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
255 let reader = if dedup {
256 match stream_ctx.input.merge_mode {
257 MergeMode::LastRow => Box::new(DedupReader::new(
258 reader,
259 LastRow::new(stream_ctx.input.filter_deleted),
260 dedup_metrics_reporter,
261 )) as _,
262 MergeMode::LastNonNull => Box::new(DedupReader::new(
263 reader,
264 LastNonNull::new(stream_ctx.input.filter_deleted),
265 dedup_metrics_reporter,
266 )) as _,
267 }
268 } else {
269 Box::new(reader) as _
270 };
271
272 let reader = match &stream_ctx.input.series_row_selector {
273 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
274 None => reader,
275 };
276
277 Ok(reader)
278 }
279
280 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
283 pub(crate) async fn build_flat_reader_from_sources(
284 stream_ctx: &StreamContext,
285 mut sources: Vec<BoxedRecordBatchStream>,
286 semaphore: Option<Arc<Semaphore>>,
287 part_metrics: Option<&PartitionMetrics>,
288 ) -> Result<BoxedRecordBatchStream> {
289 if let Some(semaphore) = semaphore.as_ref() {
290 if sources.len() > 1 {
292 sources = stream_ctx
293 .input
294 .create_parallel_flat_sources(sources, semaphore.clone())?;
295 }
296 }
297
298 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
299 let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
300
301 let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
302 let reader =
303 FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
304 .await?;
305
306 let dedup = !stream_ctx.input.append_mode;
307 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
308 let reader = if dedup {
309 match stream_ctx.input.merge_mode {
310 MergeMode::LastRow => Box::pin(
311 FlatDedupReader::new(
312 reader.into_stream().boxed(),
313 FlatLastRow::new(stream_ctx.input.filter_deleted),
314 dedup_metrics_reporter,
315 )
316 .into_stream(),
317 ) as _,
318 MergeMode::LastNonNull => Box::pin(
319 FlatDedupReader::new(
320 reader.into_stream().boxed(),
321 FlatLastNonNull::new(
322 mapper.field_column_start(),
323 stream_ctx.input.filter_deleted,
324 ),
325 dedup_metrics_reporter,
326 )
327 .into_stream(),
328 ) as _,
329 }
330 } else {
331 Box::pin(reader.into_stream()) as _
332 };
333
334 Ok(reader)
335 }
336
337 fn scan_partition_impl(
340 &self,
341 ctx: &QueryScanContext,
342 metrics_set: &ExecutionPlanMetricsSet,
343 partition: usize,
344 ) -> Result<SendableRecordBatchStream> {
345 if ctx.explain_verbose {
346 common_telemetry::info!(
347 "SeqScan partition {}, region_id: {}",
348 partition,
349 self.stream_ctx.input.region_metadata().region_id
350 );
351 }
352
353 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
354 let input = &self.stream_ctx.input;
355
356 let batch_stream = if input.flat_format {
357 self.scan_flat_batch_in_partition(partition, metrics.clone())?
359 } else {
360 self.scan_batch_in_partition(partition, metrics.clone())?
362 };
363 let record_batch_stream = ConvertBatchStream::new(
364 batch_stream,
365 input.mapper.clone(),
366 input.cache_strategy.clone(),
367 metrics,
368 );
369
370 Ok(Box::pin(RecordBatchStreamWrapper::new(
371 input.mapper.output_schema(),
372 Box::pin(record_batch_stream),
373 )))
374 }
375
376 fn scan_batch_in_partition(
377 &self,
378 partition: usize,
379 part_metrics: PartitionMetrics,
380 ) -> Result<ScanBatchStream> {
381 ensure!(
382 partition < self.properties.partitions.len(),
383 PartitionOutOfRangeSnafu {
384 given: partition,
385 all: self.properties.partitions.len(),
386 }
387 );
388
389 if self.properties.partitions[partition].is_empty() {
390 return Ok(Box::pin(futures::stream::empty()));
391 }
392
393 let stream_ctx = self.stream_ctx.clone();
394 let semaphore = self.new_semaphore();
395 let partition_ranges = self.properties.partitions[partition].clone();
396 let compaction = self.stream_ctx.input.compaction;
397 let distinguish_range = self.properties.distinguish_partition_range;
398 let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
399
400 let stream = try_stream! {
401 part_metrics.on_first_poll();
402
403 let range_builder_list = Arc::new(RangeBuilderList::new(
404 stream_ctx.input.num_memtables(),
405 stream_ctx.input.num_files(),
406 ));
407 let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
408 reason: "Unexpected format",
409 })?;
410 for part_range in partition_ranges {
412 let mut sources = Vec::new();
413 build_sources(
414 &stream_ctx,
415 &part_range,
416 compaction,
417 &part_metrics,
418 range_builder_list.clone(),
419 &mut sources,
420 file_scan_semaphore.clone(),
421 ).await?;
422
423 let mut metrics = ScannerMetrics::default();
424 let mut fetch_start = Instant::now();
425 let mut reader =
426 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
427 .await?;
428 #[cfg(debug_assertions)]
429 let mut checker = crate::read::BatchChecker::default()
430 .with_start(Some(part_range.start))
431 .with_end(Some(part_range.end));
432
433 while let Some(batch) = reader.next_batch().await? {
434 metrics.scan_cost += fetch_start.elapsed();
435 metrics.num_batches += 1;
436 metrics.num_rows += batch.num_rows();
437
438 debug_assert!(!batch.is_empty());
439 if batch.is_empty() {
440 continue;
441 }
442
443 #[cfg(debug_assertions)]
444 checker.ensure_part_range_batch(
445 "SeqScan",
446 _mapper.metadata().region_id,
447 partition,
448 part_range,
449 &batch,
450 );
451
452 let yield_start = Instant::now();
453 yield ScanBatch::Normal(batch);
454 metrics.yield_cost += yield_start.elapsed();
455
456 fetch_start = Instant::now();
457 }
458
459 if distinguish_range {
462 let yield_start = Instant::now();
463 yield ScanBatch::Normal(Batch::empty());
464 metrics.yield_cost += yield_start.elapsed();
465 }
466
467 metrics.scan_cost += fetch_start.elapsed();
468 part_metrics.merge_metrics(&metrics);
469 }
470
471 part_metrics.on_finish();
472 };
473 Ok(Box::pin(stream))
474 }
475
476 fn scan_flat_batch_in_partition(
477 &self,
478 partition: usize,
479 part_metrics: PartitionMetrics,
480 ) -> Result<ScanBatchStream> {
481 ensure!(
482 partition < self.properties.partitions.len(),
483 PartitionOutOfRangeSnafu {
484 given: partition,
485 all: self.properties.partitions.len(),
486 }
487 );
488
489 if self.properties.partitions[partition].is_empty() {
490 return Ok(Box::pin(futures::stream::empty()));
491 }
492
493 let stream_ctx = self.stream_ctx.clone();
494 let semaphore = self.new_semaphore();
495 let partition_ranges = self.properties.partitions[partition].clone();
496 let compaction = self.stream_ctx.input.compaction;
497 let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
498
499 let stream = try_stream! {
500 part_metrics.on_first_poll();
501
502 let range_builder_list = Arc::new(RangeBuilderList::new(
503 stream_ctx.input.num_memtables(),
504 stream_ctx.input.num_files(),
505 ));
506 for part_range in partition_ranges {
508 let mut sources = Vec::new();
509 build_flat_sources(
510 &stream_ctx,
511 &part_range,
512 compaction,
513 &part_metrics,
514 range_builder_list.clone(),
515 &mut sources,
516 file_scan_semaphore.clone(),
517 ).await?;
518
519 let mut metrics = ScannerMetrics::default();
520 let mut fetch_start = Instant::now();
521 let mut reader =
522 Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
523 .await?;
524
525 while let Some(record_batch) = reader.try_next().await? {
526 metrics.scan_cost += fetch_start.elapsed();
527 metrics.num_batches += 1;
528 metrics.num_rows += record_batch.num_rows();
529
530 debug_assert!(record_batch.num_rows() > 0);
531 if record_batch.num_rows() == 0 {
532 continue;
533 }
534
535 let yield_start = Instant::now();
536 yield ScanBatch::RecordBatch(record_batch);
537 metrics.yield_cost += yield_start.elapsed();
538
539 fetch_start = Instant::now();
540 }
541
542 metrics.scan_cost += fetch_start.elapsed();
543 part_metrics.merge_metrics(&metrics);
544 }
545
546 part_metrics.on_finish();
547 };
548 Ok(Box::pin(stream))
549 }
550
551 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
552 if self.properties.target_partitions() > self.properties.num_partitions() {
553 Some(Arc::new(Semaphore::new(
559 self.properties.target_partitions() - self.properties.num_partitions() + 1,
560 )))
561 } else {
562 None
563 }
564 }
565
566 fn new_partition_metrics(
569 &self,
570 explain_verbose: bool,
571 metrics_set: &ExecutionPlanMetricsSet,
572 partition: usize,
573 ) -> PartitionMetrics {
574 let metrics = PartitionMetrics::new(
575 self.stream_ctx.input.mapper.metadata().region_id,
576 partition,
577 get_scanner_type(self.stream_ctx.input.compaction),
578 self.stream_ctx.query_start,
579 explain_verbose,
580 metrics_set,
581 );
582
583 if !self.stream_ctx.input.compaction {
584 self.metrics_list.set(partition, metrics.clone());
585 }
586
587 metrics
588 }
589
590 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
592 partition_ranges
593 .iter()
594 .map(|part_range| {
595 let range_meta = &ranges[part_range.identifier];
596 range_meta.indices.len()
597 })
598 .max()
599 .unwrap_or(0)
600 }
601
602 pub(crate) fn check_scan_limit(&self) -> Result<()> {
604 let total_max_files: usize = self
606 .properties
607 .partitions
608 .iter()
609 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
610 .sum();
611
612 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
613 if total_max_files > max_concurrent_files {
614 return TooManyFilesToReadSnafu {
615 actual: total_max_files,
616 max: max_concurrent_files,
617 }
618 .fail();
619 }
620
621 Ok(())
622 }
623}
624
625impl RegionScanner for SeqScan {
626 fn name(&self) -> &str {
627 "SeqScan"
628 }
629
630 fn properties(&self) -> &ScannerProperties {
631 &self.properties
632 }
633
634 fn schema(&self) -> SchemaRef {
635 self.stream_ctx.input.mapper.output_schema()
636 }
637
638 fn metadata(&self) -> RegionMetadataRef {
639 self.stream_ctx.input.mapper.metadata().clone()
640 }
641
642 fn scan_partition(
643 &self,
644 ctx: &QueryScanContext,
645 metrics_set: &ExecutionPlanMetricsSet,
646 partition: usize,
647 ) -> Result<SendableRecordBatchStream, BoxedError> {
648 self.scan_partition_impl(ctx, metrics_set, partition)
649 .map_err(BoxedError::new)
650 }
651
652 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
653 self.properties.prepare(request);
654
655 self.check_scan_limit().map_err(BoxedError::new)?;
656
657 Ok(())
658 }
659
660 fn has_predicate_without_region(&self) -> bool {
661 let predicate = self
662 .stream_ctx
663 .input
664 .predicate_group()
665 .predicate_without_region();
666 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
667 }
668
669 fn set_logical_region(&mut self, logical_region: bool) {
670 self.properties.set_logical_region(logical_region);
671 }
672}
673
674impl DisplayAs for SeqScan {
675 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
676 write!(
677 f,
678 "SeqScan: region={}, ",
679 self.stream_ctx.input.mapper.metadata().region_id
680 )?;
681 match t {
682 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
684 self.stream_ctx.format_for_explain(false, f)
685 }
686 DisplayFormatType::Verbose => {
687 self.stream_ctx.format_for_explain(true, f)?;
688 self.metrics_list.format_verbose_metrics(f)
689 }
690 }
691 }
692}
693
694impl fmt::Debug for SeqScan {
695 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696 f.debug_struct("SeqScan")
697 .field("num_ranges", &self.stream_ctx.ranges.len())
698 .finish()
699 }
700}
701
702pub(crate) async fn build_sources(
704 stream_ctx: &Arc<StreamContext>,
705 part_range: &PartitionRange,
706 compaction: bool,
707 part_metrics: &PartitionMetrics,
708 range_builder_list: Arc<RangeBuilderList>,
709 sources: &mut Vec<Source>,
710 semaphore: Option<Arc<Semaphore>>,
711) -> Result<()> {
712 let range_meta = &stream_ctx.ranges[part_range.identifier];
714 #[cfg(debug_assertions)]
715 if compaction {
716 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
718 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
719 debug_assert_eq!(
721 -1, row_group_idx.row_group_index,
722 "Expect {} range scan all row groups, given: {}",
723 i, row_group_idx.row_group_index,
724 );
725 }
726 }
727
728 let read_type = if compaction {
729 "compaction"
730 } else {
731 "seq_scan_files"
732 };
733 let num_indices = range_meta.row_group_indices.len();
734 if num_indices == 0 {
735 return Ok(());
736 }
737
738 sources.reserve(num_indices);
739 let mut ordered_sources = Vec::with_capacity(num_indices);
740 ordered_sources.resize_with(num_indices, || None);
741 let mut file_scan_tasks = Vec::new();
742
743 for (position, index) in range_meta.row_group_indices.iter().enumerate() {
744 if stream_ctx.is_mem_range_index(*index) {
745 let stream = scan_mem_ranges(
746 stream_ctx.clone(),
747 part_metrics.clone(),
748 *index,
749 range_meta.time_range,
750 );
751 ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
752 } else if stream_ctx.is_file_range_index(*index) {
753 if let Some(semaphore_ref) = semaphore.as_ref() {
754 let stream_ctx = stream_ctx.clone();
756 let part_metrics = part_metrics.clone();
757 let range_builder_list = range_builder_list.clone();
758 let semaphore = Arc::clone(semaphore_ref);
759 let row_group_index = *index;
760 file_scan_tasks.push(async move {
761 let _permit = semaphore.acquire().await.unwrap();
762 let stream = scan_file_ranges(
763 stream_ctx,
764 part_metrics,
765 row_group_index,
766 read_type,
767 range_builder_list,
768 )
769 .await?;
770 Ok((position, Source::Stream(Box::pin(stream) as _)))
771 });
772 } else {
773 let stream = scan_file_ranges(
775 stream_ctx.clone(),
776 part_metrics.clone(),
777 *index,
778 read_type,
779 range_builder_list.clone(),
780 )
781 .await?;
782 ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
783 }
784 } else {
785 let stream =
786 scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
787 ordered_sources[position] = Some(Source::Stream(stream));
788 }
789 }
790
791 if !file_scan_tasks.is_empty() {
792 let results = futures::future::try_join_all(file_scan_tasks).await?;
793 for (position, source) in results {
794 ordered_sources[position] = Some(source);
795 }
796 }
797
798 for source in ordered_sources.into_iter().flatten() {
799 sources.push(source);
800 }
801 Ok(())
802}
803
804pub(crate) async fn build_flat_sources(
806 stream_ctx: &Arc<StreamContext>,
807 part_range: &PartitionRange,
808 compaction: bool,
809 part_metrics: &PartitionMetrics,
810 range_builder_list: Arc<RangeBuilderList>,
811 sources: &mut Vec<BoxedRecordBatchStream>,
812 semaphore: Option<Arc<Semaphore>>,
813) -> Result<()> {
814 let range_meta = &stream_ctx.ranges[part_range.identifier];
816 #[cfg(debug_assertions)]
817 if compaction {
818 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
820 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
821 debug_assert_eq!(
823 -1, row_group_idx.row_group_index,
824 "Expect {} range scan all row groups, given: {}",
825 i, row_group_idx.row_group_index,
826 );
827 }
828 }
829
830 let read_type = if compaction {
831 "compaction"
832 } else {
833 "seq_scan_files"
834 };
835 let num_indices = range_meta.row_group_indices.len();
836 if num_indices == 0 {
837 return Ok(());
838 }
839
840 let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
841 sources.reserve(num_indices);
842 let mut ordered_sources = Vec::with_capacity(num_indices);
843 ordered_sources.resize_with(num_indices, || None);
844 let mut file_scan_tasks = Vec::new();
845
846 for (position, index) in range_meta.row_group_indices.iter().enumerate() {
847 if stream_ctx.is_mem_range_index(*index) {
848 let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
849 ordered_sources[position] = Some(Box::pin(stream) as _);
850 } else if stream_ctx.is_file_range_index(*index) {
851 if let Some(semaphore_ref) = semaphore.as_ref() {
852 let stream_ctx = stream_ctx.clone();
854 let part_metrics = part_metrics.clone();
855 let range_builder_list = range_builder_list.clone();
856 let semaphore = Arc::clone(semaphore_ref);
857 let row_group_index = *index;
858 file_scan_tasks.push(async move {
859 let _permit = semaphore.acquire().await.unwrap();
860 let stream = scan_flat_file_ranges(
861 stream_ctx,
862 part_metrics,
863 row_group_index,
864 read_type,
865 range_builder_list,
866 )
867 .await?;
868 Ok((position, Box::pin(stream) as _))
869 });
870 } else {
871 let stream = scan_flat_file_ranges(
873 stream_ctx.clone(),
874 part_metrics.clone(),
875 *index,
876 read_type,
877 range_builder_list.clone(),
878 )
879 .await?;
880 ordered_sources[position] = Some(Box::pin(stream) as _);
881 }
882 } else {
883 let stream =
884 scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
885 ordered_sources[position] = Some(stream);
886 }
887 }
888
889 if !file_scan_tasks.is_empty() {
890 let results = futures::future::try_join_all(file_scan_tasks).await?;
891 for (position, stream) in results {
892 ordered_sources[position] = Some(stream);
893 }
894 }
895
896 for stream in ordered_sources.into_iter().flatten() {
897 if should_split {
898 sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
899 } else {
900 sources.push(stream);
901 }
902 }
903
904 if should_split {
905 common_telemetry::debug!(
906 "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
907 stream_ctx.input.region_metadata().region_id,
908 sources.len(),
909 part_range,
910 );
911 }
912
913 Ok(())
914}
915
#[cfg(test)]
impl SeqScan {
    /// Test-only accessor for the [`ScanInput`] held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
923
/// Returns the scanner label used in partition metrics; compaction scans get a
/// distinct label.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}