1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::{tracing, warn};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::pruner::{PartitionPruner, Pruner};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40 PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges,
41};
42use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
43use crate::read::{ScannerMetrics, scan_util};
44
/// Scanner that returns rows without any particular output ordering guarantee.
pub struct UnorderedScan {
    /// Scanner properties (partitions, append mode, total rows).
    properties: ScannerProperties,
    /// Shared context for building per-partition streams.
    stream_ctx: Arc<StreamContext>,
    /// Pruner created once in `new` and shared by all partitions.
    pruner: Arc<Pruner>,
    /// Per-partition metrics, reported by verbose `EXPLAIN` output.
    metrics_list: PartitionMetricsList,
}
58
59impl UnorderedScan {
60 pub(crate) fn new(input: ScanInput) -> Self {
62 let mut properties = ScannerProperties::default()
63 .with_append_mode(input.append_mode)
64 .with_total_rows(input.total_rows());
65 let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
66 properties.partitions = vec![stream_ctx.partition_ranges()];
67
68 let num_workers = common_stat::get_total_cpu_cores().max(1);
70 let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
71
72 Self {
73 properties,
74 stream_ctx,
75 pruner,
76 metrics_list: PartitionMetricsList::default(),
77 }
78 }
79
80 #[tracing::instrument(
82 skip_all,
83 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
84 )]
85 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
86 let metrics_set = ExecutionPlanMetricsSet::new();
87 let part_num = self.properties.num_partitions();
88 let streams = (0..part_num)
89 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
90 .collect::<Result<Vec<_>, BoxedError>>()?;
91 let stream = stream! {
92 for mut stream in streams {
93 while let Some(rb) = stream.next().await {
94 yield rb;
95 }
96 }
97 };
98 let stream = Box::pin(RecordBatchStreamWrapper::new(
99 self.schema(),
100 Box::pin(stream),
101 ));
102 Ok(stream)
103 }
104
    /// Scans one partition range, yielding record batches from every row group
    /// index in the range: memtable ranges, file ranges, and any other ranges.
    ///
    /// Takes owned/shared arguments (no `&self`) so the produced stream is
    /// independent of the scanner's lifetime.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %stream_ctx.input.region_metadata().region_id,
            part_range_id = part_range_id
        )
    )]
    fn scan_flat_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
    ) -> impl Stream<Item = Result<RecordBatch>> {
        try_stream! {
            let range_meta = &stream_ctx.ranges[part_range_id];
            // Each index is dispatched by its source kind; order within the
            // range is preserved but no ordering across sources is implied.
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    // Memtable-backed range.
                    let stream = scan_flat_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        range_meta.time_range,
                    );
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    // SST file-backed range; the pruner may skip file ranges.
                    let stream = scan_flat_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        partition_pruner.clone(),
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else {
                    // Fallback for any other range kind.
                    let stream = scan_util::maybe_scan_flat_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                }
            }
        }
    }
