mito2/read/
unordered_scan.rs1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::schema::SchemaRef;
28use futures::{Stream, StreamExt};
29use snafu::ResultExt;
30use store_api::metadata::RegionMetadataRef;
31use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
32
33use crate::error::{PartitionOutOfRangeSnafu, Result};
34use crate::read::range::RangeBuilderList;
35use crate::read::scan_region::{ScanInput, StreamContext};
36use crate::read::scan_util::{
37 scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
38};
39use crate::read::{Batch, ScannerMetrics};
40
41pub struct UnorderedScan {
45 properties: ScannerProperties,
47 stream_ctx: Arc<StreamContext>,
49 metrics_list: PartitionMetricsList,
51}
52
53impl UnorderedScan {
54 pub(crate) fn new(input: ScanInput) -> Self {
56 let mut properties = ScannerProperties::default()
57 .with_append_mode(input.append_mode)
58 .with_total_rows(input.total_rows());
59 let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
60 properties.partitions = vec![stream_ctx.partition_ranges()];
61
62 Self {
63 properties,
64 stream_ctx,
65 metrics_list: PartitionMetricsList::default(),
66 }
67 }
68
69 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
71 let metrics_set = ExecutionPlanMetricsSet::new();
72 let part_num = self.properties.num_partitions();
73 let streams = (0..part_num)
74 .map(|i| self.scan_partition(&metrics_set, i))
75 .collect::<Result<Vec<_>, BoxedError>>()?;
76 let stream = stream! {
77 for mut stream in streams {
78 while let Some(rb) = stream.next().await {
79 yield rb;
80 }
81 }
82 };
83 let stream = Box::pin(RecordBatchStreamWrapper::new(
84 self.schema(),
85 Box::pin(stream),
86 ));
87 Ok(stream)
88 }
89
90 fn scan_partition_range(
92 stream_ctx: Arc<StreamContext>,
93 part_range_id: usize,
94 part_metrics: PartitionMetrics,
95 range_builder_list: Arc<RangeBuilderList>,
96 ) -> impl Stream<Item = Result<Batch>> {
97 stream! {
98 let range_meta = &stream_ctx.ranges[part_range_id];
100 for index in &range_meta.row_group_indices {
101 if stream_ctx.is_mem_range_index(*index) {
102 let stream = scan_mem_ranges(
103 stream_ctx.clone(),
104 part_metrics.clone(),
105 *index,
106 range_meta.time_range,
107 );
108 for await batch in stream {
109 yield batch;
110 }
111 } else {
112 let stream = scan_file_ranges(
113 stream_ctx.clone(),
114 part_metrics.clone(),
115 *index,
116 "unordered_scan_files",
117 range_builder_list.clone(),
118 );
119 for await batch in stream {
120 yield batch;
121 }
122 }
123 }
124 }
125 }
126
127 fn scan_partition_impl(
128 &self,
129 metrics_set: &ExecutionPlanMetricsSet,
130 partition: usize,
131 ) -> Result<SendableRecordBatchStream, BoxedError> {
132 if partition >= self.properties.partitions.len() {
133 return Err(BoxedError::new(
134 PartitionOutOfRangeSnafu {
135 given: partition,
136 all: self.properties.partitions.len(),
137 }
138 .build(),
139 ));
140 }
141
142 let part_metrics = PartitionMetrics::new(
143 self.stream_ctx.input.mapper.metadata().region_id,
144 partition,
145 "UnorderedScan",
146 self.stream_ctx.query_start,
147 metrics_set,
148 );
149 self.metrics_list.set(partition, part_metrics.clone());
150 let stream_ctx = self.stream_ctx.clone();
151 let part_ranges = self.properties.partitions[partition].clone();
152 let distinguish_range = self.properties.distinguish_partition_range;
153
154 let stream = try_stream! {
155 part_metrics.on_first_poll();
156
157 let cache = &stream_ctx.input.cache_strategy;
158 let range_builder_list = Arc::new(RangeBuilderList::new(
159 stream_ctx.input.num_memtables(),
160 stream_ctx.input.num_files(),
161 ));
162 for part_range in part_ranges {
164 let mut metrics = ScannerMetrics::default();
165 let mut fetch_start = Instant::now();
166 #[cfg(debug_assertions)]
167 let mut checker = crate::read::BatchChecker::default()
168 .with_start(Some(part_range.start))
169 .with_end(Some(part_range.end));
170
171 let stream = Self::scan_partition_range(
172 stream_ctx.clone(),
173 part_range.identifier,
174 part_metrics.clone(),
175 range_builder_list.clone(),
176 );
177 for await batch in stream {
178 let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;
179 metrics.scan_cost += fetch_start.elapsed();
180 metrics.num_batches += 1;
181 metrics.num_rows += batch.num_rows();
182
183 debug_assert!(!batch.is_empty());
184 if batch.is_empty() {
185 continue;
186 }
187
188 #[cfg(debug_assertions)]
189 checker.ensure_part_range_batch(
190 "UnorderedScan",
191 stream_ctx.input.mapper.metadata().region_id,
192 partition,
193 part_range,
194 &batch,
195 );
196
197 let convert_start = Instant::now();
198 let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
199 metrics.convert_cost += convert_start.elapsed();
200 let yield_start = Instant::now();
201 yield record_batch;
202 metrics.yield_cost += yield_start.elapsed();
203
204 fetch_start = Instant::now();
205 }
206
207 if distinguish_range {
210 let yield_start = Instant::now();
211 yield stream_ctx.input.mapper.empty_record_batch();
212 metrics.yield_cost += yield_start.elapsed();
213 }
214
215 metrics.scan_cost += fetch_start.elapsed();
216 part_metrics.merge_metrics(&metrics);
217 }
218
219 part_metrics.on_finish();
220 };
221 let stream = Box::pin(RecordBatchStreamWrapper::new(
222 self.stream_ctx.input.mapper.output_schema(),
223 Box::pin(stream),
224 ));
225
226 Ok(stream)
227 }
228}
229
230impl RegionScanner for UnorderedScan {
231 fn properties(&self) -> &ScannerProperties {
232 &self.properties
233 }
234
235 fn schema(&self) -> SchemaRef {
236 self.stream_ctx.input.mapper.output_schema()
237 }
238
239 fn metadata(&self) -> RegionMetadataRef {
240 self.stream_ctx.input.mapper.metadata().clone()
241 }
242
243 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
244 self.properties.prepare(request);
245 Ok(())
246 }
247
248 fn scan_partition(
249 &self,
250 metrics_set: &ExecutionPlanMetricsSet,
251 partition: usize,
252 ) -> Result<SendableRecordBatchStream, BoxedError> {
253 self.scan_partition_impl(metrics_set, partition)
254 }
255
256 fn has_predicate(&self) -> bool {
257 let predicate = self.stream_ctx.input.predicate();
258 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
259 }
260
261 fn set_logical_region(&mut self, logical_region: bool) {
262 self.properties.set_logical_region(logical_region);
263 }
264}
265
266impl DisplayAs for UnorderedScan {
267 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
268 write!(
269 f,
270 "UnorderedScan: region={}, ",
271 self.stream_ctx.input.mapper.metadata().region_id
272 )?;
273 match t {
274 DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
275 DisplayFormatType::Verbose => {
276 self.stream_ctx.format_for_explain(true, f)?;
277 self.metrics_list.format_verbose_metrics(f)
278 }
279 }
280 }
281}
282
283impl fmt::Debug for UnorderedScan {
284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285 f.debug_struct("UnorderedScan")
286 .field("num_ranges", &self.stream_ctx.ranges.len())
287 .finish()
288 }
289}
290
#[cfg(test)]
impl UnorderedScan {
    /// Returns the scan input held by the stream context (test builds only).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}