1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
39use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
40use crate::read::flat_merge::FlatMergeReader;
41use crate::read::last_row::FlatLastRowReader;
42use crate::read::pruner::{PartitionPruner, Pruner};
43use crate::read::range::RangeMeta;
44use crate::read::range_cache::{
45 build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream,
46};
47use crate::read::scan_region::{ScanInput, StreamContext};
48use crate::read::scan_util::{
49 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
50 scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
51};
52use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
53use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
/// Scanner that reads partition ranges sequentially, merging and (unless in
/// append mode) deduplicating the sources of each range.
pub struct SeqScan {
    /// Properties of the scanner, including its partition ranges.
    properties: ScannerProperties,
    /// Context shared by all partition streams.
    stream_ctx: Arc<StreamContext>,
    /// Pruner shared across partitions to prune ranges before scanning.
    pruner: Arc<Pruner>,
    /// Per-partition metrics; only populated for queries (left empty during
    /// compaction, see `new_partition_metrics`).
    metrics_list: PartitionMetricsList,
}
72
impl SeqScan {
    /// Creates a new [`SeqScan`] from the scan `input`.
    ///
    /// All partition ranges start in a single partition; callers may
    /// repartition later through `prepare`.
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // One pruner worker per CPU core, at least one.
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Builds a single record batch stream that chains the streams of all
    /// partitions, in partition order.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| {
                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
            })
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

    /// Scans all partitions as a single flattened [`ScanBatchStream`],
    /// one partition after another.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
                self.scan_flat_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Builds a reader over all ranges for compaction.
    ///
    /// # Panics
    /// Asserts that this scanner was created for compaction; also expects a
    /// single partition (debug assertion).
    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
        assert!(self.stream_ctx.input.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_flat_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
            self.pruner.clone(),
        )
        .await?;
        Ok(reader)
    }

    /// Collects the sources of every range in `partition_ranges` and merges
    /// them into one reader for compaction (no semaphores, no range cache).
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        let mut sources = Vec::new();
        for part_range in partition_ranges {
            // `compaction = true`, no file-scan semaphore: scan sequentially.
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            None,
            None,
            false,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
        )
        .await
    }

    /// Merges `sources` into one stream, then optionally applies dedup and the
    /// per-series last-row selector.
    ///
    /// When `semaphore` is provided and there are multiple sources, the sources
    /// are wrapped to poll in parallel with `channel_size` buffering.
    /// `skip_dedup` forces dedup off; otherwise dedup is skipped only in
    /// append mode.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
        skip_dedup: bool,
        channel_size: usize,
    ) -> Result<BoxedRecordBatchStream> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Read sources in parallel only when there is more than one.
            if sources.len() > 1 {
                sources = stream_ctx.input.create_parallel_flat_sources(
                    sources,
                    semaphore.clone(),
                    channel_size,
                )?;
            }
        }

        let mapper = &stream_ctx.input.mapper;
        let reader: BoxedRecordBatchStream = if sources.len() == 1 {
            // Single source: no merge needed.
            sources.pop().unwrap()
        } else {
            let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
            let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
            let reader =
                FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                    .await?;
            Box::pin(reader.into_stream())
        };

        // Dedup is disabled in append mode or when the caller opts out.
        let dedup = !skip_dedup && !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader,
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader,
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            reader
        };

        // Optionally keep only the last row of each time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => {
                Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
            }
            None => reader,
        };

        Ok(reader)
    }

    /// Builds the stream for a single partition range, consulting the range
    /// result cache first and caching the output when a cache key exists.
    ///
    /// Returns the stream and the estimated number of rows per batch.
    pub(crate) async fn build_flat_partition_range_read(
        stream_ctx: &Arc<StreamContext>,
        part_range: &PartitionRange,
        compaction: bool,
        part_metrics: &PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
        file_scan_semaphore: Option<Arc<Semaphore>>,
        merge_semaphore: Option<Arc<Semaphore>>,
    ) -> Result<(BoxedRecordBatchStream, usize)> {
        let cache_key = build_range_cache_key(stream_ctx, part_range);

        if let Some(key) = cache_key.as_ref() {
            // Serve the whole range from cache on a hit.
            if let Some(value) = stream_ctx.input.cache_strategy.get_range_result(key) {
                part_metrics.inc_range_cache_hit();
                return Ok((cached_flat_range_stream(value), DEFAULT_READ_BATCH_SIZE));
            }
            part_metrics.inc_range_cache_miss();
        }

        let mut sources = Vec::new();
        let split_batch_size = build_flat_sources(
            stream_ctx,
            part_range,
            compaction,
            part_metrics,
            partition_pruner,
            &mut sources,
            file_scan_semaphore,
        )
        .await?;
        // Smaller batches are expected when sources were split for merging.
        let estimated_rows_per_batch = split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE);
        let channel_size = compute_parallel_channel_size(estimated_rows_per_batch);
        let stream = Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            merge_semaphore,
            Some(part_metrics),
            false,
            channel_size,
        )
        .await?;

        // Tee the stream into the range cache when a key is available.
        let stream = match cache_key {
            Some(key) => cache_flat_range_stream(
                stream,
                stream_ctx.input.cache_strategy.clone(),
                key,
                part_metrics.clone(),
            ),
            None => stream,
        };

        Ok((stream, estimated_rows_per_batch))
    }

    /// Scans one partition and converts its batches into record batches with
    /// the output schema of the mapper.
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeqScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
        let input = &self.stream_ctx.input;

        let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    /// Scans the ranges of `partition` one by one and yields their record
    /// batches, tracking scan/yield costs in `part_metrics`.
    ///
    /// # Errors
    /// Returns `PartitionOutOfRange` when `partition` is not a valid index.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction scans files sequentially; queries may scan them in
        // parallel under the semaphore.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // `fetch_start` measures time spent producing data (including
            // building the range reader), reset after every yield.
            let mut fetch_start = Instant::now();

            for part_range in partition_ranges {
                let (mut reader, _) = Self::build_flat_partition_range_read(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    file_scan_semaphore.clone(),
                    semaphore.clone(),
                )
                .await?;

                // Reader construction time counts towards scan cost.
                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are unexpected; skip them defensively.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Creates a semaphore limiting intra-partition parallelism, or `None`
    /// when there are already at least as many partitions as target
    /// partitions.
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            // Each partition already runs on its own; grant the surplus
            // parallelism (plus the partition's own slot) as permits.
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

    /// Creates the [`PartitionMetrics`] for `partition` and registers it in
    /// `metrics_list` (queries only — compaction metrics are not listed).
    fn new_partition_metrics(
        &self,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.stream_ctx.input.compaction),
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );

        if !self.stream_ctx.input.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }

    /// Returns the maximum number of file indices held by any single range in
    /// `partition_ranges` (0 when the slice is empty).
    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
        partition_ranges
            .iter()
            .map(|part_range| {
                let range_meta = &ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .max()
            .unwrap_or(0)
    }

    /// Fails with `TooManyFilesToRead` when the sum over partitions of the
    /// per-partition maximum file count exceeds the configured limit.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        let total_max_files: usize = self
            .properties
            .partitions
            .iter()
            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_max_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_max_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}
