use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use snafu::{OptionExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;

use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeReader;
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
    scan_flat_mem_ranges, scan_mem_ranges,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{
    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
};
use crate::region::options::MergeMode;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

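/// Sequential scanner for a region.
///
/// For each partition it merges the sources (memtables and files) of every
/// [PartitionRange] and, unless the region is in append mode, deduplicates rows
/// according to the configured [MergeMode]. Compaction also uses this scanner to
/// read all ranges of a region through a single reader.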
pub struct SeqScan {
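    /// Properties of the scanner.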
    properties: ScannerProperties,
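    /// Context shared by all partition streams of this scanner.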
    stream_ctx: Arc<StreamContext>,
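    /// Whether the scanner is used for compaction.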
    compaction: bool,
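    /// Metrics for each partition.
    /// Only populated for queries; compaction does not register partition metrics here.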
    metrics_list: PartitionMetricsList,
}

impl SeqScan {
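    /// Creates a new [SeqScan] from the scan `input`.
    ///
    /// `compaction` indicates whether the scanner is created for compaction.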
    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            compaction,
            metrics_list: PartitionMetricsList::default(),
        }
    }

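    /// Builds a stream for the query.
    ///
    /// The stream is not partition aware: it scans every partition in order and chains
    /// the resulting record batch streams into one.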
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| {
                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
            })
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

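    /// Scans all partitions and returns a single [ScanBatch] stream that yields the
    /// batches of each partition in order.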
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

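    /// Builds a [BoxedBatchReader] that merges all ranges for compaction.
    ///
    /// # Panics
    /// Panics if the scanner is not created for compaction.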
    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
        assert!(self.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
        )
        .await?;
        Ok(Box::new(reader))
    }

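    /// Builds a [BoxedRecordBatchStream] that merges all ranges for compaction in the
    /// flat format.
    ///
    /// # Panics
    /// Panics if the scanner is not created for compaction.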
    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
        assert!(self.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_flat_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
        )
        .await?;
        Ok(reader)
    }

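    /// Builds sources from all partition ranges and merges them into a single reader
    /// for compaction.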
    async fn merge_all_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
    ) -> Result<BoxedBatchReader> {
        let mut sources = Vec::new();
        let range_builder_list = Arc::new(RangeBuilderList::new(
            stream_ctx.input.num_memtables(),
            stream_ctx.input.num_files(),
        ));
        for part_range in partition_ranges {
            build_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                range_builder_list.clone(),
                &mut sources,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_reader_from_sources(stream_ctx, sources, None).await
    }

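    /// Flat-format counterpart of [Self::merge_all_ranges_for_compaction]: merges the
    /// sources of all partition ranges into one record batch stream.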
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
    ) -> Result<BoxedRecordBatchStream> {
        let mut sources = Vec::new();
        let range_builder_list = Arc::new(RangeBuilderList::new(
            stream_ctx.input.num_memtables(),
            stream_ctx.input.num_files(),
        ));
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                range_builder_list.clone(),
                &mut sources,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
    }

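    /// Builds a reader that merges the given `sources`.
    ///
    /// If a `semaphore` is provided and there is more than one source, sources are read
    /// in parallel. Rows are deduplicated according to the merge mode unless the region
    /// is in append mode, and only the last row of each series is kept when a
    /// [TimeSeriesRowSelector::LastRow] is set.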
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<Source>,
        semaphore: Option<Arc<Semaphore>>,
    ) -> Result<BoxedBatchReader> {
        if let Some(semaphore) = semaphore.as_ref() {
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_sources(sources, semaphore.clone())?;
            }
        }

        let mut builder = MergeReaderBuilder::from_sources(sources);
        let reader = builder.build().await?;

        let dedup = !stream_ctx.input.append_mode;
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                )) as _,
            }
        } else {
            Box::new(reader) as _
        };

        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
            None => reader,
        };

        Ok(reader)
    }

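    /// Flat-format counterpart of [Self::build_reader_from_sources]: merges the given
    /// record batch streams and applies the same optional parallelism and deduplication.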
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
    ) -> Result<BoxedRecordBatchStream> {
        if let Some(semaphore) = semaphore.as_ref() {
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_flat_sources(sources, semaphore.clone())?;
            }
        }

        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let schema = mapper.input_arrow_schema();

        let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;

        let dedup = !stream_ctx.input.append_mode;
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            Box::pin(reader.into_stream()) as _
        };

        Ok(reader)
    }

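    /// Scans one partition and converts its [ScanBatch] stream into a record batch
    /// stream with the mapper's output schema.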
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeqScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
        let input = &self.stream_ctx.input;

        let batch_stream = if input.flat_format {
            self.scan_flat_batch_in_partition(partition, metrics.clone())?
        } else {
            self.scan_batch_in_partition(partition, metrics.clone())?
        };
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

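    /// Scans the given partition in the primary key format, merging the sources of each
    /// [PartitionRange] into [ScanBatch]es.
    ///
    /// When `distinguish_partition_range` is enabled, an empty [Batch] is yielded after
    /// each range as a separator.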
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                ).await?;

                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

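    /// Flat-format counterpart of [Self::scan_batch_in_partition]: yields
    /// [ScanBatch::RecordBatch] batches for the given partition.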
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.compaction;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                ).await?;

                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

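    /// Creates a semaphore that limits how many sources of a partition are read in
    /// parallel.
    ///
    /// Returns `None` when there are at least as many partitions as target partitions.
    /// Otherwise the partition gets `target_partitions() - num_partitions() + 1` permits.
    /// Each call creates an independent semaphore, so the limit applies per partition.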
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

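    /// Creates a [PartitionMetrics] for the given partition and registers it in
    /// `metrics_list` unless the scanner is used for compaction.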
    fn new_partition_metrics(
        &self,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.compaction),
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );

        if !self.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }

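    /// Returns the maximum number of range indices (files) referenced by a single
    /// [PartitionRange] in the partition.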
    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
        partition_ranges
            .iter()
            .map(|part_range| {
                let range_meta = &ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .max()
            .unwrap_or(0)
    }

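    /// Ensures the scan does not read too many files concurrently.
    ///
    /// Sums the per-partition maxima from [Self::max_files_in_partition] and returns a
    /// `TooManyFilesToRead` error if the total exceeds `max_concurrent_scan_files`.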
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        let total_max_files: usize = self
            .properties
            .partitions
            .iter()
            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_max_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_max_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}

impl RegionScanner for SeqScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeqScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeqScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeqScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeqScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

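/// Builds [Source]s for a [PartitionRange] and appends them to `sources`.
///
/// Each row group index of the range becomes one source: a memtable range scan, a file
/// range scan, or another range kind handled by `scan_util`.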
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    sources.reserve(range_meta.row_group_indices.len());
    for index in &range_meta.row_group_indices {
        let stream = if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            Box::pin(stream) as _
        } else if stream_ctx.is_file_range_index(*index) {
            let read_type = if compaction {
                "compaction"
            } else {
                "seq_scan_files"
            };
            let stream = scan_file_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                read_type,
                range_builder_list.clone(),
            )
            .await?;
            Box::pin(stream) as _
        } else {
            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
        };
        sources.push(Source::Stream(stream));
    }
    Ok(())
}

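/// Flat-format counterpart of [build_sources]: appends one record batch stream per row
/// group index of the [PartitionRange] to `sources`.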
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<BoxedRecordBatchStream>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    sources.reserve(range_meta.row_group_indices.len());
    for index in &range_meta.row_group_indices {
        let stream = if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
            Box::pin(stream) as _
        } else if stream_ctx.is_file_range_index(*index) {
            let read_type = if compaction {
                "compaction"
            } else {
                "seq_scan_files"
            };
            let stream = scan_flat_file_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                read_type,
                range_builder_list.clone(),
            )
            .await?;
            Box::pin(stream) as _
        } else {
            scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
        };
        sources.push(stream);
    }
    Ok(())
}

#[cfg(test)]
impl SeqScan {
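    /// Returns the [ScanInput] of the scanner for tests.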
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

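/// Returns the scanner type name used in metrics, depending on whether the scan is for
/// compaction.
///
/// ```ignore
/// assert_eq!("SeqScan(compaction)", get_scanner_type(true));
/// assert_eq!("SeqScan", get_scanner_type(false));
/// ```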
fn get_scanner_type(compaction: bool) -> &'static str {
    if compaction {
        "SeqScan(compaction)"
    } else {
        "SeqScan"
    }
}