1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::tracing;
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::pruner::{PartitionPruner, Pruner};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40 PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
41 scan_flat_mem_ranges, scan_mem_ranges,
42};
43use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
44use crate::read::{Batch, ScannerMetrics, scan_util};
45
46pub struct UnorderedScan {
50 properties: ScannerProperties,
52 stream_ctx: Arc<StreamContext>,
54 pruner: Arc<Pruner>,
56 metrics_list: PartitionMetricsList,
58}
59
60impl UnorderedScan {
61 pub(crate) fn new(input: ScanInput) -> Self {
63 let mut properties = ScannerProperties::default()
64 .with_append_mode(input.append_mode)
65 .with_total_rows(input.total_rows());
66 let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
67 properties.partitions = vec![stream_ctx.partition_ranges()];
68
69 let num_workers = common_stat::get_total_cpu_cores().max(1);
71 let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
72
73 Self {
74 properties,
75 stream_ctx,
76 pruner,
77 metrics_list: PartitionMetricsList::default(),
78 }
79 }
80
81 #[tracing::instrument(
83 skip_all,
84 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
85 )]
86 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
87 let metrics_set = ExecutionPlanMetricsSet::new();
88 let part_num = self.properties.num_partitions();
89 let streams = (0..part_num)
90 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
91 .collect::<Result<Vec<_>, BoxedError>>()?;
92 let stream = stream! {
93 for mut stream in streams {
94 while let Some(rb) = stream.next().await {
95 yield rb;
96 }
97 }
98 };
99 let stream = Box::pin(RecordBatchStreamWrapper::new(
100 self.schema(),
101 Box::pin(stream),
102 ));
103 Ok(stream)
104 }
105
106 #[tracing::instrument(
108 skip_all,
109 fields(
110 region_id = %stream_ctx.input.region_metadata().region_id,
111 part_range_id = part_range_id
112 )
113 )]
114 fn scan_partition_range(
115 stream_ctx: Arc<StreamContext>,
116 part_range_id: usize,
117 part_metrics: PartitionMetrics,
118 partition_pruner: Arc<PartitionPruner>,
119 ) -> impl Stream<Item = Result<Batch>> {
120 try_stream! {
121 let range_meta = &stream_ctx.ranges[part_range_id];
123 for index in &range_meta.row_group_indices {
124 if stream_ctx.is_mem_range_index(*index) {
125 let stream = scan_mem_ranges(
126 stream_ctx.clone(),
127 part_metrics.clone(),
128 *index,
129 range_meta.time_range,
130 );
131 for await batch in stream {
132 yield batch?;
133 }
134 } else if stream_ctx.is_file_range_index(*index) {
135 let stream = scan_file_ranges(
136 stream_ctx.clone(),
137 part_metrics.clone(),
138 *index,
139 "unordered_scan_files",
140 partition_pruner.clone(),
141 ).await?;
142 for await batch in stream {
143 yield batch?;
144 }
145 } else {
146 let stream = scan_util::maybe_scan_other_ranges(
147 &stream_ctx,
148 *index,
149 &part_metrics,
150 ).await?;
151 for await batch in stream {
152 yield batch?;
153 }
154 }
155 }
156 }
157 }
158
159 #[tracing::instrument(
161 skip_all,
162 fields(
163 region_id = %stream_ctx.input.region_metadata().region_id,
164 part_range_id = part_range_id
165 )
166 )]
167 fn scan_flat_partition_range(
168 stream_ctx: Arc<StreamContext>,
169 part_range_id: usize,
170 part_metrics: PartitionMetrics,
171 partition_pruner: Arc<PartitionPruner>,
172 ) -> impl Stream<Item = Result<RecordBatch>> {
173 try_stream! {
174 let range_meta = &stream_ctx.ranges[part_range_id];
176 for index in &range_meta.row_group_indices {
177 if stream_ctx.is_mem_range_index(*index) {
178 let stream = scan_flat_mem_ranges(
179 stream_ctx.clone(),
180 part_metrics.clone(),
181 *index,
182 );
183 for await record_batch in stream {
184 yield record_batch?;
185 }
186 } else if stream_ctx.is_file_range_index(*index) {
187 let stream = scan_flat_file_ranges(
188 stream_ctx.clone(),
189 part_metrics.clone(),
190 *index,
191 "unordered_scan_files",
192 partition_pruner.clone(),
193 ).await?;
194 for await record_batch in stream {
195 yield record_batch?;
196 }
197 } else {
198 let stream = scan_util::maybe_scan_flat_other_ranges(
199 &stream_ctx,
200 *index,
201 &part_metrics,
202 ).await?;
203 for await record_batch in stream {
204 yield record_batch?;
205 }
206 }
207 }
208 }
209 }
210
211 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
213 let metrics_set = ExecutionPlanMetricsSet::new();
214
215 let streams = (0..self.properties.partitions.len())
216 .map(|partition| {
217 let metrics = self.partition_metrics(false, partition, &metrics_set);
218 self.scan_batch_in_partition(partition, metrics)
219 })
220 .collect::<Result<Vec<_>>>()?;
221
222 Ok(Box::pin(futures::stream::iter(streams).flatten()))
223 }
224
225 fn partition_metrics(
226 &self,
227 explain_verbose: bool,
228 partition: usize,
229 metrics_set: &ExecutionPlanMetricsSet,
230 ) -> PartitionMetrics {
231 let part_metrics = PartitionMetrics::new(
232 self.stream_ctx.input.mapper.metadata().region_id,
233 partition,
234 "UnorderedScan",
235 self.stream_ctx.query_start,
236 explain_verbose,
237 metrics_set,
238 );
239 self.metrics_list.set(partition, part_metrics.clone());
240 part_metrics
241 }
242
243 #[tracing::instrument(
244 skip_all,
245 fields(
246 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
247 partition = partition
248 )
249 )]
250 fn scan_partition_impl(
251 &self,
252 ctx: &QueryScanContext,
253 metrics_set: &ExecutionPlanMetricsSet,
254 partition: usize,
255 ) -> Result<SendableRecordBatchStream> {
256 if ctx.explain_verbose {
257 common_telemetry::info!(
258 "UnorderedScan partition {}, region_id: {}",
259 partition,
260 self.stream_ctx.input.region_metadata().region_id
261 );
262 }
263
264 let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
265 let input = &self.stream_ctx.input;
266
267 let batch_stream = if input.flat_format {
268 self.scan_flat_batch_in_partition(partition, metrics.clone())?
270 } else {
271 self.scan_batch_in_partition(partition, metrics.clone())?
273 };
274
275 let record_batch_stream = ConvertBatchStream::new(
276 batch_stream,
277 input.mapper.clone(),
278 input.cache_strategy.clone(),
279 metrics,
280 );
281
282 Ok(Box::pin(RecordBatchStreamWrapper::new(
283 input.mapper.output_schema(),
284 Box::pin(record_batch_stream),
285 )))
286 }
287
288 #[tracing::instrument(
289 skip_all,
290 fields(
291 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
292 partition = partition
293 )
294 )]
295 fn scan_batch_in_partition(
296 &self,
297 partition: usize,
298 part_metrics: PartitionMetrics,
299 ) -> Result<ScanBatchStream> {
300 ensure!(
301 partition < self.properties.partitions.len(),
302 PartitionOutOfRangeSnafu {
303 given: partition,
304 all: self.properties.partitions.len(),
305 }
306 );
307
308 let stream_ctx = self.stream_ctx.clone();
309 let part_ranges = self.properties.partitions[partition].clone();
310 let distinguish_range = self.properties.distinguish_partition_range;
311 let pruner = self.pruner.clone();
312 pruner.add_partition_ranges(&part_ranges);
317 let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));
318
319 let stream = try_stream! {
320 part_metrics.on_first_poll();
321
322 for part_range in part_ranges {
324 let mut metrics = ScannerMetrics::default();
325 let mut fetch_start = Instant::now();
326 let _mapper = &stream_ctx.input.mapper;
327 #[cfg(debug_assertions)]
328 let mut checker = crate::read::BatchChecker::default()
329 .with_start(Some(part_range.start))
330 .with_end(Some(part_range.end));
331
332 let stream = Self::scan_partition_range(
333 stream_ctx.clone(),
334 part_range.identifier,
335 part_metrics.clone(),
336 partition_pruner.clone(),
337 );
338 for await batch in stream {
339 let batch = batch?;
340 metrics.scan_cost += fetch_start.elapsed();
341 metrics.num_batches += 1;
342 metrics.num_rows += batch.num_rows();
343
344 debug_assert!(!batch.is_empty());
345 if batch.is_empty() {
346 continue;
347 }
348
349 #[cfg(debug_assertions)]
350 checker.ensure_part_range_batch(
351 "UnorderedScan",
352 _mapper.metadata().region_id,
353 partition,
354 part_range,
355 &batch,
356 );
357
358 let yield_start = Instant::now();
359 yield ScanBatch::Normal(batch);
360 metrics.yield_cost += yield_start.elapsed();
361
362 fetch_start = Instant::now();
363 }
364
365 if distinguish_range {
368 let yield_start = Instant::now();
369 yield ScanBatch::Normal(Batch::empty());
370 metrics.yield_cost += yield_start.elapsed();
371 }
372
373 metrics.scan_cost += fetch_start.elapsed();
374 part_metrics.merge_metrics(&metrics);
375 }
376
377 part_metrics.on_finish();
378 };
379 Ok(Box::pin(stream))
380 }
381
382 #[tracing::instrument(
383 skip_all,
384 fields(
385 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
386 partition = partition
387 )
388 )]
389 fn scan_flat_batch_in_partition(
390 &self,
391 partition: usize,
392 part_metrics: PartitionMetrics,
393 ) -> Result<ScanBatchStream> {
394 ensure!(
395 partition < self.properties.partitions.len(),
396 PartitionOutOfRangeSnafu {
397 given: partition,
398 all: self.properties.partitions.len(),
399 }
400 );
401
402 let stream_ctx = self.stream_ctx.clone();
403 let part_ranges = self.properties.partitions[partition].clone();
404 let pruner = self.pruner.clone();
405 pruner.add_partition_ranges(&part_ranges);
410 let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));
411
412 let stream = try_stream! {
413 part_metrics.on_first_poll();
414
415 for part_range in part_ranges {
417 let mut metrics = ScannerMetrics::default();
418 let mut fetch_start = Instant::now();
419
420 let stream = Self::scan_flat_partition_range(
421 stream_ctx.clone(),
422 part_range.identifier,
423 part_metrics.clone(),
424 partition_pruner.clone(),
425 );
426 for await record_batch in stream {
427 let record_batch = record_batch?;
428 metrics.scan_cost += fetch_start.elapsed();
429 metrics.num_batches += 1;
430 metrics.num_rows += record_batch.num_rows();
431
432 debug_assert!(record_batch.num_rows() > 0);
433 if record_batch.num_rows() == 0 {
434 continue;
435 }
436
437 let yield_start = Instant::now();
438 yield ScanBatch::RecordBatch(record_batch);
439 metrics.yield_cost += yield_start.elapsed();
440
441 fetch_start = Instant::now();
442 }
443
444 metrics.scan_cost += fetch_start.elapsed();
445 part_metrics.merge_metrics(&metrics);
446 }
447
448 part_metrics.on_finish();
449 };
450 Ok(Box::pin(stream))
451 }
452}
453
454impl RegionScanner for UnorderedScan {
455 fn name(&self) -> &str {
456 "UnorderedScan"
457 }
458
459 fn properties(&self) -> &ScannerProperties {
460 &self.properties
461 }
462
463 fn schema(&self) -> SchemaRef {
464 self.stream_ctx.input.mapper.output_schema()
465 }
466
467 fn metadata(&self) -> RegionMetadataRef {
468 self.stream_ctx.input.mapper.metadata().clone()
469 }
470
471 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
472 self.properties.prepare(request);
473
474 Ok(())
475 }
476
477 fn scan_partition(
478 &self,
479 ctx: &QueryScanContext,
480 metrics_set: &ExecutionPlanMetricsSet,
481 partition: usize,
482 ) -> Result<SendableRecordBatchStream, BoxedError> {
483 self.scan_partition_impl(ctx, metrics_set, partition)
484 .map_err(BoxedError::new)
485 }
486
487 fn has_predicate_without_region(&self) -> bool {
489 let predicate = self
490 .stream_ctx
491 .input
492 .predicate_group()
493 .predicate_without_region();
494
495 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
496 }
497
498 fn set_logical_region(&mut self, logical_region: bool) {
499 self.properties.set_logical_region(logical_region);
500 }
501}
502
503impl DisplayAs for UnorderedScan {
504 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
505 write!(
506 f,
507 "UnorderedScan: region={}, ",
508 self.stream_ctx.input.mapper.metadata().region_id
509 )?;
510 match t {
511 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
512 self.stream_ctx.format_for_explain(false, f)
513 }
514 DisplayFormatType::Verbose => {
515 self.stream_ctx.format_for_explain(true, f)?;
516 self.metrics_list.format_verbose_metrics(f)
517 }
518 }
519 }
520}
521
522impl fmt::Debug for UnorderedScan {
523 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
524 f.debug_struct("UnorderedScan")
525 .field("num_ranges", &self.stream_ctx.ranges.len())
526 .finish()
527 }
528}
529
#[cfg(test)]
impl UnorderedScan {
    /// Test-only accessor for the scan input held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}