157
158 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
160 let metrics_set = ExecutionPlanMetricsSet::new();
161
162 let streams = (0..self.properties.partitions.len())
163 .map(|partition| {
164 let metrics = self.partition_metrics(false, partition, &metrics_set);
165 self.scan_flat_batch_in_partition(partition, metrics)
166 })
167 .collect::<Result<Vec<_>>>()?;
168
169 Ok(Box::pin(futures::stream::iter(streams).flatten()))
170 }
171
172 fn partition_metrics(
173 &self,
174 explain_verbose: bool,
175 partition: usize,
176 metrics_set: &ExecutionPlanMetricsSet,
177 ) -> PartitionMetrics {
178 let part_metrics = PartitionMetrics::new(
179 self.stream_ctx.input.mapper.metadata().region_id,
180 partition,
181 "UnorderedScan",
182 self.stream_ctx.query_start,
183 explain_verbose,
184 metrics_set,
185 );
186 self.metrics_list.set(partition, part_metrics.clone());
187 part_metrics
188 }
189
190 #[tracing::instrument(
191 skip_all,
192 fields(
193 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
194 partition = partition
195 )
196 )]
197 fn scan_partition_impl(
198 &self,
199 ctx: &QueryScanContext,
200 metrics_set: &ExecutionPlanMetricsSet,
201 partition: usize,
202 ) -> Result<SendableRecordBatchStream> {
203 if ctx.explain_verbose {
204 common_telemetry::info!(
205 "UnorderedScan partition {}, region_id: {}",
206 partition,
207 self.stream_ctx.input.region_metadata().region_id
208 );
209 }
210
211 let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
212 let input = &self.stream_ctx.input;
213
214 let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
215
216 let record_batch_stream = ConvertBatchStream::new(
217 batch_stream,
218 input.mapper.clone(),
219 input.cache_strategy.clone(),
220 metrics,
221 );
222
223 Ok(Box::pin(RecordBatchStreamWrapper::new(
224 input.mapper.output_schema(),
225 Box::pin(record_batch_stream),
226 )))
227 }
228
    /// Returns a stream of [ScanBatch]es for the given partition, recording
    /// scan/yield costs into `part_metrics`.
    ///
    /// # Errors
    /// Returns [PartitionOutOfRangeSnafu] if `partition` is not a valid index
    /// into the prepared partitions.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        // Clone shared state so the returned stream owns everything it needs.
        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let pruner = self.pruner.clone();
        // Register this partition's ranges with the shared pruner before
        // building the per-partition pruner view.
        pruner.add_partition_ranges(&part_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();

            for part_range in part_ranges {
                // Per-range metrics, merged into the partition metrics below.
                let mut metrics = ScannerMetrics::default();
                // `fetch_start` measures the time spent producing each batch
                // (reset after every yield).
                let mut fetch_start = Instant::now();

                let stream = Self::scan_flat_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    partition_pruner.clone(),
                );
                for await record_batch in stream {
                    let record_batch = record_batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are unexpected; skip them in release builds.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    // Time spent handing the batch to the consumer counts as
                    // yield cost, not scan cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Account for the tail fetch that ended the stream.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
299}
300
301impl RegionScanner for UnorderedScan {
302 fn name(&self) -> &str {
303 "UnorderedScan"
304 }
305
306 fn properties(&self) -> &ScannerProperties {
307 &self.properties
308 }
309
310 fn schema(&self) -> SchemaRef {
311 self.stream_ctx.input.mapper.output_schema()
312 }
313
314 fn metadata(&self) -> RegionMetadataRef {
315 self.stream_ctx.input.mapper.metadata().clone()
316 }
317
318 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
319 self.properties.prepare(request);
320
321 Ok(())
322 }
323
324 fn scan_partition(
325 &self,
326 ctx: &QueryScanContext,
327 metrics_set: &ExecutionPlanMetricsSet,
328 partition: usize,
329 ) -> Result<SendableRecordBatchStream, BoxedError> {
330 self.scan_partition_impl(ctx, metrics_set, partition)
331 .map_err(BoxedError::new)
332 }
333
334 fn has_predicate_without_region(&self) -> bool {
336 let predicate = self
337 .stream_ctx
338 .input
339 .predicate_group()
340 .predicate_without_region();
341 predicate.is_some()
342 }
343
344 fn add_dyn_filter_to_predicate(
345 &mut self,
346 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
347 ) -> Vec<bool> {
348 self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
349 }
350
351 fn set_logical_region(&mut self, logical_region: bool) {
352 self.properties.set_logical_region(logical_region);
353 }
354}
355
356impl DisplayAs for UnorderedScan {
357 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
358 write!(
359 f,
360 "UnorderedScan: region={}, ",
361 self.stream_ctx.input.mapper.metadata().region_id
362 )?;
363 match t {
364 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
365 self.stream_ctx.format_for_explain(false, f)
366 }
367 DisplayFormatType::Verbose => {
368 self.stream_ctx.format_for_explain(true, f)?;
369 self.metrics_list.format_verbose_metrics(f)
370 }
371 }
372 }
373}
374
375impl fmt::Debug for UnorderedScan {
376 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377 f.debug_struct("UnorderedScan")
378 .field("num_ranges", &self.stream_ctx.ranges.len())
379 .finish()
380 }
381}
382
#[cfg(test)]
impl UnorderedScan {
    /// Returns the scan input (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}