1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
39use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
40use crate::read::flat_merge::FlatMergeReader;
41use crate::read::last_row::FlatLastRowReader;
42use crate::read::pruner::{PartitionPruner, Pruner};
43use crate::read::range::RangeMeta;
44use crate::read::scan_region::{ScanInput, StreamContext};
45use crate::read::scan_util::{
46 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
47 scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
48};
49use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
50use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
51use crate::region::options::MergeMode;
52use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
53
/// Scanner that reads partition ranges sequentially, merging (and optionally
/// deduplicating) the sources of each range into ordered output streams.
pub struct SeqScan {
    /// Properties of the scanner, including partitions, append mode and total rows.
    properties: ScannerProperties,
    /// Context shared by the streams of all partitions.
    stream_ctx: Arc<StreamContext>,
    /// Pruner shared by all partitions; partition ranges are registered to it lazily.
    pruner: Arc<Pruner>,
    /// Per-partition metrics, printed by verbose `EXPLAIN` output.
    metrics_list: PartitionMetricsList,
}
69
70impl SeqScan {
71 pub(crate) fn new(input: ScanInput) -> Self {
74 let mut properties = ScannerProperties::default()
75 .with_append_mode(input.append_mode)
76 .with_total_rows(input.total_rows());
77 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
78 properties.partitions = vec![stream_ctx.partition_ranges()];
79
80 let num_workers = common_stat::get_total_cpu_cores().max(1);
82 let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
83
84 Self {
85 properties,
86 stream_ctx,
87 pruner,
88 metrics_list: PartitionMetricsList::default(),
89 }
90 }
91
92 #[tracing::instrument(
97 skip_all,
98 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
99 )]
100 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
101 let metrics_set = ExecutionPlanMetricsSet::new();
102 let streams = (0..self.properties.partitions.len())
103 .map(|partition: usize| {
104 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
105 })
106 .collect::<Result<Vec<_>, _>>()?;
107
108 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
109 Ok(Box::pin(aggr_stream))
110 }
111
112 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
114 let metrics_set = ExecutionPlanMetricsSet::new();
115
116 let streams = (0..self.properties.partitions.len())
117 .map(|partition| {
118 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
119 self.scan_flat_batch_in_partition(partition, metrics)
120 })
121 .collect::<Result<Vec<_>>>()?;
122
123 Ok(Box::pin(futures::stream::iter(streams).flatten()))
124 }
125
126 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
131 assert!(self.stream_ctx.input.compaction);
132
133 let metrics_set = ExecutionPlanMetricsSet::new();
134 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
135 debug_assert_eq!(1, self.properties.partitions.len());
136 let partition_ranges = &self.properties.partitions[0];
137
138 let reader = Self::merge_all_flat_ranges_for_compaction(
139 &self.stream_ctx,
140 partition_ranges,
141 &part_metrics,
142 self.pruner.clone(),
143 )
144 .await?;
145 Ok(reader)
146 }
147
    /// Builds one merged reader over all `partition_ranges` for compaction.
    ///
    /// Sources are built with `compaction == true` and merged without a
    /// semaphore (no parallel sources) and without partition metrics reporting
    /// in the merge itself.
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        // Register the ranges so the pruner knows what this scan reads.
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        // Collect the sources of every range into one flat list.
        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        // No semaphore / no per-partition metrics: merge everything serially.
        Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            None,
            None,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
        )
        .await
    }
188
    /// Merges `sources` into a single record batch stream.
    ///
    /// When `semaphore` is provided and there is more than one source, the
    /// sources are wrapped into parallel sources bounded by the semaphore.
    /// Unless the region is in append mode, the merged stream is deduplicated
    /// according to the input's [MergeMode]; an optional per-series last-row
    /// selector is applied on top.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
        channel_size: usize,
    ) -> Result<BoxedRecordBatchStream> {
        // Only spawn parallel sources when there is more than one source.
        if let Some(semaphore) = semaphore.as_ref() {
            if sources.len() > 1 {
                sources = stream_ctx.input.create_parallel_flat_sources(
                    sources,
                    semaphore.clone(),
                    channel_size,
                )?;
            }
        }

        // Merge uses the flat input schema, which may differ for compaction.
        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);

        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
        let reader =
            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                .await?;

        // Append mode keeps duplicate rows; otherwise dedup by the merge mode.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                // Keep only the last row of each duplicated key.
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                // Merge field columns of duplicated keys, keeping last non-null values.
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            Box::pin(reader.into_stream()) as _
        };

        // Optionally keep only the last row of each time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => {
                Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
            }
            None => reader,
        };

        Ok(reader)
    }
