1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
48 scan_flat_mem_ranges, scan_mem_ranges,
49};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{
52 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
53};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
/// Scanner that reads a region partition by partition.
///
/// Within a partition the partition ranges are processed one after another;
/// the sources of each range are merged (and deduplicated unless the region is
/// in append mode) before batches are yielded.
pub struct SeqScan {
    /// Properties of the scanner, including the partition ranges assigned to
    /// each partition.
    properties: ScannerProperties,
    /// Context shared by every stream created from this scanner. It holds the
    /// scan input and the metadata of the ranges to scan.
    stream_ctx: Arc<StreamContext>,
    /// Metrics of each partition, printed by the verbose display format.
    /// Only populated for non-compaction scans.
    metrics_list: PartitionMetricsList,
}
70
71impl SeqScan {
72 pub(crate) fn new(input: ScanInput) -> Self {
75 let mut properties = ScannerProperties::default()
76 .with_append_mode(input.append_mode)
77 .with_total_rows(input.total_rows());
78 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
79 properties.partitions = vec![stream_ctx.partition_ranges()];
80
81 Self {
82 properties,
83 stream_ctx,
84 metrics_list: PartitionMetricsList::default(),
85 }
86 }
87
88 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
93 let metrics_set = ExecutionPlanMetricsSet::new();
94 let streams = (0..self.properties.partitions.len())
95 .map(|partition: usize| {
96 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
97 })
98 .collect::<Result<Vec<_>, _>>()?;
99
100 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
101 Ok(Box::pin(aggr_stream))
102 }
103
104 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
106 let metrics_set = ExecutionPlanMetricsSet::new();
107
108 let streams = (0..self.properties.partitions.len())
109 .map(|partition| {
110 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
111 self.scan_batch_in_partition(partition, metrics)
112 })
113 .collect::<Result<Vec<_>>>()?;
114
115 Ok(Box::pin(futures::stream::iter(streams).flatten()))
116 }
117
118 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
123 assert!(self.stream_ctx.input.compaction);
124
125 let metrics_set = ExecutionPlanMetricsSet::new();
126 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
127 debug_assert_eq!(1, self.properties.partitions.len());
128 let partition_ranges = &self.properties.partitions[0];
129
130 let reader = Self::merge_all_ranges_for_compaction(
131 &self.stream_ctx,
132 partition_ranges,
133 &part_metrics,
134 )
135 .await?;
136 Ok(Box::new(reader))
137 }
138
139 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
144 assert!(self.stream_ctx.input.compaction);
145
146 let metrics_set = ExecutionPlanMetricsSet::new();
147 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
148 debug_assert_eq!(1, self.properties.partitions.len());
149 let partition_ranges = &self.properties.partitions[0];
150
151 let reader = Self::merge_all_flat_ranges_for_compaction(
152 &self.stream_ctx,
153 partition_ranges,
154 &part_metrics,
155 )
156 .await?;
157 Ok(reader)
158 }
159
160 async fn merge_all_ranges_for_compaction(
163 stream_ctx: &Arc<StreamContext>,
164 partition_ranges: &[PartitionRange],
165 part_metrics: &PartitionMetrics,
166 ) -> Result<BoxedBatchReader> {
167 let mut sources = Vec::new();
168 let range_builder_list = Arc::new(RangeBuilderList::new(
169 stream_ctx.input.num_memtables(),
170 stream_ctx.input.num_files(),
171 ));
172 for part_range in partition_ranges {
173 build_sources(
174 stream_ctx,
175 part_range,
176 true,
177 part_metrics,
178 range_builder_list.clone(),
179 &mut sources,
180 )
181 .await?;
182 }
183
184 common_telemetry::debug!(
185 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
186 stream_ctx.input.mapper.metadata().region_id,
187 partition_ranges.len(),
188 sources.len()
189 );
190 Self::build_reader_from_sources(stream_ctx, sources, None).await
191 }
192
193 async fn merge_all_flat_ranges_for_compaction(
196 stream_ctx: &Arc<StreamContext>,
197 partition_ranges: &[PartitionRange],
198 part_metrics: &PartitionMetrics,
199 ) -> Result<BoxedRecordBatchStream> {
200 let mut sources = Vec::new();
201 let range_builder_list = Arc::new(RangeBuilderList::new(
202 stream_ctx.input.num_memtables(),
203 stream_ctx.input.num_files(),
204 ));
205 for part_range in partition_ranges {
206 build_flat_sources(
207 stream_ctx,
208 part_range,
209 true,
210 part_metrics,
211 range_builder_list.clone(),
212 &mut sources,
213 )
214 .await?;
215 }
216
217 common_telemetry::debug!(
218 "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
219 stream_ctx.input.mapper.metadata().region_id,
220 partition_ranges.len(),
221 sources.len()
222 );
223 Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
224 }
225
226 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
229 pub(crate) async fn build_reader_from_sources(
230 stream_ctx: &StreamContext,
231 mut sources: Vec<Source>,
232 semaphore: Option<Arc<Semaphore>>,
233 ) -> Result<BoxedBatchReader> {
234 if let Some(semaphore) = semaphore.as_ref() {
235 if sources.len() > 1 {
237 sources = stream_ctx
238 .input
239 .create_parallel_sources(sources, semaphore.clone())?;
240 }
241 }
242
243 let mut builder = MergeReaderBuilder::from_sources(sources);
244 let reader = builder.build().await?;
245
246 let dedup = !stream_ctx.input.append_mode;
247 let reader = if dedup {
248 match stream_ctx.input.merge_mode {
249 MergeMode::LastRow => Box::new(DedupReader::new(
250 reader,
251 LastRow::new(stream_ctx.input.filter_deleted),
252 )) as _,
253 MergeMode::LastNonNull => Box::new(DedupReader::new(
254 reader,
255 LastNonNull::new(stream_ctx.input.filter_deleted),
256 )) as _,
257 }
258 } else {
259 Box::new(reader) as _
260 };
261
262 let reader = match &stream_ctx.input.series_row_selector {
263 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
264 None => reader,
265 };
266
267 Ok(reader)
268 }
269
270 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
273 pub(crate) async fn build_flat_reader_from_sources(
274 stream_ctx: &StreamContext,
275 mut sources: Vec<BoxedRecordBatchStream>,
276 semaphore: Option<Arc<Semaphore>>,
277 ) -> Result<BoxedRecordBatchStream> {
278 if let Some(semaphore) = semaphore.as_ref() {
279 if sources.len() > 1 {
281 sources = stream_ctx
282 .input
283 .create_parallel_flat_sources(sources, semaphore.clone())?;
284 }
285 }
286
287 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
288 let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
289
290 let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
291
292 let dedup = !stream_ctx.input.append_mode;
293 let reader = if dedup {
294 match stream_ctx.input.merge_mode {
295 MergeMode::LastRow => Box::pin(
296 FlatDedupReader::new(
297 reader.into_stream().boxed(),
298 FlatLastRow::new(stream_ctx.input.filter_deleted),
299 )
300 .into_stream(),
301 ) as _,
302 MergeMode::LastNonNull => Box::pin(
303 FlatDedupReader::new(
304 reader.into_stream().boxed(),
305 FlatLastNonNull::new(
306 mapper.field_column_start(),
307 stream_ctx.input.filter_deleted,
308 ),
309 )
310 .into_stream(),
311 ) as _,
312 }
313 } else {
314 Box::pin(reader.into_stream()) as _
315 };
316
317 Ok(reader)
318 }
319
320 fn scan_partition_impl(
323 &self,
324 ctx: &QueryScanContext,
325 metrics_set: &ExecutionPlanMetricsSet,
326 partition: usize,
327 ) -> Result<SendableRecordBatchStream> {
328 if ctx.explain_verbose {
329 common_telemetry::info!(
330 "SeqScan partition {}, region_id: {}",
331 partition,
332 self.stream_ctx.input.region_metadata().region_id
333 );
334 }
335
336 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
337 let input = &self.stream_ctx.input;
338
339 let batch_stream = if input.flat_format {
340 self.scan_flat_batch_in_partition(partition, metrics.clone())?
342 } else {
343 self.scan_batch_in_partition(partition, metrics.clone())?
345 };
346 let record_batch_stream = ConvertBatchStream::new(
347 batch_stream,
348 input.mapper.clone(),
349 input.cache_strategy.clone(),
350 metrics,
351 );
352
353 Ok(Box::pin(RecordBatchStreamWrapper::new(
354 input.mapper.output_schema(),
355 Box::pin(record_batch_stream),
356 )))
357 }
358
359 fn scan_batch_in_partition(
360 &self,
361 partition: usize,
362 part_metrics: PartitionMetrics,
363 ) -> Result<ScanBatchStream> {
364 ensure!(
365 partition < self.properties.partitions.len(),
366 PartitionOutOfRangeSnafu {
367 given: partition,
368 all: self.properties.partitions.len(),
369 }
370 );
371
372 if self.properties.partitions[partition].is_empty() {
373 return Ok(Box::pin(futures::stream::empty()));
374 }
375
376 let stream_ctx = self.stream_ctx.clone();
377 let semaphore = self.new_semaphore();
378 let partition_ranges = self.properties.partitions[partition].clone();
379 let compaction = self.stream_ctx.input.compaction;
380 let distinguish_range = self.properties.distinguish_partition_range;
381
382 let stream = try_stream! {
383 part_metrics.on_first_poll();
384
385 let range_builder_list = Arc::new(RangeBuilderList::new(
386 stream_ctx.input.num_memtables(),
387 stream_ctx.input.num_files(),
388 ));
389 let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
390 reason: "Unexpected format",
391 })?;
392 for part_range in partition_ranges {
394 let mut sources = Vec::new();
395 build_sources(
396 &stream_ctx,
397 &part_range,
398 compaction,
399 &part_metrics,
400 range_builder_list.clone(),
401 &mut sources,
402 ).await?;
403
404 let mut metrics = ScannerMetrics::default();
405 let mut fetch_start = Instant::now();
406 let mut reader =
407 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
408 .await?;
409 #[cfg(debug_assertions)]
410 let mut checker = crate::read::BatchChecker::default()
411 .with_start(Some(part_range.start))
412 .with_end(Some(part_range.end));
413
414 while let Some(batch) = reader.next_batch().await? {
415 metrics.scan_cost += fetch_start.elapsed();
416 metrics.num_batches += 1;
417 metrics.num_rows += batch.num_rows();
418
419 debug_assert!(!batch.is_empty());
420 if batch.is_empty() {
421 continue;
422 }
423
424 #[cfg(debug_assertions)]
425 checker.ensure_part_range_batch(
426 "SeqScan",
427 _mapper.metadata().region_id,
428 partition,
429 part_range,
430 &batch,
431 );
432
433 let yield_start = Instant::now();
434 yield ScanBatch::Normal(batch);
435 metrics.yield_cost += yield_start.elapsed();
436
437 fetch_start = Instant::now();
438 }
439
440 if distinguish_range {
443 let yield_start = Instant::now();
444 yield ScanBatch::Normal(Batch::empty());
445 metrics.yield_cost += yield_start.elapsed();
446 }
447
448 metrics.scan_cost += fetch_start.elapsed();
449 part_metrics.merge_metrics(&metrics);
450 }
451
452 part_metrics.on_finish();
453 };
454 Ok(Box::pin(stream))
455 }
456
457 fn scan_flat_batch_in_partition(
458 &self,
459 partition: usize,
460 part_metrics: PartitionMetrics,
461 ) -> Result<ScanBatchStream> {
462 ensure!(
463 partition < self.properties.partitions.len(),
464 PartitionOutOfRangeSnafu {
465 given: partition,
466 all: self.properties.partitions.len(),
467 }
468 );
469
470 if self.properties.partitions[partition].is_empty() {
471 return Ok(Box::pin(futures::stream::empty()));
472 }
473
474 let stream_ctx = self.stream_ctx.clone();
475 let semaphore = self.new_semaphore();
476 let partition_ranges = self.properties.partitions[partition].clone();
477 let compaction = self.stream_ctx.input.compaction;
478
479 let stream = try_stream! {
480 part_metrics.on_first_poll();
481
482 let range_builder_list = Arc::new(RangeBuilderList::new(
483 stream_ctx.input.num_memtables(),
484 stream_ctx.input.num_files(),
485 ));
486 for part_range in partition_ranges {
488 let mut sources = Vec::new();
489 build_flat_sources(
490 &stream_ctx,
491 &part_range,
492 compaction,
493 &part_metrics,
494 range_builder_list.clone(),
495 &mut sources,
496 ).await?;
497
498 let mut metrics = ScannerMetrics::default();
499 let mut fetch_start = Instant::now();
500 let mut reader =
501 Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
502 .await?;
503
504 while let Some(record_batch) = reader.try_next().await? {
505 metrics.scan_cost += fetch_start.elapsed();
506 metrics.num_batches += 1;
507 metrics.num_rows += record_batch.num_rows();
508
509 debug_assert!(record_batch.num_rows() > 0);
510 if record_batch.num_rows() == 0 {
511 continue;
512 }
513
514 let yield_start = Instant::now();
515 yield ScanBatch::RecordBatch(record_batch);
516 metrics.yield_cost += yield_start.elapsed();
517
518 fetch_start = Instant::now();
519 }
520
521 metrics.scan_cost += fetch_start.elapsed();
522 part_metrics.merge_metrics(&metrics);
523 }
524
525 part_metrics.on_finish();
526 };
527 Ok(Box::pin(stream))
528 }
529
530 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
531 if self.properties.target_partitions() > self.properties.num_partitions() {
532 Some(Arc::new(Semaphore::new(
538 self.properties.target_partitions() - self.properties.num_partitions() + 1,
539 )))
540 } else {
541 None
542 }
543 }
544
545 fn new_partition_metrics(
548 &self,
549 explain_verbose: bool,
550 metrics_set: &ExecutionPlanMetricsSet,
551 partition: usize,
552 ) -> PartitionMetrics {
553 let metrics = PartitionMetrics::new(
554 self.stream_ctx.input.mapper.metadata().region_id,
555 partition,
556 get_scanner_type(self.stream_ctx.input.compaction),
557 self.stream_ctx.query_start,
558 explain_verbose,
559 metrics_set,
560 );
561
562 if !self.stream_ctx.input.compaction {
563 self.metrics_list.set(partition, metrics.clone());
564 }
565
566 metrics
567 }
568
569 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
571 partition_ranges
572 .iter()
573 .map(|part_range| {
574 let range_meta = &ranges[part_range.identifier];
575 range_meta.indices.len()
576 })
577 .max()
578 .unwrap_or(0)
579 }
580
581 pub(crate) fn check_scan_limit(&self) -> Result<()> {
583 let total_max_files: usize = self
585 .properties
586 .partitions
587 .iter()
588 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
589 .sum();
590
591 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
592 if total_max_files > max_concurrent_files {
593 return TooManyFilesToReadSnafu {
594 actual: total_max_files,
595 max: max_concurrent_files,
596 }
597 .fail();
598 }
599
600 Ok(())
601 }
602}
603
604impl RegionScanner for SeqScan {
605 fn properties(&self) -> &ScannerProperties {
606 &self.properties
607 }
608
609 fn schema(&self) -> SchemaRef {
610 self.stream_ctx.input.mapper.output_schema()
611 }
612
613 fn metadata(&self) -> RegionMetadataRef {
614 self.stream_ctx.input.mapper.metadata().clone()
615 }
616
617 fn scan_partition(
618 &self,
619 ctx: &QueryScanContext,
620 metrics_set: &ExecutionPlanMetricsSet,
621 partition: usize,
622 ) -> Result<SendableRecordBatchStream, BoxedError> {
623 self.scan_partition_impl(ctx, metrics_set, partition)
624 .map_err(BoxedError::new)
625 }
626
627 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
628 self.properties.prepare(request);
629
630 self.check_scan_limit().map_err(BoxedError::new)?;
631
632 Ok(())
633 }
634
635 fn has_predicate_without_region(&self) -> bool {
636 let predicate = self
637 .stream_ctx
638 .input
639 .predicate_group()
640 .predicate_without_region();
641 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
642 }
643
644 fn set_logical_region(&mut self, logical_region: bool) {
645 self.properties.set_logical_region(logical_region);
646 }
647}
648
649impl DisplayAs for SeqScan {
650 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
651 write!(
652 f,
653 "SeqScan: region={}, ",
654 self.stream_ctx.input.mapper.metadata().region_id
655 )?;
656 match t {
657 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
659 self.stream_ctx.format_for_explain(false, f)
660 }
661 DisplayFormatType::Verbose => {
662 self.stream_ctx.format_for_explain(true, f)?;
663 self.metrics_list.format_verbose_metrics(f)
664 }
665 }
666 }
667}
668
669impl fmt::Debug for SeqScan {
670 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
671 f.debug_struct("SeqScan")
672 .field("num_ranges", &self.stream_ctx.ranges.len())
673 .finish()
674 }
675}
676
677pub(crate) async fn build_sources(
679 stream_ctx: &Arc<StreamContext>,
680 part_range: &PartitionRange,
681 compaction: bool,
682 part_metrics: &PartitionMetrics,
683 range_builder_list: Arc<RangeBuilderList>,
684 sources: &mut Vec<Source>,
685) -> Result<()> {
686 let range_meta = &stream_ctx.ranges[part_range.identifier];
688 #[cfg(debug_assertions)]
689 if compaction {
690 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
692 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
693 debug_assert_eq!(
695 -1, row_group_idx.row_group_index,
696 "Expect {} range scan all row groups, given: {}",
697 i, row_group_idx.row_group_index,
698 );
699 }
700 }
701
702 sources.reserve(range_meta.row_group_indices.len());
703 for index in &range_meta.row_group_indices {
704 let stream = if stream_ctx.is_mem_range_index(*index) {
705 let stream = scan_mem_ranges(
706 stream_ctx.clone(),
707 part_metrics.clone(),
708 *index,
709 range_meta.time_range,
710 );
711 Box::pin(stream) as _
712 } else if stream_ctx.is_file_range_index(*index) {
713 let read_type = if compaction {
714 "compaction"
715 } else {
716 "seq_scan_files"
717 };
718 let stream = scan_file_ranges(
719 stream_ctx.clone(),
720 part_metrics.clone(),
721 *index,
722 read_type,
723 range_builder_list.clone(),
724 )
725 .await?;
726 Box::pin(stream) as _
727 } else {
728 scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
729 };
730 sources.push(Source::Stream(stream));
731 }
732 Ok(())
733}
734
735pub(crate) async fn build_flat_sources(
737 stream_ctx: &Arc<StreamContext>,
738 part_range: &PartitionRange,
739 compaction: bool,
740 part_metrics: &PartitionMetrics,
741 range_builder_list: Arc<RangeBuilderList>,
742 sources: &mut Vec<BoxedRecordBatchStream>,
743) -> Result<()> {
744 let range_meta = &stream_ctx.ranges[part_range.identifier];
746 #[cfg(debug_assertions)]
747 if compaction {
748 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
750 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
751 debug_assert_eq!(
753 -1, row_group_idx.row_group_index,
754 "Expect {} range scan all row groups, given: {}",
755 i, row_group_idx.row_group_index,
756 );
757 }
758 }
759
760 sources.reserve(range_meta.row_group_indices.len());
761 for index in &range_meta.row_group_indices {
762 let stream = if stream_ctx.is_mem_range_index(*index) {
763 let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
764 Box::pin(stream) as _
765 } else if stream_ctx.is_file_range_index(*index) {
766 let read_type = if compaction {
767 "compaction"
768 } else {
769 "seq_scan_files"
770 };
771 let stream = scan_flat_file_ranges(
772 stream_ctx.clone(),
773 part_metrics.clone(),
774 *index,
775 read_type,
776 range_builder_list.clone(),
777 )
778 .await?;
779 Box::pin(stream) as _
780 } else {
781 scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
782 };
783 sources.push(stream);
784 }
785 Ok(())
786}
787
/// Accessors that are only compiled for tests.
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
795
/// Returns the scanner type label used for metrics: `"SeqScan(compaction)"`
/// for compaction scans and `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}