use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;

/// Scans a region and returns rows in a sorted sequence.
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context shared by all partition streams.
    stream_ctx: Arc<StreamContext>,
    /// Whether this scanner is created for compaction.
    compaction: bool,
    /// Metrics of each partition.
    metrics_list: PartitionMetricsList,
}

impl SeqScan {
    /// Creates a new [SeqScan] from the scan input.
    /// `compaction` indicates whether the scanner is used for compaction.
    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            compaction,
            metrics_list: PartitionMetricsList::default(),
        }
    }
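    // A minimal usage sketch (not from this crate's tests): build the scanner from a
    // prepared `ScanInput` and drain the merged record batch stream. `input` here is a
    // placeholder for whatever the caller has constructed.
    //
    // let scan = SeqScan::new(input, false);
    // let mut stream = scan.build_stream()?;
    // while let Some(batch) = stream.next().await {
    //     let batch = batch?;
    //     // consume the RecordBatch
    // }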
    /// Builds a record batch stream for the query by chaining the streams of all partitions.
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| self.scan_partition(&metrics_set, partition))
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

    /// Scans all partitions and returns a single [ScanBatchStream].
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.new_partition_metrics(&metrics_set, partition);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Builds a [BoxedBatchReader] from the scanner for compaction.
    ///
    /// # Panics
    /// Panics if the scanner is not created for compaction.
    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
        assert!(self.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(&metrics_set, 0);
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
        )
        .await?;
        Ok(Box::new(reader))
    }

    /// Builds a reader that merges sources of all partition ranges for compaction.
    async fn merge_all_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
    ) -> Result<BoxedBatchReader> {
        let mut sources = Vec::new();
        let range_builder_list = Arc::new(RangeBuilderList::new(
            stream_ctx.input.num_memtables(),
            stream_ctx.input.num_files(),
        ));
        for part_range in partition_ranges {
            build_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                range_builder_list.clone(),
                &mut sources,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_reader_from_sources(stream_ctx, sources, None).await
    }
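    // Note on the reader composition below: sources are first combined by a merge
    // reader, optionally wrapped by a dedup reader (last-row or last-non-null
    // semantics depending on the region's merge mode), and finally wrapped by a
    // last-row selector reader when the query asks for the latest row per series.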
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    /// Builds a reader that merges all `sources` into one sorted output. If a
    /// semaphore is provided, sources may be read in parallel.
    pub(crate) async fn build_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<Source>,
        semaphore: Option<Arc<Semaphore>>,
    ) -> Result<BoxedBatchReader> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Reads sources in parallel when there is more than one source.
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_sources(sources, semaphore.clone())?;
            }
        }

        let mut builder = MergeReaderBuilder::from_sources(sources);
        let reader = builder.build().await?;

        let dedup = !stream_ctx.input.append_mode;
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                )) as _,
            }
        } else {
            Box::new(reader) as _
        };

        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
            None => reader,
        };

        Ok(reader)
    }
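    // The partition stream is assembled in two layers: `scan_batch_in_partition`
    // yields internal `ScanBatch`es, and `ConvertBatchStream` maps them to record
    // batches with the projection mapper before the stream is wrapped with the
    // output schema.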
    fn scan_partition_impl(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = self.new_partition_metrics(metrics_set, partition);

        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }
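    // Each partition range is scanned independently inside the `try_stream!` block
    // below: build the sources for the range, merge them into one reader, then yield
    // every batch while accumulating scan/yield timings into the partition metrics.
    // An empty batch is emitted between ranges when the plan needs to distinguish
    // partition ranges.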
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part range and merges its sources into one reader.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                ).await?;

                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        stream_ctx.input.mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty batch to mark the end of the current range so the
                // consumer can distinguish partition ranges.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
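    // Permit arithmetic for `new_semaphore` below, with illustrative numbers (not
    // from the source): if `target_partitions()` is 8 and `num_partitions()` is 3,
    // the extra parallelism is shared through a semaphore with 8 - 3 + 1 = 6
    // permits; when the target does not exceed the number of partitions, no
    // semaphore is created and each partition reads its sources sequentially.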
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            // Uses the extra capacity as permits for parallel source readers.
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

    /// Creates metrics for the partition and tracks them in the metrics list
    /// unless the scanner is used for compaction.
    fn new_partition_metrics(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.compaction),
            self.stream_ctx.query_start,
            metrics_set,
        );

        if !self.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }
}

impl RegionScanner for SeqScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);
        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeqScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeqScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeqScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeqScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}
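// `build_sources` collects one `Source` per row group index of the partition range:
// memtable ranges and file ranges are turned into scan streams (file scans reuse the
// shared `RangeBuilderList`), and any other range kind falls back to
// `maybe_scan_other_ranges`. The callers above merge these sources into a single
// sorted reader.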
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // In compaction mode, every range is expected to scan all row groups of its file.
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect range {} to scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    sources.reserve(range_meta.row_group_indices.len());
    for index in &range_meta.row_group_indices {
        let stream = if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            Box::pin(stream) as _
        } else if stream_ctx.is_file_range_index(*index) {
            let read_type = if compaction {
                "compaction"
            } else {
                "seq_scan_files"
            };
            let stream = scan_file_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                read_type,
                range_builder_list.clone(),
            )
            .await?;
            Box::pin(stream) as _
        } else {
            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
        };
        sources.push(Source::Stream(stream));
    }
    Ok(())
}

#[cfg(test)]
impl SeqScan {
    /// Returns the underlying [ScanInput] for tests.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

/// Returns the scanner type name reported in metrics.
fn get_scanner_type(compaction: bool) -> &'static str {
    if compaction {
        "SeqScan(compaction)"
    } else {
        "SeqScan"
    }
}