255
256 fn scan_partition_impl(
259 &self,
260 ctx: &QueryScanContext,
261 metrics_set: &ExecutionPlanMetricsSet,
262 partition: usize,
263 ) -> Result<SendableRecordBatchStream> {
264 if ctx.explain_verbose {
265 common_telemetry::info!(
266 "SeqScan partition {}, region_id: {}",
267 partition,
268 self.stream_ctx.input.region_metadata().region_id
269 );
270 }
271
272 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
273 let input = &self.stream_ctx.input;
274
275 let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
276 let record_batch_stream = ConvertBatchStream::new(
277 batch_stream,
278 input.mapper.clone(),
279 input.cache_strategy.clone(),
280 metrics,
281 );
282
283 Ok(Box::pin(RecordBatchStreamWrapper::new(
284 input.mapper.output_schema(),
285 Box::pin(record_batch_stream),
286 )))
287 }
288
    /// Scans one partition and yields its record batches as [ScanBatch]es.
    ///
    /// # Errors
    /// Returns [PartitionOutOfRangeSnafu] if `partition` exceeds the number of
    /// partitions; building the sources or readers may also fail inside the
    /// returned stream.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        // Nothing to read in this partition.
        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction scans files without the semaphore.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Register this partition's ranges before creating the partition pruner.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Measures time spent fetching from readers (scan cost).
            let mut fetch_start = Instant::now();

            for part_range in partition_ranges {
                // Build the sources of this range and merge them into one reader.
                let mut sources = Vec::new();
                let split_batch_size = build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let channel_size = compute_parallel_channel_size(
                    split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE),
                );
                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size)
                        .await?;

                // Per-range metrics; merged into partition metrics at the end.
                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Readers are not expected to emit empty batches; skip them
                    // defensively in release builds.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    // Time spent by the consumer between yield and next poll.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
385
386 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
387 if self.properties.target_partitions() > self.properties.num_partitions() {
388 Some(Arc::new(Semaphore::new(
394 self.properties.target_partitions() - self.properties.num_partitions() + 1,
395 )))
396 } else {
397 None
398 }
399 }
400
    /// Creates the [PartitionMetrics] for `partition`.
    ///
    /// For query scans (non-compaction) the metrics are also registered in
    /// `metrics_list` so verbose `EXPLAIN` can print them later.
    fn new_partition_metrics(
        &self,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.stream_ctx.input.compaction),
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );

        // Compaction metrics are not part of query EXPLAIN output.
        if !self.stream_ctx.input.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }
424
425 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
427 partition_ranges
428 .iter()
429 .map(|part_range| {
430 let range_meta = &ranges[part_range.identifier];
431 range_meta.indices.len()
432 })
433 .max()
434 .unwrap_or(0)
435 }
436
437 pub(crate) fn check_scan_limit(&self) -> Result<()> {
439 let total_max_files: usize = self
441 .properties
442 .partitions
443 .iter()
444 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
445 .sum();
446
447 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
448 if total_max_files > max_concurrent_files {
449 return TooManyFilesToReadSnafu {
450 actual: total_max_files,
451 max: max_concurrent_files,
452 }
453 .fail();
454 }
455
456 Ok(())
457 }
458}
459
impl RegionScanner for SeqScan {
    fn name(&self) -> &str {
        "SeqScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    // Output schema of the scanner (mapped query schema, not the raw region schema).
    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    // Re-validates the scan limit because `prepare` may change the partitions.
    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.is_some()
    }

    // Forwards dynamic filters (e.g. from the optimizer) into the predicate group.
    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
515
impl DisplayAs for SeqScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        // Always prefix the region id.
        write!(
            f,
            "SeqScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                // Verbose explain also dumps the collected per-partition metrics.
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}
535
impl fmt::Debug for SeqScan {
    // Compact debug output; detailed formatting is provided by `DisplayAs`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeqScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}
543
/// Builds the record batch sources of `part_range` and appends them to `sources`,
/// preserving the order of the range's row group indices.
///
/// Memtable ranges are scanned inline. File ranges are scanned inline when
/// `semaphore` is `None`, otherwise concurrently with a permit acquired per
/// file. Returns the split batch size from
/// [should_split_flat_batches_for_merge], or `None` when the range has no
/// indices or no split is needed.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<Option<usize>> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // In debug builds, verify that compaction ranges scan all row groups
    // (row_group_index == -1 means "the whole file").
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label used for file scan metrics.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(None);
    }

    let split_batch_size = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    let should_split = split_batch_size.is_some();
    sources.reserve(num_indices);
    // Slots keyed by position so concurrent file scans keep the original order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable ranges are cheap to open; scan inline.
            let stream = scan_flat_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Scan files concurrently, bounded by the semaphore.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // The semaphore is never closed, so acquire can't fail.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // No semaphore: open the file range inline.
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            // Neither mem nor file range; delegate to the fallback scanner.
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    // Await all concurrent file scans and place them into their slots.
    if !file_scan_tasks.is_empty() {
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(split_batch_size)
}
662
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input, for tests only.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
670
/// Returns the scanner type label used in metrics, distinguishing compaction
/// scans from query scans.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}