use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use tokio::sync::Semaphore;

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;

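/// Scanner that merges sorted sources and scans the ranges of each partition
/// sequentially.
///
/// It is also used to build the input reader for compaction.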
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context shared by all partition streams.
    stream_ctx: Arc<StreamContext>,
    /// Whether the scanner is built for compaction.
    compaction: bool,
    /// Metrics of each partition, used by verbose `EXPLAIN` output.
    metrics_list: PartitionMetricsList,
}

impl SeqScan {
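    /// Creates a new [SeqScan] from the scan input.
    ///
    /// `compaction` indicates whether the scanner is built for compaction.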
    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            compaction,
            metrics_list: PartitionMetricsList::default(),
        }
    }

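    /// Builds a stream that scans all partitions and chains their outputs into a
    /// single record batch stream.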
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| self.scan_partition(&metrics_set, partition))
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

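    /// Builds a [BoxedBatchReader] that merges all partition ranges for compaction.
    ///
    /// Panics if the scanner is not built for compaction.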
    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
        assert!(self.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(&metrics_set, 0);
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
        )
        .await?;
        Ok(Box::new(reader))
    }

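    /// Builds sources for all the given partition ranges and merges them into one
    /// reader for compaction.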
    async fn merge_all_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
    ) -> Result<BoxedBatchReader> {
        let mut sources = Vec::new();
        let range_builder_list = Arc::new(RangeBuilderList::new(
            stream_ctx.input.num_memtables(),
            stream_ctx.input.num_files(),
        ));
        for part_range in partition_ranges {
            build_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                range_builder_list.clone(),
                &mut sources,
            );
        }

        common_telemetry::debug!(
            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_reader_from_sources(stream_ctx, sources, None).await
    }

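    /// Builds a reader that merges the given sources.
    ///
    /// When a semaphore is provided and there is more than one source, the sources
    /// are read in parallel. Unless the region is in append mode, the merged output
    /// is deduplicated according to the merge mode, and an optional last-row
    /// selector is applied per time series.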
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    async fn build_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<Source>,
        semaphore: Option<Arc<Semaphore>>,
    ) -> Result<BoxedBatchReader> {
        if let Some(semaphore) = semaphore.as_ref() {
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_sources(sources, semaphore.clone())?;
            }
        }

        let mut builder = MergeReaderBuilder::from_sources(sources);
        let reader = builder.build().await?;

        let dedup = !stream_ctx.input.append_mode;
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                )) as _,
            }
        } else {
            Box::new(reader) as _
        };

        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
            None => reader,
        };

        Ok(reader)
    }

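    /// Scans one partition, reading its ranges one after another and yielding the
    /// converted record batches.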
    fn scan_partition_impl(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        if partition >= self.properties.partitions.len() {
            return Err(BoxedError::new(
                PartitionOutOfRangeSnafu {
                    given: partition,
                    all: self.properties.partitions.len(),
                }
                .build(),
            ));
        }

        if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
            return self.scan_partition_by_series(metrics_set, partition);
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        let part_metrics = self.new_partition_metrics(metrics_set, partition);

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part range and drains its reader before moving to the next range.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                );

                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let cache = &stream_ctx.input.cache_strategy;
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader
                    .next_batch()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?
                {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        stream_ctx.input.mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let convert_start = Instant::now();
                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
                    metrics.convert_cost += convert_start.elapsed();
                    let yield_start = Instant::now();
                    yield record_batch;
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty batch to mark the end of the range when the caller
                // needs to distinguish partition ranges.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield stream_ctx.input.mapper.empty_record_batch();
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };

        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.stream_ctx.input.mapper.output_schema(),
            Box::pin(stream),
        ));

        Ok(stream)
    }

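    /// Scans a partition for the per-series distribution.
    ///
    /// All ranges of the partition are merged by a single reader so rows of the
    /// same time series are returned together.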
    fn scan_partition_by_series(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let distinguish_range = self.properties.distinguish_partition_range;
        let part_metrics = self.new_partition_metrics(metrics_set, partition);
        debug_assert!(!self.compaction);

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Builds sources for all ranges of the partition.
            let mut sources = Vec::with_capacity(partition_ranges.len());
            for part_range in partition_ranges {
                build_sources(
                    &stream_ctx,
                    &part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                );
            }

            // Merges all sources into a single reader.
            let mut reader =
                Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;
            let cache = &stream_ctx.input.cache_strategy;
            let mut metrics = ScannerMetrics::default();
            let mut fetch_start = Instant::now();

            while let Some(batch) = reader
                .next_batch()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
            {
                metrics.scan_cost += fetch_start.elapsed();
                metrics.num_batches += 1;
                metrics.num_rows += batch.num_rows();

                debug_assert!(!batch.is_empty());
                if batch.is_empty() {
                    continue;
                }

                let convert_start = Instant::now();
                let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
                metrics.convert_cost += convert_start.elapsed();
                let yield_start = Instant::now();
                yield record_batch;
                metrics.yield_cost += yield_start.elapsed();

                fetch_start = Instant::now();
            }

            if distinguish_range {
                let yield_start = Instant::now();
                yield stream_ctx.input.mapper.empty_record_batch();
                metrics.yield_cost += yield_start.elapsed();
            }

            metrics.scan_cost += fetch_start.elapsed();
            part_metrics.merge_metrics(&metrics);

            part_metrics.on_finish();
        };

        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.stream_ctx.input.mapper.output_schema(),
            Box::pin(stream),
        ));

        Ok(stream)
    }

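    /// Creates a semaphore to bound parallel source reads, or returns `None` when
    /// the target partition number does not exceed the actual partition number.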
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            // Allows scanning `target_partitions - num_partitions + 1` sources of
            // this partition concurrently.
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

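    /// Creates metrics for the given partition and registers them to the metrics
    /// list unless the scanner is used for compaction.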
    fn new_partition_metrics(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.compaction),
            self.stream_ctx.query_start,
            metrics_set,
        );

        if !self.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }
}

impl RegionScanner for SeqScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(metrics_set, partition)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);
        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeqScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeqScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeqScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeqScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

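/// Builds the sources of a partition range and appends them to `sources`.
///
/// Memtable ranges and file ranges are scanned by separate streams.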
fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
) {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
        // Compaction and per-series scans expect every range to scan all row groups.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect range {} to scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    sources.reserve(range_meta.row_group_indices.len());
    for index in &range_meta.row_group_indices {
        let stream = if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            Box::pin(stream) as _
        } else {
            let read_type = if compaction {
                "compaction"
            } else {
                "seq_scan_files"
            };
            let stream = scan_file_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                read_type,
                range_builder_list.clone(),
            );
            Box::pin(stream) as _
        };
        sources.push(Source::Stream(stream));
    }
}

#[cfg(test)]
impl SeqScan {
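    /// Returns the scan input, for tests.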
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

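/// Returns the scanner type name reported in partition metrics.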
fn get_scanner_type(compaction: bool) -> &'static str {
    if compaction {
        "SeqScan(compaction)"
    } else {
        "SeqScan"
    }
}