1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::tracing;
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::range::{RangeBuilderList, file_range_counts};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40 PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
41 scan_flat_mem_ranges, scan_mem_ranges,
42};
43use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
44use crate::read::{Batch, ScannerMetrics, scan_util};
45
46pub struct UnorderedScan {
50 properties: ScannerProperties,
52 stream_ctx: Arc<StreamContext>,
54 metrics_list: PartitionMetricsList,
56}
57
58impl UnorderedScan {
59 pub(crate) fn new(input: ScanInput) -> Self {
61 let mut properties = ScannerProperties::default()
62 .with_append_mode(input.append_mode)
63 .with_total_rows(input.total_rows());
64 let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
65 properties.partitions = vec![stream_ctx.partition_ranges()];
66
67 Self {
68 properties,
69 stream_ctx,
70 metrics_list: PartitionMetricsList::default(),
71 }
72 }
73
74 #[tracing::instrument(
76 skip_all,
77 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
78 )]
79 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
80 let metrics_set = ExecutionPlanMetricsSet::new();
81 let part_num = self.properties.num_partitions();
82 let streams = (0..part_num)
83 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
84 .collect::<Result<Vec<_>, BoxedError>>()?;
85 let stream = stream! {
86 for mut stream in streams {
87 while let Some(rb) = stream.next().await {
88 yield rb;
89 }
90 }
91 };
92 let stream = Box::pin(RecordBatchStreamWrapper::new(
93 self.schema(),
94 Box::pin(stream),
95 ));
96 Ok(stream)
97 }
98
99 #[tracing::instrument(
101 skip_all,
102 fields(
103 region_id = %stream_ctx.input.region_metadata().region_id,
104 part_range_id = part_range_id
105 )
106 )]
107 fn scan_partition_range(
108 stream_ctx: Arc<StreamContext>,
109 part_range_id: usize,
110 part_metrics: PartitionMetrics,
111 range_builder_list: Arc<RangeBuilderList>,
112 ) -> impl Stream<Item = Result<Batch>> {
113 try_stream! {
114 let range_meta = &stream_ctx.ranges[part_range_id];
116 for index in &range_meta.row_group_indices {
117 if stream_ctx.is_mem_range_index(*index) {
118 let stream = scan_mem_ranges(
119 stream_ctx.clone(),
120 part_metrics.clone(),
121 *index,
122 range_meta.time_range,
123 );
124 for await batch in stream {
125 yield batch?;
126 }
127 } else if stream_ctx.is_file_range_index(*index) {
128 let stream = scan_file_ranges(
129 stream_ctx.clone(),
130 part_metrics.clone(),
131 *index,
132 "unordered_scan_files",
133 range_builder_list.clone(),
134 ).await?;
135 for await batch in stream {
136 yield batch?;
137 }
138 } else {
139 let stream = scan_util::maybe_scan_other_ranges(
140 &stream_ctx,
141 *index,
142 &part_metrics,
143 ).await?;
144 for await batch in stream {
145 yield batch?;
146 }
147 }
148 }
149 }
150 }
151
152 #[tracing::instrument(
154 skip_all,
155 fields(
156 region_id = %stream_ctx.input.region_metadata().region_id,
157 part_range_id = part_range_id
158 )
159 )]
160 fn scan_flat_partition_range(
161 stream_ctx: Arc<StreamContext>,
162 part_range_id: usize,
163 part_metrics: PartitionMetrics,
164 range_builder_list: Arc<RangeBuilderList>,
165 ) -> impl Stream<Item = Result<RecordBatch>> {
166 try_stream! {
167 let range_meta = &stream_ctx.ranges[part_range_id];
169 for index in &range_meta.row_group_indices {
170 if stream_ctx.is_mem_range_index(*index) {
171 let stream = scan_flat_mem_ranges(
172 stream_ctx.clone(),
173 part_metrics.clone(),
174 *index,
175 );
176 for await record_batch in stream {
177 yield record_batch?;
178 }
179 } else if stream_ctx.is_file_range_index(*index) {
180 let stream = scan_flat_file_ranges(
181 stream_ctx.clone(),
182 part_metrics.clone(),
183 *index,
184 "unordered_scan_files",
185 range_builder_list.clone(),
186 ).await?;
187 for await record_batch in stream {
188 yield record_batch?;
189 }
190 } else {
191 let stream = scan_util::maybe_scan_flat_other_ranges(
192 &stream_ctx,
193 *index,
194 &part_metrics,
195 ).await?;
196 for await record_batch in stream {
197 yield record_batch?;
198 }
199 }
200 }
201 }
202 }
203
204 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
206 let metrics_set = ExecutionPlanMetricsSet::new();
207
208 let streams = (0..self.properties.partitions.len())
209 .map(|partition| {
210 let metrics = self.partition_metrics(false, partition, &metrics_set);
211 self.scan_batch_in_partition(partition, metrics)
212 })
213 .collect::<Result<Vec<_>>>()?;
214
215 Ok(Box::pin(futures::stream::iter(streams).flatten()))
216 }
217
218 fn partition_metrics(
219 &self,
220 explain_verbose: bool,
221 partition: usize,
222 metrics_set: &ExecutionPlanMetricsSet,
223 ) -> PartitionMetrics {
224 let part_metrics = PartitionMetrics::new(
225 self.stream_ctx.input.mapper.metadata().region_id,
226 partition,
227 "UnorderedScan",
228 self.stream_ctx.query_start,
229 explain_verbose,
230 metrics_set,
231 );
232 self.metrics_list.set(partition, part_metrics.clone());
233 part_metrics
234 }
235
236 #[tracing::instrument(
237 skip_all,
238 fields(
239 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
240 partition = partition
241 )
242 )]
243 fn scan_partition_impl(
244 &self,
245 ctx: &QueryScanContext,
246 metrics_set: &ExecutionPlanMetricsSet,
247 partition: usize,
248 ) -> Result<SendableRecordBatchStream> {
249 if ctx.explain_verbose {
250 common_telemetry::info!(
251 "UnorderedScan partition {}, region_id: {}",
252 partition,
253 self.stream_ctx.input.region_metadata().region_id
254 );
255 }
256
257 let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
258 let input = &self.stream_ctx.input;
259
260 let batch_stream = if input.flat_format {
261 self.scan_flat_batch_in_partition(partition, metrics.clone())?
263 } else {
264 self.scan_batch_in_partition(partition, metrics.clone())?
266 };
267
268 let record_batch_stream = ConvertBatchStream::new(
269 batch_stream,
270 input.mapper.clone(),
271 input.cache_strategy.clone(),
272 metrics,
273 );
274
275 Ok(Box::pin(RecordBatchStreamWrapper::new(
276 input.mapper.output_schema(),
277 Box::pin(record_batch_stream),
278 )))
279 }
280
281 #[tracing::instrument(
282 skip_all,
283 fields(
284 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
285 partition = partition
286 )
287 )]
288 fn scan_batch_in_partition(
289 &self,
290 partition: usize,
291 part_metrics: PartitionMetrics,
292 ) -> Result<ScanBatchStream> {
293 ensure!(
294 partition < self.properties.partitions.len(),
295 PartitionOutOfRangeSnafu {
296 given: partition,
297 all: self.properties.partitions.len(),
298 }
299 );
300
301 let stream_ctx = self.stream_ctx.clone();
302 let part_ranges = self.properties.partitions[partition].clone();
303 let distinguish_range = self.properties.distinguish_partition_range;
304
305 let stream = try_stream! {
306 part_metrics.on_first_poll();
307
308 let counts = file_range_counts(
309 stream_ctx.input.num_memtables(),
310 stream_ctx.input.num_files(),
311 &stream_ctx.ranges,
312 part_ranges.iter(),
313 );
314 let range_builder_list = Arc::new(RangeBuilderList::new(
315 stream_ctx.input.num_memtables(),
316 counts,
317 ));
318 for part_range in part_ranges {
320 let mut metrics = ScannerMetrics::default();
321 let mut fetch_start = Instant::now();
322 let _mapper = &stream_ctx.input.mapper;
323 #[cfg(debug_assertions)]
324 let mut checker = crate::read::BatchChecker::default()
325 .with_start(Some(part_range.start))
326 .with_end(Some(part_range.end));
327
328 let stream = Self::scan_partition_range(
329 stream_ctx.clone(),
330 part_range.identifier,
331 part_metrics.clone(),
332 range_builder_list.clone(),
333 );
334 for await batch in stream {
335 let batch = batch?;
336 metrics.scan_cost += fetch_start.elapsed();
337 metrics.num_batches += 1;
338 metrics.num_rows += batch.num_rows();
339
340 debug_assert!(!batch.is_empty());
341 if batch.is_empty() {
342 continue;
343 }
344
345 #[cfg(debug_assertions)]
346 checker.ensure_part_range_batch(
347 "UnorderedScan",
348 _mapper.metadata().region_id,
349 partition,
350 part_range,
351 &batch,
352 );
353
354 let yield_start = Instant::now();
355 yield ScanBatch::Normal(batch);
356 metrics.yield_cost += yield_start.elapsed();
357
358 fetch_start = Instant::now();
359 }
360
361 if distinguish_range {
364 let yield_start = Instant::now();
365 yield ScanBatch::Normal(Batch::empty());
366 metrics.yield_cost += yield_start.elapsed();
367 }
368
369 metrics.scan_cost += fetch_start.elapsed();
370 part_metrics.merge_metrics(&metrics);
371 }
372
373 part_metrics.on_finish();
374 };
375 Ok(Box::pin(stream))
376 }
377
378 #[tracing::instrument(
379 skip_all,
380 fields(
381 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
382 partition = partition
383 )
384 )]
385 fn scan_flat_batch_in_partition(
386 &self,
387 partition: usize,
388 part_metrics: PartitionMetrics,
389 ) -> Result<ScanBatchStream> {
390 ensure!(
391 partition < self.properties.partitions.len(),
392 PartitionOutOfRangeSnafu {
393 given: partition,
394 all: self.properties.partitions.len(),
395 }
396 );
397
398 let stream_ctx = self.stream_ctx.clone();
399 let part_ranges = self.properties.partitions[partition].clone();
400
401 let stream = try_stream! {
402 part_metrics.on_first_poll();
403
404 let counts = file_range_counts(
405 stream_ctx.input.num_memtables(),
406 stream_ctx.input.num_files(),
407 &stream_ctx.ranges,
408 part_ranges.iter(),
409 );
410 let range_builder_list = Arc::new(RangeBuilderList::new(
411 stream_ctx.input.num_memtables(),
412 counts,
413 ));
414 for part_range in part_ranges {
416 let mut metrics = ScannerMetrics::default();
417 let mut fetch_start = Instant::now();
418
419 let stream = Self::scan_flat_partition_range(
420 stream_ctx.clone(),
421 part_range.identifier,
422 part_metrics.clone(),
423 range_builder_list.clone(),
424 );
425 for await record_batch in stream {
426 let record_batch = record_batch?;
427 metrics.scan_cost += fetch_start.elapsed();
428 metrics.num_batches += 1;
429 metrics.num_rows += record_batch.num_rows();
430
431 debug_assert!(record_batch.num_rows() > 0);
432 if record_batch.num_rows() == 0 {
433 continue;
434 }
435
436 let yield_start = Instant::now();
437 yield ScanBatch::RecordBatch(record_batch);
438 metrics.yield_cost += yield_start.elapsed();
439
440 fetch_start = Instant::now();
441 }
442
443 metrics.scan_cost += fetch_start.elapsed();
444 part_metrics.merge_metrics(&metrics);
445 }
446
447 part_metrics.on_finish();
448 };
449 Ok(Box::pin(stream))
450 }
451}
452
453impl RegionScanner for UnorderedScan {
454 fn name(&self) -> &str {
455 "UnorderedScan"
456 }
457
458 fn properties(&self) -> &ScannerProperties {
459 &self.properties
460 }
461
462 fn schema(&self) -> SchemaRef {
463 self.stream_ctx.input.mapper.output_schema()
464 }
465
466 fn metadata(&self) -> RegionMetadataRef {
467 self.stream_ctx.input.mapper.metadata().clone()
468 }
469
470 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
471 self.properties.prepare(request);
472 Ok(())
474 }
475
476 fn scan_partition(
477 &self,
478 ctx: &QueryScanContext,
479 metrics_set: &ExecutionPlanMetricsSet,
480 partition: usize,
481 ) -> Result<SendableRecordBatchStream, BoxedError> {
482 self.scan_partition_impl(ctx, metrics_set, partition)
483 .map_err(BoxedError::new)
484 }
485
486 fn has_predicate_without_region(&self) -> bool {
488 let predicate = self
489 .stream_ctx
490 .input
491 .predicate_group()
492 .predicate_without_region();
493
494 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
495 }
496
497 fn set_logical_region(&mut self, logical_region: bool) {
498 self.properties.set_logical_region(logical_region);
499 }
500}
501
502impl DisplayAs for UnorderedScan {
503 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
504 write!(
505 f,
506 "UnorderedScan: region={}, ",
507 self.stream_ctx.input.mapper.metadata().region_id
508 )?;
509 match t {
510 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
511 self.stream_ctx.format_for_explain(false, f)
512 }
513 DisplayFormatType::Verbose => {
514 self.stream_ctx.format_for_explain(true, f)?;
515 self.metrics_list.format_verbose_metrics(f)
516 }
517 }
518 }
519}
520
521impl fmt::Debug for UnorderedScan {
522 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
523 f.debug_struct("UnorderedScan")
524 .field("num_ranges", &self.stream_ctx.ranges.len())
525 .finish()
526 }
527}
528
#[cfg(test)]
impl UnorderedScan {
    /// Returns the scan input held by the stream context (test builds only).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}