520
521impl RegionScanner for SeqScan {
522 fn name(&self) -> &str {
523 "SeqScan"
524 }
525
526 fn properties(&self) -> &ScannerProperties {
527 &self.properties
528 }
529
530 fn schema(&self) -> SchemaRef {
531 self.stream_ctx.input.mapper.output_schema()
532 }
533
534 fn metadata(&self) -> RegionMetadataRef {
535 self.stream_ctx.input.mapper.metadata().clone()
536 }
537
538 fn scan_partition(
539 &self,
540 ctx: &QueryScanContext,
541 metrics_set: &ExecutionPlanMetricsSet,
542 partition: usize,
543 ) -> Result<SendableRecordBatchStream, BoxedError> {
544 self.scan_partition_impl(ctx, metrics_set, partition)
545 .map_err(BoxedError::new)
546 }
547
548 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
549 self.properties.prepare(request);
550
551 self.check_scan_limit().map_err(BoxedError::new)?;
552
553 Ok(())
554 }
555
556 fn has_predicate_without_region(&self) -> bool {
557 let predicate = self
558 .stream_ctx
559 .input
560 .predicate_group()
561 .predicate_without_region();
562 predicate.is_some()
563 }
564
565 fn add_dyn_filter_to_predicate(
566 &mut self,
567 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
568 ) -> Vec<bool> {
569 self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
570 }
571
572 fn set_logical_region(&mut self, logical_region: bool) {
573 self.properties.set_logical_region(logical_region);
574 }
575
576 fn snapshot_sequence(&self) -> Option<u64> {
577 self.stream_ctx.input.snapshot_sequence
578 }
579}
580
581impl DisplayAs for SeqScan {
582 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
583 write!(
584 f,
585 "SeqScan: region={}, ",
586 self.stream_ctx.input.mapper.metadata().region_id
587 )?;
588 match t {
589 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
591 self.stream_ctx.format_for_explain(false, f)
592 }
593 DisplayFormatType::Verbose => {
594 self.stream_ctx.format_for_explain(true, f)?;
595 self.metrics_list.format_verbose_metrics(f)
596 }
597 }
598 }
599}
600
601impl fmt::Debug for SeqScan {
602 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
603 f.debug_struct("SeqScan")
604 .field("num_ranges", &self.stream_ctx.ranges.len())
605 .finish()
606 }
607}
608
/// Builds the source streams of `part_range` and appends them to `sources`,
/// preserving the order of the range's row-group indices.
///
/// Memtable and "other" ranges are scanned inline; file ranges are scanned
/// concurrently when `semaphore` is provided (each task holds a permit while
/// opening the file) and placed back at their original positions afterwards.
///
/// Returns `Some(batch_size)` when batches should be split for merging,
/// `None` otherwise (or when the range has no indices).
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<Option<usize>> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // For compaction, every file index must map to a whole-file scan
    // (row_group_index == -1 means scanning all row groups).
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label used by file-scan metrics.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(None);
    }

    let split_batch_size = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    let should_split = split_batch_size.is_some();
    sources.reserve(num_indices);
    // Slot per index so concurrently built file streams keep their order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();
    let pre_filter_mode = stream_ctx.range_pre_filter_mode(part_range);

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable ranges are cheap to open; scan inline.
            let stream = scan_flat_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Defer the file scan to a concurrent task gated by the
                // semaphore; record its position for later reordering.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // The semaphore is never closed here, so acquire can't fail.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // No semaphore: open the file range sequentially.
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            let stream = scan_util::maybe_scan_flat_other_ranges(
                stream_ctx,
                *index,
                part_metrics,
                pre_filter_mode,
            )
            .await?;
            ordered_sources[position] = Some(stream);
        }
    }

    if !file_scan_tasks.is_empty() {
        // Run all gated file scans concurrently, then slot results back in
        // their original positions.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(split_batch_size)
}
733
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input of this scanner (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
741
/// Returns the scanner type label used by partition metrics.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}