1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
39use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
40use crate::read::flat_merge::FlatMergeReader;
41use crate::read::last_row::FlatLastRowReader;
42use crate::read::pruner::{PartitionPruner, Pruner};
43use crate::read::range::RangeMeta;
44use crate::read::range_cache::{
45 build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream,
46};
47use crate::read::scan_region::{ScanInput, StreamContext};
48use crate::read::scan_util::{
49 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
50 scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
51};
52use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
53use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
/// Scanner that reads ranges of a region partition by partition and merges
/// (and optionally dedups) the rows before yielding them.
pub struct SeqScan {
    /// Scanner properties: partition ranges, append mode, total rows, etc.
    properties: ScannerProperties,
    /// Shared context built from the `ScanInput`.
    stream_ctx: Arc<StreamContext>,
    /// Pruner shared by all partitions of this scanner.
    pruner: Arc<Pruner>,
    /// Per-partition metrics; only populated for queries (not compaction),
    /// see `new_partition_metrics`.
    metrics_list: PartitionMetricsList,
}
72
73impl SeqScan {
74 pub(crate) fn new(input: ScanInput) -> Self {
77 let mut properties = ScannerProperties::default()
78 .with_append_mode(input.append_mode)
79 .with_total_rows(input.total_rows());
80 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
81 properties.partitions = vec![stream_ctx.partition_ranges()];
82
83 let num_workers = common_stat::get_total_cpu_cores().max(1);
85 let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
86
87 Self {
88 properties,
89 stream_ctx,
90 pruner,
91 metrics_list: PartitionMetricsList::default(),
92 }
93 }
94
95 #[tracing::instrument(
100 skip_all,
101 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
102 )]
103 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
104 let metrics_set = ExecutionPlanMetricsSet::new();
105 let streams = (0..self.properties.partitions.len())
106 .map(|partition: usize| {
107 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
108 })
109 .collect::<Result<Vec<_>, _>>()?;
110
111 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
112 Ok(Box::pin(aggr_stream))
113 }
114
115 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
117 let metrics_set = ExecutionPlanMetricsSet::new();
118
119 let streams = (0..self.properties.partitions.len())
120 .map(|partition| {
121 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
122 self.scan_flat_batch_in_partition(partition, metrics)
123 })
124 .collect::<Result<Vec<_>>>()?;
125
126 Ok(Box::pin(futures::stream::iter(streams).flatten()))
127 }
128
129 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
134 assert!(self.stream_ctx.input.compaction);
135
136 let metrics_set = ExecutionPlanMetricsSet::new();
137 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
138 debug_assert_eq!(1, self.properties.partitions.len());
139 let partition_ranges = &self.properties.partitions[0];
140
141 let reader = Self::merge_all_flat_ranges_for_compaction(
142 &self.stream_ctx,
143 partition_ranges,
144 &part_metrics,
145 self.pruner.clone(),
146 )
147 .await?;
148 Ok(reader)
149 }
150
    /// Builds one reader that merges sources from all `partition_ranges` for
    /// compaction.
    ///
    /// No semaphore is passed to source building or merging, so sources are
    /// scanned without extra parallelism. Dedup is applied as usual (unless
    /// the input is in append mode, see `build_flat_reader_from_sources`).
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        // Register the ranges with the pruner before building the per-partition pruner.
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        // Collect the sources of every range into one flat list.
        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            None,
            None,
            false,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
        )
        .await
    }
192
    /// Builds a single stream from multiple `sources`.
    ///
    /// When `semaphore` is provided and there is more than one source, the
    /// sources are wrapped so they can be polled in parallel. Multiple sources
    /// are merged by a [FlatMergeReader]; a single source is returned as-is.
    /// Unless `skip_dedup` is set or the input is in append mode, the merged
    /// stream is deduplicated according to the region's [MergeMode]. Finally,
    /// an optional last-row time-series selector is applied.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
        skip_dedup: bool,
        channel_size: usize,
    ) -> Result<BoxedRecordBatchStream> {
        // Only parallelize when there is more than one source to run.
        if let Some(semaphore) = semaphore.as_ref() {
            if sources.len() > 1 {
                sources = stream_ctx.input.create_parallel_flat_sources(
                    sources,
                    semaphore.clone(),
                    channel_size,
                )?;
            }
        }

        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let reader: BoxedRecordBatchStream = if sources.len() == 1 {
            // A single source needs no merging.
            sources.pop().unwrap()
        } else {
            let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
            let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
            let reader =
                FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                    .await?;
            Box::pin(reader.into_stream())
        };

        // Append mode keeps duplicate rows, so dedup only applies otherwise.
        let dedup = !skip_dedup && !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader,
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader,
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            reader
        };

        // Optionally keep only the last row of each time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => {
                Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
            }
            None => reader,
        };

        Ok(reader)
    }
