1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
26use datatypes::schema::SchemaRef;
27use futures::{Stream, StreamExt};
28use snafu::ensure;
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_engine::{
31 PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
32};
33
34use crate::error::{PartitionOutOfRangeSnafu, Result};
35use crate::read::range::RangeBuilderList;
36use crate::read::scan_region::{ScanInput, StreamContext};
37use crate::read::scan_util::{
38 scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
39};
40use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
41use crate::read::{scan_util, Batch, ScannerMetrics};
42
43pub struct UnorderedScan {
47 properties: ScannerProperties,
49 stream_ctx: Arc<StreamContext>,
51 metrics_list: PartitionMetricsList,
53}
54
55impl UnorderedScan {
56 pub(crate) fn new(input: ScanInput) -> Self {
58 let mut properties = ScannerProperties::default()
59 .with_append_mode(input.append_mode)
60 .with_total_rows(input.total_rows());
61 let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
62 properties.partitions = vec![stream_ctx.partition_ranges()];
63
64 Self {
65 properties,
66 stream_ctx,
67 metrics_list: PartitionMetricsList::default(),
68 }
69 }
70
71 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
73 let metrics_set = ExecutionPlanMetricsSet::new();
74 let part_num = self.properties.num_partitions();
75 let streams = (0..part_num)
76 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
77 .collect::<Result<Vec<_>, BoxedError>>()?;
78 let stream = stream! {
79 for mut stream in streams {
80 while let Some(rb) = stream.next().await {
81 yield rb;
82 }
83 }
84 };
85 let stream = Box::pin(RecordBatchStreamWrapper::new(
86 self.schema(),
87 Box::pin(stream),
88 ));
89 Ok(stream)
90 }
91
92 fn scan_partition_range(
94 stream_ctx: Arc<StreamContext>,
95 part_range_id: usize,
96 part_metrics: PartitionMetrics,
97 range_builder_list: Arc<RangeBuilderList>,
98 ) -> impl Stream<Item = Result<Batch>> {
99 try_stream! {
100 let range_meta = &stream_ctx.ranges[part_range_id];
102 for index in &range_meta.row_group_indices {
103 if stream_ctx.is_mem_range_index(*index) {
104 let stream = scan_mem_ranges(
105 stream_ctx.clone(),
106 part_metrics.clone(),
107 *index,
108 range_meta.time_range,
109 );
110 for await batch in stream {
111 yield batch?;
112 }
113 } else if stream_ctx.is_file_range_index(*index) {
114 let stream = scan_file_ranges(
115 stream_ctx.clone(),
116 part_metrics.clone(),
117 *index,
118 "unordered_scan_files",
119 range_builder_list.clone(),
120 ).await?;
121 for await batch in stream {
122 yield batch?;
123 }
124 } else {
125 let stream = scan_util::maybe_scan_other_ranges(
126 &stream_ctx,
127 *index,
128 &part_metrics,
129 ).await?;
130 for await batch in stream {
131 yield batch?;
132 }
133 }
134 }
135 }
136 }
137
138 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
140 let metrics_set = ExecutionPlanMetricsSet::new();
141
142 let streams = (0..self.properties.partitions.len())
143 .map(|partition| {
144 let metrics = self.partition_metrics(false, partition, &metrics_set);
145 self.scan_batch_in_partition(partition, metrics)
146 })
147 .collect::<Result<Vec<_>>>()?;
148
149 Ok(Box::pin(futures::stream::iter(streams).flatten()))
150 }
151
152 fn partition_metrics(
153 &self,
154 explain_verbose: bool,
155 partition: usize,
156 metrics_set: &ExecutionPlanMetricsSet,
157 ) -> PartitionMetrics {
158 let part_metrics = PartitionMetrics::new(
159 self.stream_ctx.input.mapper.metadata().region_id,
160 partition,
161 "UnorderedScan",
162 self.stream_ctx.query_start,
163 explain_verbose,
164 metrics_set,
165 );
166 self.metrics_list.set(partition, part_metrics.clone());
167 part_metrics
168 }
169
170 fn scan_partition_impl(
171 &self,
172 ctx: &QueryScanContext,
173 metrics_set: &ExecutionPlanMetricsSet,
174 partition: usize,
175 ) -> Result<SendableRecordBatchStream> {
176 if ctx.explain_verbose {
177 common_telemetry::info!(
178 "UnorderedScan partition {}, region_id: {}",
179 partition,
180 self.stream_ctx.input.region_metadata().region_id
181 );
182 }
183
184 let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
185
186 let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
187
188 let input = &self.stream_ctx.input;
189 let record_batch_stream = ConvertBatchStream::new(
190 batch_stream,
191 input.mapper.clone(),
192 input.cache_strategy.clone(),
193 metrics,
194 );
195
196 Ok(Box::pin(RecordBatchStreamWrapper::new(
197 input.mapper.output_schema(),
198 Box::pin(record_batch_stream),
199 )))
200 }
201
202 fn scan_batch_in_partition(
203 &self,
204 partition: usize,
205 part_metrics: PartitionMetrics,
206 ) -> Result<ScanBatchStream> {
207 ensure!(
208 partition < self.properties.partitions.len(),
209 PartitionOutOfRangeSnafu {
210 given: partition,
211 all: self.properties.partitions.len(),
212 }
213 );
214
215 let stream_ctx = self.stream_ctx.clone();
216 let part_ranges = self.properties.partitions[partition].clone();
217 let distinguish_range = self.properties.distinguish_partition_range;
218
219 let stream = try_stream! {
220 part_metrics.on_first_poll();
221
222 let range_builder_list = Arc::new(RangeBuilderList::new(
223 stream_ctx.input.num_memtables(),
224 stream_ctx.input.num_files(),
225 ));
226 for part_range in part_ranges {
228 let mut metrics = ScannerMetrics::default();
229 let mut fetch_start = Instant::now();
230 #[cfg(debug_assertions)]
231 let mut checker = crate::read::BatchChecker::default()
232 .with_start(Some(part_range.start))
233 .with_end(Some(part_range.end));
234
235 let stream = Self::scan_partition_range(
236 stream_ctx.clone(),
237 part_range.identifier,
238 part_metrics.clone(),
239 range_builder_list.clone(),
240 );
241 for await batch in stream {
242 let batch = batch?;
243 metrics.scan_cost += fetch_start.elapsed();
244 metrics.num_batches += 1;
245 metrics.num_rows += batch.num_rows();
246
247 debug_assert!(!batch.is_empty());
248 if batch.is_empty() {
249 continue;
250 }
251
252 #[cfg(debug_assertions)]
253 checker.ensure_part_range_batch(
254 "UnorderedScan",
255 stream_ctx.input.mapper.metadata().region_id,
256 partition,
257 part_range,
258 &batch,
259 );
260
261 let yield_start = Instant::now();
262 yield ScanBatch::Normal(batch);
263 metrics.yield_cost += yield_start.elapsed();
264
265 fetch_start = Instant::now();
266 }
267
268 if distinguish_range {
271 let yield_start = Instant::now();
272 yield ScanBatch::Normal(Batch::empty());
273 metrics.yield_cost += yield_start.elapsed();
274 }
275
276 metrics.scan_cost += fetch_start.elapsed();
277 part_metrics.merge_metrics(&metrics);
278 }
279
280 part_metrics.on_finish();
281 };
282 Ok(Box::pin(stream))
283 }
284}
285
286impl RegionScanner for UnorderedScan {
287 fn properties(&self) -> &ScannerProperties {
288 &self.properties
289 }
290
291 fn schema(&self) -> SchemaRef {
292 self.stream_ctx.input.mapper.output_schema()
293 }
294
295 fn metadata(&self) -> RegionMetadataRef {
296 self.stream_ctx.input.mapper.metadata().clone()
297 }
298
299 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
300 self.properties.prepare(request);
301 Ok(())
302 }
303
304 fn scan_partition(
305 &self,
306 ctx: &QueryScanContext,
307 metrics_set: &ExecutionPlanMetricsSet,
308 partition: usize,
309 ) -> Result<SendableRecordBatchStream, BoxedError> {
310 self.scan_partition_impl(ctx, metrics_set, partition)
311 .map_err(BoxedError::new)
312 }
313
314 fn has_predicate(&self) -> bool {
315 let predicate = self.stream_ctx.input.predicate();
316 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
317 }
318
319 fn set_logical_region(&mut self, logical_region: bool) {
320 self.properties.set_logical_region(logical_region);
321 }
322}
323
324impl DisplayAs for UnorderedScan {
325 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
326 write!(
327 f,
328 "UnorderedScan: region={}, ",
329 self.stream_ctx.input.mapper.metadata().region_id
330 )?;
331 match t {
332 DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
333 DisplayFormatType::Verbose => {
334 self.stream_ctx.format_for_explain(true, f)?;
335 self.metrics_list.format_verbose_metrics(f)
336 }
337 }
338 }
339}
340
341impl fmt::Debug for UnorderedScan {
342 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343 f.debug_struct("UnorderedScan")
344 .field("num_ranges", &self.stream_ctx.ranges.len())
345 .finish()
346 }
347}
348
#[cfg(test)]
impl UnorderedScan {
    /// Returns the [`ScanInput`] held by the stream context (test builds only).
    pub(crate) fn input(&self) -> &ScanInput {
        let Self { stream_ctx, .. } = self;
        &stream_ctx.input
    }
}
355}