//! Sequential scan.

use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;

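/// Scans a region and returns rows in a sorted sequence.
///
/// Within each [`PartitionRange`], the output is ordered by primary key and
/// time index.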
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context shared by all partition streams.
    stream_ctx: Arc<StreamContext>,
    /// Whether this scanner is created for compaction.
    compaction: bool,
    /// Metrics for each partition.
    /// Only collected for queries; left empty for compaction.
    metrics_list: PartitionMetricsList,
}

impl SeqScan {
    /// Creates a new [SeqScan] from the scan input.
    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            compaction,
            metrics_list: PartitionMetricsList::default(),
        }
    }

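    /// Builds a single stream that scans every partition and chains their outputs.
    ///
    /// Use [`RegionScanner::scan_partition`] for a partitioned scan.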
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| {
                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
            })
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

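    /// Scans all partitions and returns their batches as one flattened stream.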
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

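    /// Builds a [BoxedBatchReader] from a sequential scan for compaction.
    ///
    /// # Panics
    /// Panics if this scanner was not created for compaction.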
    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
        assert!(self.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
        )
        .await?;
        Ok(Box::new(reader))
    }

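    /// Builds a reader that merges sources from all partition ranges for compaction.
    ///
    /// The input ranges must not be split.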
    async fn merge_all_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
    ) -> Result<BoxedBatchReader> {
        let mut sources = Vec::new();
        let range_builder_list = Arc::new(RangeBuilderList::new(
            stream_ctx.input.num_memtables(),
            stream_ctx.input.num_files(),
        ));
        for part_range in partition_ranges {
            build_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                range_builder_list.clone(),
                &mut sources,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_reader_from_sources(stream_ctx, sources, None).await
    }

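    /// Builds a merge reader that reads from all the given sources.
    ///
    /// If a semaphore is provided, sources may be read in parallel.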
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<Source>,
        semaphore: Option<Arc<Semaphore>>,
    ) -> Result<BoxedBatchReader> {
        if let Some(semaphore) = semaphore.as_ref() {
            if sources.len() > 1 {
                // Read sources in parallel when there is more than one source.
                sources = stream_ctx
                    .input
                    .create_parallel_sources(sources, semaphore.clone())?;
            }
        }

        let mut builder = MergeReaderBuilder::from_sources(sources);
        let reader = builder.build().await?;

        let dedup = !stream_ctx.input.append_mode;
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                )) as _,
            }
        } else {
            Box::new(reader) as _
        };

        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
            None => reader,
        };

        Ok(reader)
    }

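    /// Scans one partition and converts its batch stream into a record batch stream.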
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeqScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);

        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

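    /// Scans the [PartitionRange]s of one partition and yields [ScanBatch]es.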
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            for part_range in partition_ranges {
                // Builds sources for all ranges in this partition range.
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                ).await?;

                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        stream_ctx.input.mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty batch to mark the end of this partition range.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

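    /// Creates a semaphore to limit read parallelism, or returns `None` when no
    /// extra parallelism is available.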
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            // There are more target partitions than actual partitions, so additional tasks
            // can read the data. The semaphore is per partition; the extra permit ensures
            // each partition can always make progress.
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

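    /// Creates [PartitionMetrics] for the partition and registers it in the
    /// metrics list unless this is a compaction scan.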
    fn new_partition_metrics(
        &self,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.compaction),
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );

        if !self.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }
}

impl RegionScanner for SeqScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);
        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeqScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeqScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeqScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeqScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

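/// Builds [Source]s for the given partition range and appends them to `sources`.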
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
) -> Result<()> {
    // Gets the metadata of the ranges in this partition range.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects ranges that are not split.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // A row group index of -1 means the range scans all row groups.
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    sources.reserve(range_meta.row_group_indices.len());
    for index in &range_meta.row_group_indices {
        let stream = if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            Box::pin(stream) as _
        } else if stream_ctx.is_file_range_index(*index) {
            let read_type = if compaction {
                "compaction"
            } else {
                "seq_scan_files"
            };
            let stream = scan_file_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                read_type,
                range_builder_list.clone(),
            )
            .await?;
            Box::pin(stream) as _
        } else {
            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
        };
        sources.push(Source::Stream(stream));
    }
    Ok(())
}

#[cfg(test)]
impl SeqScan {
    /// Returns the input of the scanner.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

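/// Returns the scanner type name for metrics and explain output.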
fn get_scanner_type(compaction: bool) -> &'static str {
    if compaction {
        "SeqScan(compaction)"
    } else {
        "SeqScan"
    }
}