1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
48 scan_flat_mem_ranges, scan_mem_ranges,
49};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{
52 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
53};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
57pub struct SeqScan {
62 properties: ScannerProperties,
64 stream_ctx: Arc<StreamContext>,
66 metrics_list: PartitionMetricsList,
69}
70
71impl SeqScan {
72 pub(crate) fn new(input: ScanInput) -> Self {
75 let mut properties = ScannerProperties::default()
76 .with_append_mode(input.append_mode)
77 .with_total_rows(input.total_rows());
78 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
79 properties.partitions = vec![stream_ctx.partition_ranges()];
80
81 Self {
82 properties,
83 stream_ctx,
84 metrics_list: PartitionMetricsList::default(),
85 }
86 }
87
88 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
93 let metrics_set = ExecutionPlanMetricsSet::new();
94 let streams = (0..self.properties.partitions.len())
95 .map(|partition: usize| {
96 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
97 })
98 .collect::<Result<Vec<_>, _>>()?;
99
100 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
101 Ok(Box::pin(aggr_stream))
102 }
103
104 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
106 let metrics_set = ExecutionPlanMetricsSet::new();
107
108 let streams = (0..self.properties.partitions.len())
109 .map(|partition| {
110 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
111 self.scan_batch_in_partition(partition, metrics)
112 })
113 .collect::<Result<Vec<_>>>()?;
114
115 Ok(Box::pin(futures::stream::iter(streams).flatten()))
116 }
117
118 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
123 assert!(self.stream_ctx.input.compaction);
124
125 let metrics_set = ExecutionPlanMetricsSet::new();
126 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
127 debug_assert_eq!(1, self.properties.partitions.len());
128 let partition_ranges = &self.properties.partitions[0];
129
130 let reader = Self::merge_all_ranges_for_compaction(
131 &self.stream_ctx,
132 partition_ranges,
133 &part_metrics,
134 )
135 .await?;
136 Ok(Box::new(reader))
137 }
138
139 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
144 assert!(self.stream_ctx.input.compaction);
145
146 let metrics_set = ExecutionPlanMetricsSet::new();
147 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
148 debug_assert_eq!(1, self.properties.partitions.len());
149 let partition_ranges = &self.properties.partitions[0];
150
151 let reader = Self::merge_all_flat_ranges_for_compaction(
152 &self.stream_ctx,
153 partition_ranges,
154 &part_metrics,
155 )
156 .await?;
157 Ok(reader)
158 }
159
160 async fn merge_all_ranges_for_compaction(
163 stream_ctx: &Arc<StreamContext>,
164 partition_ranges: &[PartitionRange],
165 part_metrics: &PartitionMetrics,
166 ) -> Result<BoxedBatchReader> {
167 let mut sources = Vec::new();
168 let range_builder_list = Arc::new(RangeBuilderList::new(
169 stream_ctx.input.num_memtables(),
170 stream_ctx.input.num_files(),
171 ));
172 for part_range in partition_ranges {
173 build_sources(
174 stream_ctx,
175 part_range,
176 true,
177 part_metrics,
178 range_builder_list.clone(),
179 &mut sources,
180 )
181 .await?;
182 }
183
184 common_telemetry::debug!(
185 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
186 stream_ctx.input.mapper.metadata().region_id,
187 partition_ranges.len(),
188 sources.len()
189 );
190 Self::build_reader_from_sources(stream_ctx, sources, None).await
191 }
192
193 async fn merge_all_flat_ranges_for_compaction(
196 stream_ctx: &Arc<StreamContext>,
197 partition_ranges: &[PartitionRange],
198 part_metrics: &PartitionMetrics,
199 ) -> Result<BoxedRecordBatchStream> {
200 let mut sources = Vec::new();
201 let range_builder_list = Arc::new(RangeBuilderList::new(
202 stream_ctx.input.num_memtables(),
203 stream_ctx.input.num_files(),
204 ));
205 for part_range in partition_ranges {
206 build_flat_sources(
207 stream_ctx,
208 part_range,
209 true,
210 part_metrics,
211 range_builder_list.clone(),
212 &mut sources,
213 )
214 .await?;
215 }
216
217 common_telemetry::debug!(
218 "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
219 stream_ctx.input.mapper.metadata().region_id,
220 partition_ranges.len(),
221 sources.len()
222 );
223 Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
224 }
225
226 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
229 pub(crate) async fn build_reader_from_sources(
230 stream_ctx: &StreamContext,
231 mut sources: Vec<Source>,
232 semaphore: Option<Arc<Semaphore>>,
233 ) -> Result<BoxedBatchReader> {
234 if let Some(semaphore) = semaphore.as_ref() {
235 if sources.len() > 1 {
237 sources = stream_ctx
238 .input
239 .create_parallel_sources(sources, semaphore.clone())?;
240 }
241 }
242
243 let mut builder = MergeReaderBuilder::from_sources(sources);
244 let reader = builder.build().await?;
245
246 let dedup = !stream_ctx.input.append_mode;
247 let reader = if dedup {
248 match stream_ctx.input.merge_mode {
249 MergeMode::LastRow => Box::new(DedupReader::new(
250 reader,
251 LastRow::new(stream_ctx.input.filter_deleted),
252 )) as _,
253 MergeMode::LastNonNull => Box::new(DedupReader::new(
254 reader,
255 LastNonNull::new(stream_ctx.input.filter_deleted),
256 )) as _,
257 }
258 } else {
259 Box::new(reader) as _
260 };
261
262 let reader = match &stream_ctx.input.series_row_selector {
263 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
264 None => reader,
265 };
266
267 Ok(reader)
268 }
269
270 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
273 pub(crate) async fn build_flat_reader_from_sources(
274 stream_ctx: &StreamContext,
275 mut sources: Vec<BoxedRecordBatchStream>,
276 semaphore: Option<Arc<Semaphore>>,
277 ) -> Result<BoxedRecordBatchStream> {
278 if let Some(semaphore) = semaphore.as_ref() {
279 if sources.len() > 1 {
281 sources = stream_ctx
282 .input
283 .create_parallel_flat_sources(sources, semaphore.clone())?;
284 }
285 }
286
287 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
288 let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
289
290 let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
291
292 let dedup = !stream_ctx.input.append_mode;
293 let reader = if dedup {
294 match stream_ctx.input.merge_mode {
295 MergeMode::LastRow => Box::pin(
296 FlatDedupReader::new(
297 reader.into_stream().boxed(),
298 FlatLastRow::new(stream_ctx.input.filter_deleted),
299 )
300 .into_stream(),
301 ) as _,
302 MergeMode::LastNonNull => Box::pin(
303 FlatDedupReader::new(
304 reader.into_stream().boxed(),
305 FlatLastNonNull::new(
306 mapper.field_column_start(),
307 stream_ctx.input.filter_deleted,
308 ),
309 )
310 .into_stream(),
311 ) as _,
312 }
313 } else {
314 Box::pin(reader.into_stream()) as _
315 };
316
317 Ok(reader)
318 }
319
320 fn scan_partition_impl(
323 &self,
324 ctx: &QueryScanContext,
325 metrics_set: &ExecutionPlanMetricsSet,
326 partition: usize,
327 ) -> Result<SendableRecordBatchStream> {
328 if ctx.explain_verbose {
329 common_telemetry::info!(
330 "SeqScan partition {}, region_id: {}",
331 partition,
332 self.stream_ctx.input.region_metadata().region_id
333 );
334 }
335
336 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
337 let input = &self.stream_ctx.input;
338
339 let batch_stream = if input.flat_format {
340 self.scan_flat_batch_in_partition(partition, metrics.clone())?
342 } else {
343 self.scan_batch_in_partition(partition, metrics.clone())?
345 };
346 let record_batch_stream = ConvertBatchStream::new(
347 batch_stream,
348 input.mapper.clone(),
349 input.cache_strategy.clone(),
350 metrics,
351 );
352
353 Ok(Box::pin(RecordBatchStreamWrapper::new(
354 input.mapper.output_schema(),
355 Box::pin(record_batch_stream),
356 )))
357 }
358
359 fn scan_batch_in_partition(
360 &self,
361 partition: usize,
362 part_metrics: PartitionMetrics,
363 ) -> Result<ScanBatchStream> {
364 ensure!(
365 partition < self.properties.partitions.len(),
366 PartitionOutOfRangeSnafu {
367 given: partition,
368 all: self.properties.partitions.len(),
369 }
370 );
371
372 if self.properties.partitions[partition].is_empty() {
373 return Ok(Box::pin(futures::stream::empty()));
374 }
375
376 let stream_ctx = self.stream_ctx.clone();
377 let semaphore = self.new_semaphore();
378 let partition_ranges = self.properties.partitions[partition].clone();
379 let compaction = self.stream_ctx.input.compaction;
380 let distinguish_range = self.properties.distinguish_partition_range;
381
382 let stream = try_stream! {
383 part_metrics.on_first_poll();
384
385 let range_builder_list = Arc::new(RangeBuilderList::new(
386 stream_ctx.input.num_memtables(),
387 stream_ctx.input.num_files(),
388 ));
389 let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
390 reason: "Unexpected format",
391 })?;
392 for part_range in partition_ranges {
394 let mut sources = Vec::new();
395 build_sources(
396 &stream_ctx,
397 &part_range,
398 compaction,
399 &part_metrics,
400 range_builder_list.clone(),
401 &mut sources,
402 ).await?;
403
404 let mut metrics = ScannerMetrics::default();
405 let mut fetch_start = Instant::now();
406 let mut reader =
407 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
408 .await?;
409 #[cfg(debug_assertions)]
410 let mut checker = crate::read::BatchChecker::default()
411 .with_start(Some(part_range.start))
412 .with_end(Some(part_range.end));
413
414 while let Some(batch) = reader.next_batch().await? {
415 metrics.scan_cost += fetch_start.elapsed();
416 metrics.num_batches += 1;
417 metrics.num_rows += batch.num_rows();
418
419 debug_assert!(!batch.is_empty());
420 if batch.is_empty() {
421 continue;
422 }
423
424 #[cfg(debug_assertions)]
425 checker.ensure_part_range_batch(
426 "SeqScan",
427 _mapper.metadata().region_id,
428 partition,
429 part_range,
430 &batch,
431 );
432
433 let yield_start = Instant::now();
434 yield ScanBatch::Normal(batch);
435 metrics.yield_cost += yield_start.elapsed();
436
437 fetch_start = Instant::now();
438 }
439
440 if distinguish_range {
443 let yield_start = Instant::now();
444 yield ScanBatch::Normal(Batch::empty());
445 metrics.yield_cost += yield_start.elapsed();
446 }
447
448 metrics.scan_cost += fetch_start.elapsed();
449 part_metrics.merge_metrics(&metrics);
450 }
451
452 part_metrics.on_finish();
453 };
454 Ok(Box::pin(stream))
455 }
456
457 fn scan_flat_batch_in_partition(
458 &self,
459 partition: usize,
460 part_metrics: PartitionMetrics,
461 ) -> Result<ScanBatchStream> {
462 ensure!(
463 partition < self.properties.partitions.len(),
464 PartitionOutOfRangeSnafu {
465 given: partition,
466 all: self.properties.partitions.len(),
467 }
468 );
469
470 if self.properties.partitions[partition].is_empty() {
471 return Ok(Box::pin(futures::stream::empty()));
472 }
473
474 let stream_ctx = self.stream_ctx.clone();
475 let semaphore = self.new_semaphore();
476 let partition_ranges = self.properties.partitions[partition].clone();
477 let compaction = self.stream_ctx.input.compaction;
478
479 let stream = try_stream! {
480 part_metrics.on_first_poll();
481
482 let range_builder_list = Arc::new(RangeBuilderList::new(
483 stream_ctx.input.num_memtables(),
484 stream_ctx.input.num_files(),
485 ));
486 for part_range in partition_ranges {
488 let mut sources = Vec::new();
489 build_flat_sources(
490 &stream_ctx,
491 &part_range,
492 compaction,
493 &part_metrics,
494 range_builder_list.clone(),
495 &mut sources,
496 ).await?;
497
498 let mut metrics = ScannerMetrics::default();
499 let mut fetch_start = Instant::now();
500 let mut reader =
501 Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
502 .await?;
503
504 while let Some(record_batch) = reader.try_next().await? {
505 metrics.scan_cost += fetch_start.elapsed();
506 metrics.num_batches += 1;
507 metrics.num_rows += record_batch.num_rows();
508
509 debug_assert!(record_batch.num_rows() > 0);
510 if record_batch.num_rows() == 0 {
511 continue;
512 }
513
514 let yield_start = Instant::now();
515 yield ScanBatch::RecordBatch(record_batch);
516 metrics.yield_cost += yield_start.elapsed();
517
518 fetch_start = Instant::now();
519 }
520
521 metrics.scan_cost += fetch_start.elapsed();
522 part_metrics.merge_metrics(&metrics);
523 }
524
525 part_metrics.on_finish();
526 };
527 Ok(Box::pin(stream))
528 }
529
530 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
531 if self.properties.target_partitions() > self.properties.num_partitions() {
532 Some(Arc::new(Semaphore::new(
538 self.properties.target_partitions() - self.properties.num_partitions() + 1,
539 )))
540 } else {
541 None
542 }
543 }
544
545 fn new_partition_metrics(
548 &self,
549 explain_verbose: bool,
550 metrics_set: &ExecutionPlanMetricsSet,
551 partition: usize,
552 ) -> PartitionMetrics {
553 let metrics = PartitionMetrics::new(
554 self.stream_ctx.input.mapper.metadata().region_id,
555 partition,
556 get_scanner_type(self.stream_ctx.input.compaction),
557 self.stream_ctx.query_start,
558 explain_verbose,
559 metrics_set,
560 );
561
562 if !self.stream_ctx.input.compaction {
563 self.metrics_list.set(partition, metrics.clone());
564 }
565
566 metrics
567 }
568
569 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
571 partition_ranges
572 .iter()
573 .map(|part_range| {
574 let range_meta = &ranges[part_range.identifier];
575 range_meta.indices.len()
576 })
577 .max()
578 .unwrap_or(0)
579 }
580
581 pub(crate) fn check_scan_limit(&self) -> Result<()> {
583 let total_max_files: usize = self
585 .properties
586 .partitions
587 .iter()
588 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
589 .sum();
590
591 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
592 if total_max_files > max_concurrent_files {
593 return TooManyFilesToReadSnafu {
594 actual: total_max_files,
595 max: max_concurrent_files,
596 }
597 .fail();
598 }
599
600 Ok(())
601 }
602}
603
604impl RegionScanner for SeqScan {
605 fn properties(&self) -> &ScannerProperties {
606 &self.properties
607 }
608
609 fn schema(&self) -> SchemaRef {
610 self.stream_ctx.input.mapper.output_schema()
611 }
612
613 fn metadata(&self) -> RegionMetadataRef {
614 self.stream_ctx.input.mapper.metadata().clone()
615 }
616
617 fn scan_partition(
618 &self,
619 ctx: &QueryScanContext,
620 metrics_set: &ExecutionPlanMetricsSet,
621 partition: usize,
622 ) -> Result<SendableRecordBatchStream, BoxedError> {
623 self.scan_partition_impl(ctx, metrics_set, partition)
624 .map_err(BoxedError::new)
625 }
626
627 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
628 self.properties.prepare(request);
629
630 self.check_scan_limit().map_err(BoxedError::new)?;
631
632 Ok(())
633 }
634
635 fn has_predicate(&self) -> bool {
636 let predicate = self.stream_ctx.input.predicate();
637 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
638 }
639
640 fn set_logical_region(&mut self, logical_region: bool) {
641 self.properties.set_logical_region(logical_region);
642 }
643}
644
645impl DisplayAs for SeqScan {
646 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
647 write!(
648 f,
649 "SeqScan: region={}, ",
650 self.stream_ctx.input.mapper.metadata().region_id
651 )?;
652 match t {
653 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
655 self.stream_ctx.format_for_explain(false, f)
656 }
657 DisplayFormatType::Verbose => {
658 self.stream_ctx.format_for_explain(true, f)?;
659 self.metrics_list.format_verbose_metrics(f)
660 }
661 }
662 }
663}
664
665impl fmt::Debug for SeqScan {
666 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667 f.debug_struct("SeqScan")
668 .field("num_ranges", &self.stream_ctx.ranges.len())
669 .finish()
670 }
671}
672
673pub(crate) async fn build_sources(
675 stream_ctx: &Arc<StreamContext>,
676 part_range: &PartitionRange,
677 compaction: bool,
678 part_metrics: &PartitionMetrics,
679 range_builder_list: Arc<RangeBuilderList>,
680 sources: &mut Vec<Source>,
681) -> Result<()> {
682 let range_meta = &stream_ctx.ranges[part_range.identifier];
684 #[cfg(debug_assertions)]
685 if compaction {
686 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
688 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
689 debug_assert_eq!(
691 -1, row_group_idx.row_group_index,
692 "Expect {} range scan all row groups, given: {}",
693 i, row_group_idx.row_group_index,
694 );
695 }
696 }
697
698 sources.reserve(range_meta.row_group_indices.len());
699 for index in &range_meta.row_group_indices {
700 let stream = if stream_ctx.is_mem_range_index(*index) {
701 let stream = scan_mem_ranges(
702 stream_ctx.clone(),
703 part_metrics.clone(),
704 *index,
705 range_meta.time_range,
706 );
707 Box::pin(stream) as _
708 } else if stream_ctx.is_file_range_index(*index) {
709 let read_type = if compaction {
710 "compaction"
711 } else {
712 "seq_scan_files"
713 };
714 let stream = scan_file_ranges(
715 stream_ctx.clone(),
716 part_metrics.clone(),
717 *index,
718 read_type,
719 range_builder_list.clone(),
720 )
721 .await?;
722 Box::pin(stream) as _
723 } else {
724 scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
725 };
726 sources.push(Source::Stream(stream));
727 }
728 Ok(())
729}
730
731pub(crate) async fn build_flat_sources(
733 stream_ctx: &Arc<StreamContext>,
734 part_range: &PartitionRange,
735 compaction: bool,
736 part_metrics: &PartitionMetrics,
737 range_builder_list: Arc<RangeBuilderList>,
738 sources: &mut Vec<BoxedRecordBatchStream>,
739) -> Result<()> {
740 let range_meta = &stream_ctx.ranges[part_range.identifier];
742 #[cfg(debug_assertions)]
743 if compaction {
744 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
746 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
747 debug_assert_eq!(
749 -1, row_group_idx.row_group_index,
750 "Expect {} range scan all row groups, given: {}",
751 i, row_group_idx.row_group_index,
752 );
753 }
754 }
755
756 sources.reserve(range_meta.row_group_indices.len());
757 for index in &range_meta.row_group_indices {
758 let stream = if stream_ctx.is_mem_range_index(*index) {
759 let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
760 Box::pin(stream) as _
761 } else if stream_ctx.is_file_range_index(*index) {
762 let read_type = if compaction {
763 "compaction"
764 } else {
765 "seq_scan_files"
766 };
767 let stream = scan_flat_file_ranges(
768 stream_ctx.clone(),
769 part_metrics.clone(),
770 *index,
771 read_type,
772 range_builder_list.clone(),
773 )
774 .await?;
775 Box::pin(stream) as _
776 } else {
777 scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
778 };
779 sources.push(stream);
780 }
781 Ok(())
782}
783
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input held by the stream context (test builds only).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
791
/// Returns the scanner type label used for metrics: `"SeqScan(compaction)"` for a
/// compaction scan and `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}