267
    /// Builds a stream to read one partition range, returning the stream
    /// together with the estimated number of rows per output batch.
    ///
    /// The result may be served from the range result cache on a hit; on a
    /// miss (when a cache key exists) the produced stream is wrapped so its
    /// result gets stored into the cache.
    pub(crate) async fn build_flat_partition_range_read(
        stream_ctx: &Arc<StreamContext>,
        part_range: &PartitionRange,
        compaction: bool,
        part_metrics: &PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
        file_scan_semaphore: Option<Arc<Semaphore>>,
        merge_semaphore: Option<Arc<Semaphore>>,
    ) -> Result<(BoxedRecordBatchStream, usize)> {
        let cache_key = build_range_cache_key(stream_ctx, part_range);

        // Fast path: serve the whole range from the cache.
        if let Some(key) = cache_key.as_ref() {
            if let Some(value) = stream_ctx.input.cache_strategy.get_range_result(key) {
                part_metrics.inc_range_cache_hit();
                return Ok((cached_flat_range_stream(value), DEFAULT_READ_BATCH_SIZE));
            }
            part_metrics.inc_range_cache_miss();
        }

        let mut sources = Vec::new();
        let split_batch_size = build_flat_sources(
            stream_ctx,
            part_range,
            compaction,
            part_metrics,
            partition_pruner,
            &mut sources,
            file_scan_semaphore,
        )
        .await?;
        // When sources are split into smaller batches, size the parallel
        // channels based on the smaller batch size.
        let estimated_rows_per_batch = split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE);
        let channel_size = compute_parallel_channel_size(estimated_rows_per_batch);
        let stream = Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            merge_semaphore,
            Some(part_metrics),
            false,
            channel_size,
        )
        .await?;

        // Wrap the stream so the scanned result populates the range cache
        // (see `cache_flat_range_stream`).
        let stream = match cache_key {
            Some(key) => cache_flat_range_stream(
                stream,
                stream_ctx.input.cache_strategy.clone(),
                key,
                part_metrics.clone(),
            ),
            None => stream,
        };

        Ok((stream, estimated_rows_per_batch))
    }
