1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{ensure, OptionExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
48 PartitionMetrics, PartitionMetricsList,
49};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{
52 scan_util, Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source,
53};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
/// Sequential scanner of a region.
///
/// For every partition range, all of its sources (memtables, files and other ranges) are
/// merged into one reader, which is deduplicated unless the region is in append mode.
pub struct SeqScan {
    /// Properties of the scanner. `properties.partitions` holds the partition ranges
    /// assigned to each partition (initially a single partition with all ranges).
    properties: ScannerProperties,
    /// Context shared by every stream produced by this scanner.
    stream_ctx: Arc<StreamContext>,
    /// Whether this scanner is used for compaction.
    compaction: bool,
    /// Per-partition metrics, registered only for non-compaction scans and printed by
    /// verbose `EXPLAIN` output.
    metrics_list: PartitionMetricsList,
}
72
73impl SeqScan {
74 pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
77 let mut properties = ScannerProperties::default()
78 .with_append_mode(input.append_mode)
79 .with_total_rows(input.total_rows());
80 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
81 properties.partitions = vec![stream_ctx.partition_ranges()];
82
83 Self {
84 properties,
85 stream_ctx,
86 compaction,
87 metrics_list: PartitionMetricsList::default(),
88 }
89 }
90
91 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
96 let metrics_set = ExecutionPlanMetricsSet::new();
97 let streams = (0..self.properties.partitions.len())
98 .map(|partition: usize| {
99 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
100 })
101 .collect::<Result<Vec<_>, _>>()?;
102
103 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
104 Ok(Box::pin(aggr_stream))
105 }
106
107 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
109 let metrics_set = ExecutionPlanMetricsSet::new();
110
111 let streams = (0..self.properties.partitions.len())
112 .map(|partition| {
113 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
114 self.scan_batch_in_partition(partition, metrics)
115 })
116 .collect::<Result<Vec<_>>>()?;
117
118 Ok(Box::pin(futures::stream::iter(streams).flatten()))
119 }
120
121 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
126 assert!(self.compaction);
127
128 let metrics_set = ExecutionPlanMetricsSet::new();
129 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
130 debug_assert_eq!(1, self.properties.partitions.len());
131 let partition_ranges = &self.properties.partitions[0];
132
133 let reader = Self::merge_all_ranges_for_compaction(
134 &self.stream_ctx,
135 partition_ranges,
136 &part_metrics,
137 )
138 .await?;
139 Ok(Box::new(reader))
140 }
141
142 async fn merge_all_ranges_for_compaction(
145 stream_ctx: &Arc<StreamContext>,
146 partition_ranges: &[PartitionRange],
147 part_metrics: &PartitionMetrics,
148 ) -> Result<BoxedBatchReader> {
149 let mut sources = Vec::new();
150 let range_builder_list = Arc::new(RangeBuilderList::new(
151 stream_ctx.input.num_memtables(),
152 stream_ctx.input.num_files(),
153 ));
154 for part_range in partition_ranges {
155 build_sources(
156 stream_ctx,
157 part_range,
158 true,
159 part_metrics,
160 range_builder_list.clone(),
161 &mut sources,
162 )
163 .await?;
164 }
165
166 common_telemetry::debug!(
167 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
168 stream_ctx.input.mapper.metadata().region_id,
169 partition_ranges.len(),
170 sources.len()
171 );
172 Self::build_reader_from_sources(stream_ctx, sources, None).await
173 }
174
175 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
178 pub(crate) async fn build_reader_from_sources(
179 stream_ctx: &StreamContext,
180 mut sources: Vec<Source>,
181 semaphore: Option<Arc<Semaphore>>,
182 ) -> Result<BoxedBatchReader> {
183 if let Some(semaphore) = semaphore.as_ref() {
184 if sources.len() > 1 {
186 sources = stream_ctx
187 .input
188 .create_parallel_sources(sources, semaphore.clone())?;
189 }
190 }
191
192 let mut builder = MergeReaderBuilder::from_sources(sources);
193 let reader = builder.build().await?;
194
195 let dedup = !stream_ctx.input.append_mode;
196 let reader = if dedup {
197 match stream_ctx.input.merge_mode {
198 MergeMode::LastRow => Box::new(DedupReader::new(
199 reader,
200 LastRow::new(stream_ctx.input.filter_deleted),
201 )) as _,
202 MergeMode::LastNonNull => Box::new(DedupReader::new(
203 reader,
204 LastNonNull::new(stream_ctx.input.filter_deleted),
205 )) as _,
206 }
207 } else {
208 Box::new(reader) as _
209 };
210
211 let reader = match &stream_ctx.input.series_row_selector {
212 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
213 None => reader,
214 };
215
216 Ok(reader)
217 }
218
219 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
222 pub(crate) async fn build_flat_reader_from_sources(
223 stream_ctx: &StreamContext,
224 mut sources: Vec<BoxedRecordBatchStream>,
225 semaphore: Option<Arc<Semaphore>>,
226 ) -> Result<BoxedRecordBatchStream> {
227 if let Some(semaphore) = semaphore.as_ref() {
228 if sources.len() > 1 {
230 sources = stream_ctx
231 .input
232 .create_parallel_flat_sources(sources, semaphore.clone())?;
233 }
234 }
235
236 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
237 let schema = mapper.input_arrow_schema();
238
239 let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
240
241 let dedup = !stream_ctx.input.append_mode;
242 let reader = if dedup {
243 match stream_ctx.input.merge_mode {
244 MergeMode::LastRow => Box::pin(
245 FlatDedupReader::new(
246 reader.into_stream().boxed(),
247 FlatLastRow::new(stream_ctx.input.filter_deleted),
248 )
249 .into_stream(),
250 ) as _,
251 MergeMode::LastNonNull => Box::pin(
252 FlatDedupReader::new(
253 reader.into_stream().boxed(),
254 FlatLastNonNull::new(
255 mapper.field_column_start(),
256 stream_ctx.input.filter_deleted,
257 ),
258 )
259 .into_stream(),
260 ) as _,
261 }
262 } else {
263 Box::pin(reader.into_stream()) as _
264 };
265
266 Ok(reader)
267 }
268
269 fn scan_partition_impl(
272 &self,
273 ctx: &QueryScanContext,
274 metrics_set: &ExecutionPlanMetricsSet,
275 partition: usize,
276 ) -> Result<SendableRecordBatchStream> {
277 if ctx.explain_verbose {
278 common_telemetry::info!(
279 "SeqScan partition {}, region_id: {}",
280 partition,
281 self.stream_ctx.input.region_metadata().region_id
282 );
283 }
284
285 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
286 let input = &self.stream_ctx.input;
287
288 let batch_stream = if input.flat_format {
289 self.scan_flat_batch_in_partition(partition, metrics.clone())?
291 } else {
292 self.scan_batch_in_partition(partition, metrics.clone())?
294 };
295 let record_batch_stream = ConvertBatchStream::new(
296 batch_stream,
297 input.mapper.clone(),
298 input.cache_strategy.clone(),
299 metrics,
300 );
301
302 Ok(Box::pin(RecordBatchStreamWrapper::new(
303 input.mapper.output_schema(),
304 Box::pin(record_batch_stream),
305 )))
306 }
307
308 fn scan_batch_in_partition(
309 &self,
310 partition: usize,
311 part_metrics: PartitionMetrics,
312 ) -> Result<ScanBatchStream> {
313 ensure!(
314 partition < self.properties.partitions.len(),
315 PartitionOutOfRangeSnafu {
316 given: partition,
317 all: self.properties.partitions.len(),
318 }
319 );
320
321 if self.properties.partitions[partition].is_empty() {
322 return Ok(Box::pin(futures::stream::empty()));
323 }
324
325 let stream_ctx = self.stream_ctx.clone();
326 let semaphore = self.new_semaphore();
327 let partition_ranges = self.properties.partitions[partition].clone();
328 let compaction = self.compaction;
329 let distinguish_range = self.properties.distinguish_partition_range;
330
331 let stream = try_stream! {
332 part_metrics.on_first_poll();
333
334 let range_builder_list = Arc::new(RangeBuilderList::new(
335 stream_ctx.input.num_memtables(),
336 stream_ctx.input.num_files(),
337 ));
338 let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
339 reason: "Unexpected format",
340 })?;
341 for part_range in partition_ranges {
343 let mut sources = Vec::new();
344 build_sources(
345 &stream_ctx,
346 &part_range,
347 compaction,
348 &part_metrics,
349 range_builder_list.clone(),
350 &mut sources,
351 ).await?;
352
353 let mut metrics = ScannerMetrics::default();
354 let mut fetch_start = Instant::now();
355 let mut reader =
356 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
357 .await?;
358 #[cfg(debug_assertions)]
359 let mut checker = crate::read::BatchChecker::default()
360 .with_start(Some(part_range.start))
361 .with_end(Some(part_range.end));
362
363 while let Some(batch) = reader.next_batch().await? {
364 metrics.scan_cost += fetch_start.elapsed();
365 metrics.num_batches += 1;
366 metrics.num_rows += batch.num_rows();
367
368 debug_assert!(!batch.is_empty());
369 if batch.is_empty() {
370 continue;
371 }
372
373 #[cfg(debug_assertions)]
374 checker.ensure_part_range_batch(
375 "SeqScan",
376 _mapper.metadata().region_id,
377 partition,
378 part_range,
379 &batch,
380 );
381
382 let yield_start = Instant::now();
383 yield ScanBatch::Normal(batch);
384 metrics.yield_cost += yield_start.elapsed();
385
386 fetch_start = Instant::now();
387 }
388
389 if distinguish_range {
392 let yield_start = Instant::now();
393 yield ScanBatch::Normal(Batch::empty());
394 metrics.yield_cost += yield_start.elapsed();
395 }
396
397 metrics.scan_cost += fetch_start.elapsed();
398 part_metrics.merge_metrics(&metrics);
399 }
400
401 part_metrics.on_finish();
402 };
403 Ok(Box::pin(stream))
404 }
405
406 fn scan_flat_batch_in_partition(
407 &self,
408 partition: usize,
409 part_metrics: PartitionMetrics,
410 ) -> Result<ScanBatchStream> {
411 ensure!(
412 partition < self.properties.partitions.len(),
413 PartitionOutOfRangeSnafu {
414 given: partition,
415 all: self.properties.partitions.len(),
416 }
417 );
418
419 if self.properties.partitions[partition].is_empty() {
420 return Ok(Box::pin(futures::stream::empty()));
421 }
422
423 let stream_ctx = self.stream_ctx.clone();
424 let semaphore = self.new_semaphore();
425 let partition_ranges = self.properties.partitions[partition].clone();
426 let compaction = self.compaction;
427
428 let stream = try_stream! {
429 part_metrics.on_first_poll();
430
431 let range_builder_list = Arc::new(RangeBuilderList::new(
432 stream_ctx.input.num_memtables(),
433 stream_ctx.input.num_files(),
434 ));
435 for part_range in partition_ranges {
437 let mut sources = Vec::new();
438 build_flat_sources(
439 &stream_ctx,
440 &part_range,
441 compaction,
442 &part_metrics,
443 range_builder_list.clone(),
444 &mut sources,
445 ).await?;
446
447 let mut metrics = ScannerMetrics::default();
448 let mut fetch_start = Instant::now();
449 let mut reader =
450 Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
451 .await?;
452
453 while let Some(record_batch) = reader.try_next().await? {
454 metrics.scan_cost += fetch_start.elapsed();
455 metrics.num_batches += 1;
456 metrics.num_rows += record_batch.num_rows();
457
458 debug_assert!(record_batch.num_rows() > 0);
459 if record_batch.num_rows() == 0 {
460 continue;
461 }
462
463 let yield_start = Instant::now();
464 yield ScanBatch::RecordBatch(record_batch);
465 metrics.yield_cost += yield_start.elapsed();
466
467 fetch_start = Instant::now();
468 }
469
470 metrics.scan_cost += fetch_start.elapsed();
471 part_metrics.merge_metrics(&metrics);
472 }
473
474 part_metrics.on_finish();
475 };
476 Ok(Box::pin(stream))
477 }
478
479 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
480 if self.properties.target_partitions() > self.properties.num_partitions() {
481 Some(Arc::new(Semaphore::new(
487 self.properties.target_partitions() - self.properties.num_partitions() + 1,
488 )))
489 } else {
490 None
491 }
492 }
493
494 fn new_partition_metrics(
497 &self,
498 explain_verbose: bool,
499 metrics_set: &ExecutionPlanMetricsSet,
500 partition: usize,
501 ) -> PartitionMetrics {
502 let metrics = PartitionMetrics::new(
503 self.stream_ctx.input.mapper.metadata().region_id,
504 partition,
505 get_scanner_type(self.compaction),
506 self.stream_ctx.query_start,
507 explain_verbose,
508 metrics_set,
509 );
510
511 if !self.compaction {
512 self.metrics_list.set(partition, metrics.clone());
513 }
514
515 metrics
516 }
517
518 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
520 partition_ranges
521 .iter()
522 .map(|part_range| {
523 let range_meta = &ranges[part_range.identifier];
524 range_meta.indices.len()
525 })
526 .max()
527 .unwrap_or(0)
528 }
529
530 pub(crate) fn check_scan_limit(&self) -> Result<()> {
532 let total_max_files: usize = self
534 .properties
535 .partitions
536 .iter()
537 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
538 .sum();
539
540 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
541 if total_max_files > max_concurrent_files {
542 return TooManyFilesToReadSnafu {
543 actual: total_max_files,
544 max: max_concurrent_files,
545 }
546 .fail();
547 }
548
549 Ok(())
550 }
551}
552
553impl RegionScanner for SeqScan {
554 fn properties(&self) -> &ScannerProperties {
555 &self.properties
556 }
557
558 fn schema(&self) -> SchemaRef {
559 self.stream_ctx.input.mapper.output_schema()
560 }
561
562 fn metadata(&self) -> RegionMetadataRef {
563 self.stream_ctx.input.mapper.metadata().clone()
564 }
565
566 fn scan_partition(
567 &self,
568 ctx: &QueryScanContext,
569 metrics_set: &ExecutionPlanMetricsSet,
570 partition: usize,
571 ) -> Result<SendableRecordBatchStream, BoxedError> {
572 self.scan_partition_impl(ctx, metrics_set, partition)
573 .map_err(BoxedError::new)
574 }
575
576 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
577 self.properties.prepare(request);
578
579 self.check_scan_limit().map_err(BoxedError::new)?;
580
581 Ok(())
582 }
583
584 fn has_predicate(&self) -> bool {
585 let predicate = self.stream_ctx.input.predicate();
586 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
587 }
588
589 fn set_logical_region(&mut self, logical_region: bool) {
590 self.properties.set_logical_region(logical_region);
591 }
592}
593
594impl DisplayAs for SeqScan {
595 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
596 write!(
597 f,
598 "SeqScan: region={}, ",
599 self.stream_ctx.input.mapper.metadata().region_id
600 )?;
601 match t {
602 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
604 self.stream_ctx.format_for_explain(false, f)
605 }
606 DisplayFormatType::Verbose => {
607 self.stream_ctx.format_for_explain(true, f)?;
608 self.metrics_list.format_verbose_metrics(f)
609 }
610 }
611 }
612}
613
614impl fmt::Debug for SeqScan {
615 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
616 f.debug_struct("SeqScan")
617 .field("num_ranges", &self.stream_ctx.ranges.len())
618 .finish()
619 }
620}
621
622pub(crate) async fn build_sources(
624 stream_ctx: &Arc<StreamContext>,
625 part_range: &PartitionRange,
626 compaction: bool,
627 part_metrics: &PartitionMetrics,
628 range_builder_list: Arc<RangeBuilderList>,
629 sources: &mut Vec<Source>,
630) -> Result<()> {
631 let range_meta = &stream_ctx.ranges[part_range.identifier];
633 #[cfg(debug_assertions)]
634 if compaction {
635 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
637 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
638 debug_assert_eq!(
640 -1, row_group_idx.row_group_index,
641 "Expect {} range scan all row groups, given: {}",
642 i, row_group_idx.row_group_index,
643 );
644 }
645 }
646
647 sources.reserve(range_meta.row_group_indices.len());
648 for index in &range_meta.row_group_indices {
649 let stream = if stream_ctx.is_mem_range_index(*index) {
650 let stream = scan_mem_ranges(
651 stream_ctx.clone(),
652 part_metrics.clone(),
653 *index,
654 range_meta.time_range,
655 );
656 Box::pin(stream) as _
657 } else if stream_ctx.is_file_range_index(*index) {
658 let read_type = if compaction {
659 "compaction"
660 } else {
661 "seq_scan_files"
662 };
663 let stream = scan_file_ranges(
664 stream_ctx.clone(),
665 part_metrics.clone(),
666 *index,
667 read_type,
668 range_builder_list.clone(),
669 )
670 .await?;
671 Box::pin(stream) as _
672 } else {
673 scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
674 };
675 sources.push(Source::Stream(stream));
676 }
677 Ok(())
678}
679
680pub(crate) async fn build_flat_sources(
682 stream_ctx: &Arc<StreamContext>,
683 part_range: &PartitionRange,
684 compaction: bool,
685 part_metrics: &PartitionMetrics,
686 range_builder_list: Arc<RangeBuilderList>,
687 sources: &mut Vec<BoxedRecordBatchStream>,
688) -> Result<()> {
689 let range_meta = &stream_ctx.ranges[part_range.identifier];
691 #[cfg(debug_assertions)]
692 if compaction {
693 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
695 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
696 debug_assert_eq!(
698 -1, row_group_idx.row_group_index,
699 "Expect {} range scan all row groups, given: {}",
700 i, row_group_idx.row_group_index,
701 );
702 }
703 }
704
705 sources.reserve(range_meta.row_group_indices.len());
706 for index in &range_meta.row_group_indices {
707 let stream = if stream_ctx.is_mem_range_index(*index) {
708 let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
709 Box::pin(stream) as _
710 } else if stream_ctx.is_file_range_index(*index) {
711 let read_type = if compaction {
712 "compaction"
713 } else {
714 "seq_scan_files"
715 };
716 let stream = scan_flat_file_ranges(
717 stream_ctx.clone(),
718 part_metrics.clone(),
719 *index,
720 read_type,
721 range_builder_list.clone(),
722 )
723 .await?;
724 Box::pin(stream) as _
725 } else {
726 scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
727 };
728 sources.push(stream);
729 }
730 Ok(())
731}
732
#[cfg(test)]
impl SeqScan {
    /// Exposes the scan input so tests can inspect how the scanner was configured.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
740
/// Returns the scanner type label used in partition metrics.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}