1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::{tracing, warn};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::pruner::{PartitionPruner, Pruner};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40 PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
41 scan_flat_mem_ranges, scan_mem_ranges,
42};
43use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
44use crate::read::{Batch, ScannerMetrics, scan_util};
45
/// Scanner that reads a region without any ordering guarantee between batches.
pub struct UnorderedScan {
    /// Scanner properties: partition ranges, append mode, total rows, etc.
    properties: ScannerProperties,
    /// Context shared by all partition streams of this scan.
    stream_ctx: Arc<StreamContext>,
    /// Pruner shared across partitions; handed to file-range scans via
    /// `PartitionPruner`.
    pruner: Arc<Pruner>,
    /// Per-partition metrics, kept for verbose EXPLAIN output.
    metrics_list: PartitionMetricsList,
}
59
impl UnorderedScan {
    /// Creates a new [UnorderedScan] from a prepared [ScanInput].
    ///
    /// Builds the scanner properties (append mode, total rows), wraps the
    /// input into a shared [StreamContext], and creates a [Pruner] whose
    /// worker count is the number of CPU cores (at least 1).
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
        // Start with a single partition holding all ranges; `prepare()` may
        // repartition later via `ScannerProperties::prepare`.
        properties.partitions = vec![stream_ctx.partition_ranges()];

        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Scans the region and returns one record batch stream that drains every
    /// partition sequentially, in partition order.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_num = self.properties.num_partitions();
        // Build all partition streams up front so any out-of-range partition
        // fails immediately instead of mid-stream.
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        // Chain the partition streams one after another.
        let stream = stream! {
            for mut stream in streams {
                while let Some(rb) = stream.next().await {
                    yield rb;
                }
            }
        };
        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.schema(),
            Box::pin(stream),
        ));
        Ok(stream)
    }

    /// Scans one partition range (the range at `part_range_id` in
    /// `stream_ctx.ranges`) and yields [Batch]es.
    ///
    /// Each row group index of the range is dispatched to its source kind:
    /// memtable ranges, file ranges (scanned with `partition_pruner`), or
    /// other ranges.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %stream_ctx.input.region_metadata().region_id,
            part_range_id = part_range_id
        )
    )]
    fn scan_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
    ) -> impl Stream<Item = Result<Batch>> {
        try_stream! {
            let range_meta = &stream_ctx.ranges[part_range_id];
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    // Memtable-backed range.
                    let stream = scan_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        range_meta.time_range,
                    );
                    for await batch in stream {
                        yield batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    // SST-file-backed range; pruned via `partition_pruner`.
                    let stream = scan_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        partition_pruner.clone(),
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                } else {
                    // Neither mem nor file: delegate to the fallback scanner.
                    let stream = scan_util::maybe_scan_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                }
            }
        }
    }

    /// Flat-format counterpart of [Self::scan_partition_range]: scans one
    /// partition range and yields Arrow [RecordBatch]es instead of [Batch]es.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %stream_ctx.input.region_metadata().region_id,
            part_range_id = part_range_id
        )
    )]
    fn scan_flat_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
    ) -> impl Stream<Item = Result<RecordBatch>> {
        try_stream! {
            let range_meta = &stream_ctx.ranges[part_range_id];
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    // Memtable-backed range (flat format).
                    let stream = scan_flat_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        range_meta.time_range,
                    );
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    // SST-file-backed range (flat format), pruned.
                    let stream = scan_flat_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        partition_pruner.clone(),
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else {
                    // Fallback for other range kinds (flat format).
                    let stream = scan_util::maybe_scan_flat_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                }
            }
        }
    }

    /// Builds a single [ScanBatchStream] that scans every partition in order
    /// (non-flat format, non-verbose metrics).
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.partition_metrics(false, partition, &metrics_set);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Creates the [PartitionMetrics] for `partition` and registers it in
    /// `metrics_list` so it shows up in verbose EXPLAIN output.
    fn partition_metrics(
        &self,
        explain_verbose: bool,
        partition: usize,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> PartitionMetrics {
        let part_metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            "UnorderedScan",
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );
        self.metrics_list.set(partition, part_metrics.clone());
        part_metrics
    }

    /// Builds the record batch stream for one partition.
    ///
    /// Picks the flat or non-flat scan path based on `input.flat_format`,
    /// then converts the resulting [ScanBatchStream] into a
    /// [SendableRecordBatchStream] with the mapper's output schema.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "UnorderedScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
        let input = &self.stream_ctx.input;

        let batch_stream = if input.flat_format {
            self.scan_flat_batch_in_partition(partition, metrics.clone())?
        } else {
            self.scan_batch_in_partition(partition, metrics.clone())?
        };

        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    /// Scans one partition in the primary-key [Batch] format.
    ///
    /// Iterates the partition's ranges, tracking scan/yield cost per range,
    /// skipping empty batches, and (in debug builds) validating batches with
    /// `BatchChecker`. When `distinguish_partition_range` is set, an empty
    /// batch is yielded after each range as a separator marker.
    ///
    /// # Errors
    /// Returns `PartitionOutOfRange` if `partition` is not a valid index.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let distinguish_range = self.properties.distinguish_partition_range;
        let pruner = self.pruner.clone();
        // Register this partition's ranges with the shared pruner before
        // deriving the per-partition view.
        pruner.add_partition_ranges(&part_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();

            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                // Underscore-prefixed: only read by the debug-build checker.
                let _mapper = &stream_ctx.input.mapper;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                let stream = Self::scan_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    partition_pruner.clone(),
                );
                for await batch in stream {
                    let batch = batch?;
                    // Time between polls counts as scan cost.
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    // Sources should not produce empty batches; skip them
                    // defensively in release builds.
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "UnorderedScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                if distinguish_range {
                    // Empty batch acts as an end-of-range marker for
                    // consumers that need to tell ranges apart.
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Flat-format counterpart of [Self::scan_batch_in_partition]: yields
    /// `ScanBatch::RecordBatch` items.
    ///
    /// NOTE(review): unlike the non-flat path, this does not emit an empty
    /// marker batch for `distinguish_partition_range` and has no debug
    /// checker — confirm this asymmetry is intended.
    ///
    /// # Errors
    /// Returns `PartitionOutOfRange` if `partition` is not a valid index.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let pruner = self.pruner.clone();
        // Register this partition's ranges with the shared pruner before
        // deriving the per-partition view.
        pruner.add_partition_ranges(&part_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();

            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();

                let stream = Self::scan_flat_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    partition_pruner.clone(),
                );
                for await record_batch in stream {
                    let record_batch = record_batch?;
                    // Time between polls counts as scan cost.
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Sources should not produce empty batches; skip them
                    // defensively in release builds.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
}
454
455impl RegionScanner for UnorderedScan {
456 fn name(&self) -> &str {
457 "UnorderedScan"
458 }
459
460 fn properties(&self) -> &ScannerProperties {
461 &self.properties
462 }
463
464 fn schema(&self) -> SchemaRef {
465 self.stream_ctx.input.mapper.output_schema()
466 }
467
468 fn metadata(&self) -> RegionMetadataRef {
469 self.stream_ctx.input.mapper.metadata().clone()
470 }
471
472 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
473 self.properties.prepare(request);
474
475 Ok(())
476 }
477
478 fn scan_partition(
479 &self,
480 ctx: &QueryScanContext,
481 metrics_set: &ExecutionPlanMetricsSet,
482 partition: usize,
483 ) -> Result<SendableRecordBatchStream, BoxedError> {
484 self.scan_partition_impl(ctx, metrics_set, partition)
485 .map_err(BoxedError::new)
486 }
487
488 fn has_predicate_without_region(&self) -> bool {
490 let predicate = self
491 .stream_ctx
492 .input
493 .predicate_group()
494 .predicate_without_region();
495 predicate.is_some()
496 }
497
498 fn add_dyn_filter_to_predicate(
499 &mut self,
500 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
501 ) -> Vec<bool> {
502 self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
503 }
504
505 fn set_logical_region(&mut self, logical_region: bool) {
506 self.properties.set_logical_region(logical_region);
507 }
508}
509
510impl DisplayAs for UnorderedScan {
511 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
512 write!(
513 f,
514 "UnorderedScan: region={}, ",
515 self.stream_ctx.input.mapper.metadata().region_id
516 )?;
517 match t {
518 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
519 self.stream_ctx.format_for_explain(false, f)
520 }
521 DisplayFormatType::Verbose => {
522 self.stream_ctx.format_for_explain(true, f)?;
523 self.metrics_list.format_verbose_metrics(f)
524 }
525 }
526 }
527}
528
529impl fmt::Debug for UnorderedScan {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 f.debug_struct("UnorderedScan")
532 .field("num_ranges", &self.stream_ctx.ranges.len())
533 .finish()
534 }
535}
536
#[cfg(test)]
impl UnorderedScan {
    /// Test-only accessor for the underlying [ScanInput].
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}