1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::{tracing, warn};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::pruner::{PartitionPruner, Pruner};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40 PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges,
41};
42use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
43use crate::read::{ScannerMetrics, scan_util};
44
/// Scanner that returns rows without any inter-partition ordering guarantee.
///
/// Partitions are scanned independently; `build_stream` simply chains them in
/// partition order. Append mode is recorded in the properties (see `new`) —
/// presumably this scanner targets append-mode regions; TODO confirm with callers.
pub struct UnorderedScan {
    /// Scanner properties (partition ranges, append mode, total rows).
    properties: ScannerProperties,
    /// Shared context used by every partition stream.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner used to prune file ranges during the scan.
    pruner: Arc<Pruner>,
    /// Per-partition metrics, populated lazily as partitions are scanned
    /// and rendered by the verbose `EXPLAIN` output.
    metrics_list: PartitionMetricsList,
}
58
impl UnorderedScan {
    /// Creates a new [`UnorderedScan`] from the scan input.
    ///
    /// Builds the scanner properties (append mode, total row count, partition
    /// ranges) and a shared [`Pruner`] sized by the machine's CPU core count.
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
        // Start with a single partition holding all ranges; `prepare()` may
        // repartition later via `ScannerProperties::prepare`.
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // `max(1)` guards against a reported core count of 0.
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Scans the whole region and returns one record-batch stream.
    ///
    /// Builds a stream per partition and concatenates them sequentially
    /// (partition 0 first, then 1, ...). Errors while constructing any
    /// partition stream abort the whole build.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_num = self.properties.num_partitions();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        // Drain each partition stream to completion before moving to the next.
        let stream = stream! {
            for mut stream in streams {
                while let Some(rb) = stream.next().await {
                    yield rb;
                }
            }
        };
        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.schema(),
            Box::pin(stream),
        ));
        Ok(stream)
    }

    /// Scans a single partition range (`part_range_id`) and yields its record
    /// batches.
    ///
    /// Dispatches each row-group index of the range to the appropriate source:
    /// memtable ranges, file ranges (pruned by `partition_pruner`), or other
    /// range kinds handled by `scan_util::maybe_scan_flat_other_ranges`.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %stream_ctx.input.region_metadata().region_id,
            part_range_id = part_range_id
        )
    )]
    fn scan_flat_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
    ) -> impl Stream<Item = Result<RecordBatch>> {
        try_stream! {
            let range_meta = &stream_ctx.ranges[part_range_id];
            let part_range = range_meta.new_partition_range(part_range_id);
            // Pre-filter mode is computed once for the whole range and only
            // used by the "other ranges" fallback below.
            let pre_filter_mode = stream_ctx.range_pre_filter_mode(&part_range);
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    let stream = scan_flat_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        range_meta.time_range,
                    );
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    let stream = scan_flat_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        partition_pruner.clone(),
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else {
                    let stream = scan_util::maybe_scan_flat_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                        pre_filter_mode,
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                }
            }
        }
    }

    /// Scans all partitions and returns one flattened [`ScanBatchStream`].
    ///
    /// Unlike `scan_partition_impl`, partition metrics are created with
    /// `explain_verbose = false`.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.partition_metrics(false, partition, &metrics_set);
                self.scan_flat_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Creates [`PartitionMetrics`] for `partition` and registers it in
    /// `metrics_list` so verbose `EXPLAIN` output can render it later.
    fn partition_metrics(
        &self,
        explain_verbose: bool,
        partition: usize,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> PartitionMetrics {
        let part_metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            "UnorderedScan",
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );
        self.metrics_list.set(partition, part_metrics.clone());
        part_metrics
    }

    /// Scans one partition and converts its [`ScanBatch`]es into the output
    /// record-batch schema via [`ConvertBatchStream`].
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "UnorderedScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
        let input = &self.stream_ctx.input;

        let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;

        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    /// Scans all ranges assigned to `partition` and yields raw
    /// [`ScanBatch`]es, accumulating scan/yield timings into `part_metrics`.
    ///
    /// # Errors
    /// Returns [`PartitionOutOfRangeSnafu`] when `partition` is not a valid
    /// partition index.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let pruner = self.pruner.clone();
        // Register this partition's ranges with the shared pruner before
        // building the per-partition view of it.
        pruner.add_partition_ranges(&part_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();

            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                // `fetch_start` measures time spent pulling batches from the
                // inner stream; it is reset after each yield below.
                let mut fetch_start = Instant::now();

                let stream = Self::scan_flat_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    partition_pruner.clone(),
                );
                for await record_batch in stream {
                    let record_batch = record_batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are unexpected (debug-asserted) but are
                    // skipped rather than yielded in release builds.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    // Time spent in `yield` approximates downstream consumption cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Account for the tail fetch that returned `None`.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
}
303
304impl RegionScanner for UnorderedScan {
305 fn name(&self) -> &str {
306 "UnorderedScan"
307 }
308
309 fn properties(&self) -> &ScannerProperties {
310 &self.properties
311 }
312
313 fn schema(&self) -> SchemaRef {
314 self.stream_ctx.input.mapper.output_schema()
315 }
316
317 fn metadata(&self) -> RegionMetadataRef {
318 self.stream_ctx.input.mapper.metadata().clone()
319 }
320
321 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
322 self.properties.prepare(request);
323
324 Ok(())
325 }
326
327 fn scan_partition(
328 &self,
329 ctx: &QueryScanContext,
330 metrics_set: &ExecutionPlanMetricsSet,
331 partition: usize,
332 ) -> Result<SendableRecordBatchStream, BoxedError> {
333 self.scan_partition_impl(ctx, metrics_set, partition)
334 .map_err(BoxedError::new)
335 }
336
337 fn has_predicate_without_region(&self) -> bool {
339 let predicate = self
340 .stream_ctx
341 .input
342 .predicate_group()
343 .predicate_without_region();
344 predicate.is_some()
345 }
346
347 fn add_dyn_filter_to_predicate(
348 &mut self,
349 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
350 ) -> Vec<bool> {
351 self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
352 }
353
354 fn set_logical_region(&mut self, logical_region: bool) {
355 self.properties.set_logical_region(logical_region);
356 }
357
358 fn snapshot_sequence(&self) -> Option<u64> {
359 self.stream_ctx.input.snapshot_sequence
360 }
361}
362
363impl DisplayAs for UnorderedScan {
364 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
365 write!(
366 f,
367 "UnorderedScan: region={}, ",
368 self.stream_ctx.input.mapper.metadata().region_id
369 )?;
370 match t {
371 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
372 self.stream_ctx.format_for_explain(false, f)
373 }
374 DisplayFormatType::Verbose => {
375 self.stream_ctx.format_for_explain(true, f)?;
376 self.metrics_list.format_verbose_metrics(f)
377 }
378 }
379 }
380}
381
382impl fmt::Debug for UnorderedScan {
383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384 f.debug_struct("UnorderedScan")
385 .field("num_ranges", &self.stream_ctx.ranges.len())
386 .finish()
387 }
388}
389
#[cfg(test)]
impl UnorderedScan {
    /// Test-only accessor for the underlying [`ScanInput`].
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}