323
324 fn scan_partition_impl(
327 &self,
328 ctx: &QueryScanContext,
329 metrics_set: &ExecutionPlanMetricsSet,
330 partition: usize,
331 ) -> Result<SendableRecordBatchStream> {
332 if ctx.explain_verbose {
333 common_telemetry::info!(
334 "SeqScan partition {}, region_id: {}",
335 partition,
336 self.stream_ctx.input.region_metadata().region_id
337 );
338 }
339
340 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
341 let input = &self.stream_ctx.input;
342
343 let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
344 let record_batch_stream = ConvertBatchStream::new(
345 batch_stream,
346 input.mapper.clone(),
347 input.cache_strategy.clone(),
348 metrics,
349 );
350
351 Ok(Box::pin(RecordBatchStreamWrapper::new(
352 input.mapper.output_schema(),
353 Box::pin(record_batch_stream),
354 )))
355 }
356
    /// Scans the given `partition` and returns a stream of [ScanBatch]es.
    ///
    /// Ranges in the partition are read one after another; per-range
    /// [ScannerMetrics] are merged into `part_metrics` after each range
    /// finishes.
    ///
    /// # Errors
    /// Returns `PartitionOutOfRange` if `partition` is out of bounds; the
    /// stream itself yields errors from building or polling readers.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        // Nothing to scan in an empty partition.
        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // NOTE(review): compaction passes no file-scan semaphore — presumably
        // it bounds concurrency elsewhere; confirm before changing.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // `fetch_start` measures time spent fetching data (scan cost)
            // between yields; it is reset after every yield.
            let mut fetch_start = Instant::now();

            for part_range in partition_ranges {
                let (mut reader, _) = Self::build_flat_partition_range_read(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    file_scan_semaphore.clone(),
                    semaphore.clone(),
                )
                .await?;

                // Time spent building the reader counts as scan cost.
                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Readers are not expected to produce empty batches;
                    // skip them defensively in release builds.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Account for the final (empty) poll of this range.
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
446
447 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
448 if self.properties.target_partitions() > self.properties.num_partitions() {
449 Some(Arc::new(Semaphore::new(
455 self.properties.target_partitions() - self.properties.num_partitions() + 1,
456 )))
457 } else {
458 None
459 }
460 }
461
462 fn new_partition_metrics(
465 &self,
466 explain_verbose: bool,
467 metrics_set: &ExecutionPlanMetricsSet,
468 partition: usize,
469 ) -> PartitionMetrics {
470 let metrics = PartitionMetrics::new(
471 self.stream_ctx.input.mapper.metadata().region_id,
472 partition,
473 get_scanner_type(self.stream_ctx.input.compaction),
474 self.stream_ctx.query_start,
475 explain_verbose,
476 metrics_set,
477 );
478
479 if !self.stream_ctx.input.compaction {
480 self.metrics_list.set(partition, metrics.clone());
481 }
482
483 metrics
484 }
485
486 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
488 partition_ranges
489 .iter()
490 .map(|part_range| {
491 let range_meta = &ranges[part_range.identifier];
492 range_meta.indices.len()
493 })
494 .max()
495 .unwrap_or(0)
496 }
497
498 pub(crate) fn check_scan_limit(&self) -> Result<()> {
500 let total_max_files: usize = self
502 .properties
503 .partitions
504 .iter()
505 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
506 .sum();
507
508 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
509 if total_max_files > max_concurrent_files {
510 return TooManyFilesToReadSnafu {
511 actual: total_max_files,
512 max: max_concurrent_files,
513 }
514 .fail();
515 }
516
517 Ok(())
518 }
519}
520
impl RegionScanner for SeqScan {
    fn name(&self) -> &str {
        "SeqScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    /// Output schema produced by the projection mapper.
    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        // Re-check the file limit after preparing, since the request may
        // change the partition layout (NOTE(review): inferred — confirm).
        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    /// Whether a predicate exists that is not bound to a specific region.
    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.is_some()
    }

    /// Pushes dynamic filter expressions down into the scan's predicate.
    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
576
577impl DisplayAs for SeqScan {
578 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
579 write!(
580 f,
581 "SeqScan: region={}, ",
582 self.stream_ctx.input.mapper.metadata().region_id
583 )?;
584 match t {
585 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
587 self.stream_ctx.format_for_explain(false, f)
588 }
589 DisplayFormatType::Verbose => {
590 self.stream_ctx.format_for_explain(true, f)?;
591 self.metrics_list.format_verbose_metrics(f)
592 }
593 }
594 }
595}
596
597impl fmt::Debug for SeqScan {
598 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599 f.debug_struct("SeqScan")
600 .field("num_ranges", &self.stream_ctx.ranges.len())
601 .finish()
602 }
603}
604
/// Builds sources for one partition range and appends them to `sources`.
///
/// Returns `Some(batch_size)` when the produced streams should split record
/// batches before merging, `None` otherwise. When `semaphore` is given, file
/// ranges are opened concurrently while the original order of
/// `row_group_indices` is still preserved.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<Option<usize>> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // Compaction expects each range to scan all row groups of its file;
    // `-1` is the "all row groups" marker (see the assertion message).
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label passed to the file scan for metrics/telemetry.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(None);
    }

    let split_batch_size = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    let should_split = split_batch_size.is_some();
    sources.reserve(num_indices);
    // Pre-sized slots keep sources in the same order as `row_group_indices`,
    // even when concurrent file scans complete out of order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            // Memory (memtable) ranges are scanned inline.
            let stream = scan_flat_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Open file ranges concurrently, bounded by the semaphore.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // `acquire` only errors if the semaphore is closed, which
                    // is assumed never to happen here.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // No semaphore: open the file range right away.
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            // Neither mem nor file range; delegate to the fallback scanner.
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    // Wait for all concurrent file scans and place them into their slots.
    if !file_scan_tasks.is_empty() {
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            // Wrap each source so oversized batches are split before merging.
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(split_batch_size)
}
723
#[cfg(test)]
impl SeqScan {
    /// Test-only accessor for the underlying [ScanInput].
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
731
/// Returns the scanner type label reported in metrics, distinguishing
/// compaction reads from regular queries.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}