1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::util::ChainedRecordBatchStream;
25use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
26use common_telemetry::tracing;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::schema::SchemaRef;
30use snafu::ResultExt;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
33use store_api::storage::TimeSeriesRowSelector;
34use tokio::sync::Semaphore;
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
38use crate::read::last_row::LastRowReader;
39use crate::read::merge::MergeReaderBuilder;
40use crate::read::range::RangeBuilderList;
41use crate::read::scan_region::{ScanInput, StreamContext};
42use crate::read::scan_util::{
43 scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
44};
45use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
46use crate::region::options::MergeMode;
47
48pub struct SeqScan {
53 properties: ScannerProperties,
55 stream_ctx: Arc<StreamContext>,
57 compaction: bool,
59 metrics_list: PartitionMetricsList,
62}
63
64impl SeqScan {
65 pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
68 let mut properties = ScannerProperties::default()
69 .with_append_mode(input.append_mode)
70 .with_total_rows(input.total_rows());
71 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
72 properties.partitions = vec![stream_ctx.partition_ranges()];
73
74 Self {
75 properties,
76 stream_ctx,
77 compaction,
78 metrics_list: PartitionMetricsList::default(),
79 }
80 }
81
82 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
87 let metrics_set = ExecutionPlanMetricsSet::new();
88 let streams = (0..self.properties.partitions.len())
89 .map(|partition: usize| self.scan_partition(&metrics_set, partition))
90 .collect::<Result<Vec<_>, _>>()?;
91
92 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
93 Ok(Box::pin(aggr_stream))
94 }
95
96 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
101 assert!(self.compaction);
102
103 let metrics_set = ExecutionPlanMetricsSet::new();
104 let part_metrics = self.new_partition_metrics(&metrics_set, 0);
105 debug_assert_eq!(1, self.properties.partitions.len());
106 let partition_ranges = &self.properties.partitions[0];
107
108 let reader = Self::merge_all_ranges_for_compaction(
109 &self.stream_ctx,
110 partition_ranges,
111 &part_metrics,
112 )
113 .await?;
114 Ok(Box::new(reader))
115 }
116
117 async fn merge_all_ranges_for_compaction(
120 stream_ctx: &Arc<StreamContext>,
121 partition_ranges: &[PartitionRange],
122 part_metrics: &PartitionMetrics,
123 ) -> Result<BoxedBatchReader> {
124 let mut sources = Vec::new();
125 let range_builder_list = Arc::new(RangeBuilderList::new(
126 stream_ctx.input.num_memtables(),
127 stream_ctx.input.num_files(),
128 ));
129 for part_range in partition_ranges {
130 build_sources(
131 stream_ctx,
132 part_range,
133 true,
134 part_metrics,
135 range_builder_list.clone(),
136 &mut sources,
137 );
138 }
139
140 common_telemetry::debug!(
141 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
142 stream_ctx.input.mapper.metadata().region_id,
143 partition_ranges.len(),
144 sources.len()
145 );
146 Self::build_reader_from_sources(stream_ctx, sources, None).await
147 }
148
149 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
152 pub(crate) async fn build_reader_from_sources(
153 stream_ctx: &StreamContext,
154 mut sources: Vec<Source>,
155 semaphore: Option<Arc<Semaphore>>,
156 ) -> Result<BoxedBatchReader> {
157 if let Some(semaphore) = semaphore.as_ref() {
158 if sources.len() > 1 {
160 sources = stream_ctx
161 .input
162 .create_parallel_sources(sources, semaphore.clone())?;
163 }
164 }
165
166 let mut builder = MergeReaderBuilder::from_sources(sources);
167 let reader = builder.build().await?;
168
169 let dedup = !stream_ctx.input.append_mode;
170 let reader = if dedup {
171 match stream_ctx.input.merge_mode {
172 MergeMode::LastRow => Box::new(DedupReader::new(
173 reader,
174 LastRow::new(stream_ctx.input.filter_deleted),
175 )) as _,
176 MergeMode::LastNonNull => Box::new(DedupReader::new(
177 reader,
178 LastNonNull::new(stream_ctx.input.filter_deleted),
179 )) as _,
180 }
181 } else {
182 Box::new(reader) as _
183 };
184
185 let reader = match &stream_ctx.input.series_row_selector {
186 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
187 None => reader,
188 };
189
190 Ok(reader)
191 }
192
193 fn scan_partition_impl(
196 &self,
197 metrics_set: &ExecutionPlanMetricsSet,
198 partition: usize,
199 ) -> Result<SendableRecordBatchStream, BoxedError> {
200 if partition >= self.properties.partitions.len() {
201 return Err(BoxedError::new(
202 PartitionOutOfRangeSnafu {
203 given: partition,
204 all: self.properties.partitions.len(),
205 }
206 .build(),
207 ));
208 }
209 if self.properties.partitions[partition].is_empty() {
210 return Ok(Box::pin(RecordBatchStreamWrapper::new(
211 self.stream_ctx.input.mapper.output_schema(),
212 common_recordbatch::EmptyRecordBatchStream::new(
213 self.stream_ctx.input.mapper.output_schema(),
214 ),
215 )));
216 }
217
218 let stream_ctx = self.stream_ctx.clone();
219 let semaphore = self.new_semaphore();
220 let partition_ranges = self.properties.partitions[partition].clone();
221 let compaction = self.compaction;
222 let distinguish_range = self.properties.distinguish_partition_range;
223 let part_metrics = self.new_partition_metrics(metrics_set, partition);
224
225 let stream = try_stream! {
226 part_metrics.on_first_poll();
227
228 let range_builder_list = Arc::new(RangeBuilderList::new(
229 stream_ctx.input.num_memtables(),
230 stream_ctx.input.num_files(),
231 ));
232 for part_range in partition_ranges {
234 let mut sources = Vec::new();
235 build_sources(
236 &stream_ctx,
237 &part_range,
238 compaction,
239 &part_metrics,
240 range_builder_list.clone(),
241 &mut sources,
242 );
243
244 let mut metrics = ScannerMetrics::default();
245 let mut fetch_start = Instant::now();
246 let mut reader =
247 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
248 .await
249 .map_err(BoxedError::new)
250 .context(ExternalSnafu)?;
251 let cache = &stream_ctx.input.cache_strategy;
252 #[cfg(debug_assertions)]
253 let mut checker = crate::read::BatchChecker::default()
254 .with_start(Some(part_range.start))
255 .with_end(Some(part_range.end));
256
257 while let Some(batch) = reader
258 .next_batch()
259 .await
260 .map_err(BoxedError::new)
261 .context(ExternalSnafu)?
262 {
263 metrics.scan_cost += fetch_start.elapsed();
264 metrics.num_batches += 1;
265 metrics.num_rows += batch.num_rows();
266
267 debug_assert!(!batch.is_empty());
268 if batch.is_empty() {
269 continue;
270 }
271
272 #[cfg(debug_assertions)]
273 checker.ensure_part_range_batch(
274 "SeqScan",
275 stream_ctx.input.mapper.metadata().region_id,
276 partition,
277 part_range,
278 &batch,
279 );
280
281 let convert_start = Instant::now();
282 let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
283 metrics.convert_cost += convert_start.elapsed();
284 let yield_start = Instant::now();
285 yield record_batch;
286 metrics.yield_cost += yield_start.elapsed();
287
288 fetch_start = Instant::now();
289 }
290
291 if distinguish_range {
294 let yield_start = Instant::now();
295 yield stream_ctx.input.mapper.empty_record_batch();
296 metrics.yield_cost += yield_start.elapsed();
297 }
298
299 metrics.scan_cost += fetch_start.elapsed();
300 part_metrics.merge_metrics(&metrics);
301 }
302
303 part_metrics.on_finish();
304 };
305
306 let stream = Box::pin(RecordBatchStreamWrapper::new(
307 self.stream_ctx.input.mapper.output_schema(),
308 Box::pin(stream),
309 ));
310
311 Ok(stream)
312 }
313
314 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
315 if self.properties.target_partitions() > self.properties.num_partitions() {
316 Some(Arc::new(Semaphore::new(
322 self.properties.target_partitions() - self.properties.num_partitions() + 1,
323 )))
324 } else {
325 None
326 }
327 }
328
329 fn new_partition_metrics(
332 &self,
333 metrics_set: &ExecutionPlanMetricsSet,
334 partition: usize,
335 ) -> PartitionMetrics {
336 let metrics = PartitionMetrics::new(
337 self.stream_ctx.input.mapper.metadata().region_id,
338 partition,
339 get_scanner_type(self.compaction),
340 self.stream_ctx.query_start,
341 metrics_set,
342 );
343
344 if !self.compaction {
345 self.metrics_list.set(partition, metrics.clone());
346 }
347
348 metrics
349 }
350}
351
352impl RegionScanner for SeqScan {
353 fn properties(&self) -> &ScannerProperties {
354 &self.properties
355 }
356
357 fn schema(&self) -> SchemaRef {
358 self.stream_ctx.input.mapper.output_schema()
359 }
360
361 fn metadata(&self) -> RegionMetadataRef {
362 self.stream_ctx.input.mapper.metadata().clone()
363 }
364
365 fn scan_partition(
366 &self,
367 metrics_set: &ExecutionPlanMetricsSet,
368 partition: usize,
369 ) -> Result<SendableRecordBatchStream, BoxedError> {
370 self.scan_partition_impl(metrics_set, partition)
371 }
372
373 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
374 self.properties.prepare(request);
375 Ok(())
376 }
377
378 fn has_predicate(&self) -> bool {
379 let predicate = self.stream_ctx.input.predicate();
380 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
381 }
382
383 fn set_logical_region(&mut self, logical_region: bool) {
384 self.properties.set_logical_region(logical_region);
385 }
386}
387
388impl DisplayAs for SeqScan {
389 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
390 write!(
391 f,
392 "SeqScan: region={}, ",
393 self.stream_ctx.input.mapper.metadata().region_id
394 )?;
395 match t {
396 DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
397 DisplayFormatType::Verbose => {
398 self.stream_ctx.format_for_explain(true, f)?;
399 self.metrics_list.format_verbose_metrics(f)
400 }
401 }
402 }
403}
404
405impl fmt::Debug for SeqScan {
406 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407 f.debug_struct("SeqScan")
408 .field("num_ranges", &self.stream_ctx.ranges.len())
409 .finish()
410 }
411}
412
413pub(crate) fn build_sources(
415 stream_ctx: &Arc<StreamContext>,
416 part_range: &PartitionRange,
417 compaction: bool,
418 part_metrics: &PartitionMetrics,
419 range_builder_list: Arc<RangeBuilderList>,
420 sources: &mut Vec<Source>,
421) {
422 let range_meta = &stream_ctx.ranges[part_range.identifier];
424 #[cfg(debug_assertions)]
425 if compaction {
426 debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
428 for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
429 debug_assert_eq!(
431 -1, row_group_idx.row_group_index,
432 "Expect {} range scan all row groups, given: {}",
433 i, row_group_idx.row_group_index,
434 );
435 }
436 }
437
438 sources.reserve(range_meta.row_group_indices.len());
439 for index in &range_meta.row_group_indices {
440 let stream = if stream_ctx.is_mem_range_index(*index) {
441 let stream = scan_mem_ranges(
442 stream_ctx.clone(),
443 part_metrics.clone(),
444 *index,
445 range_meta.time_range,
446 );
447 Box::pin(stream) as _
448 } else {
449 let read_type = if compaction {
450 "compaction"
451 } else {
452 "seq_scan_files"
453 };
454 let stream = scan_file_ranges(
455 stream_ctx.clone(),
456 part_metrics.clone(),
457 *index,
458 read_type,
459 range_builder_list.clone(),
460 );
461 Box::pin(stream) as _
462 };
463 sources.push(Source::Stream(stream));
464 }
465}
466
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input held by the stream context. Only compiled for tests.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
474
/// Returns the scanner type label: `"SeqScan(compaction)"` when `compaction` is
/// true, `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}