1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::util::ChainedRecordBatchStream;
25use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::compute::concat_batches;
29use datatypes::schema::SchemaRef;
30use smallvec::{smallvec, SmallVec};
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::metadata::RegionMetadataRef;
33use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
34use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
35use tokio::sync::mpsc::{self, Receiver, Sender};
36use tokio::sync::Semaphore;
37
38use crate::error::{
39 ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
40 ScanMultiTimesSnafu, ScanSeriesSnafu,
41};
42use crate::read::range::RangeBuilderList;
43use crate::read::scan_region::{ScanInput, StreamContext};
44use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
45use crate::read::seq_scan::{build_sources, SeqScan};
46use crate::read::{Batch, ScannerMetrics};
47
/// Timeout for a single blocking send attempt to one partition channel.
///
/// `SenderList::send_batch` passes it to `send_timeout` and moves on to the next
/// sender when the attempt times out.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);

/// Receivers of all partitions, indexed by partition.
///
/// A slot is `None` once its receiver has been taken by `SeriesScan::take_receiver`.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
53
/// Region scanner that hands out data series by series.
///
/// A background `SeriesDistributor` groups consecutive batches sharing the same
/// primary key into a `SeriesBatch` and sends each group to exactly one
/// partition channel; every partition stream converts the groups it receives
/// into record batches.
pub struct SeriesScan {
    /// Properties of the scanner (partitions, append mode, total rows, ...).
    properties: ScannerProperties,
    /// Shared context of the scan streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition. Empty until the distributor is started.
    receivers: Mutex<ReceiverList>,
    /// Metrics of each partition; the distributor registers its own entry too.
    metrics_list: Arc<PartitionMetricsList>,
}
70
71impl SeriesScan {
72 pub(crate) fn new(input: ScanInput) -> Self {
74 let mut properties = ScannerProperties::default()
75 .with_append_mode(input.append_mode)
76 .with_total_rows(input.total_rows());
77 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
78 properties.partitions = vec![stream_ctx.partition_ranges()];
79
80 Self {
81 properties,
82 stream_ctx,
83 receivers: Mutex::new(Vec::new()),
84 metrics_list: Arc::new(PartitionMetricsList::default()),
85 }
86 }
87
88 fn scan_partition_impl(
89 &self,
90 metrics_set: &ExecutionPlanMetricsSet,
91 partition: usize,
92 ) -> Result<SendableRecordBatchStream, BoxedError> {
93 if partition >= self.properties.num_partitions() {
94 return Err(BoxedError::new(
95 PartitionOutOfRangeSnafu {
96 given: partition,
97 all: self.properties.num_partitions(),
98 }
99 .build(),
100 ));
101 }
102
103 self.maybe_start_distributor(metrics_set, &self.metrics_list);
104
105 let part_metrics =
106 new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
107 let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
108 let stream_ctx = self.stream_ctx.clone();
109
110 let stream = try_stream! {
111 part_metrics.on_first_poll();
112
113 let cache = &stream_ctx.input.cache_strategy;
114 let mut df_record_batches = Vec::new();
115 let mut fetch_start = Instant::now();
116 while let Some(result) = receiver.recv().await {
117 let mut metrics = ScannerMetrics::default();
118 let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
119 metrics.scan_cost += fetch_start.elapsed();
120 fetch_start = Instant::now();
121
122 let convert_start = Instant::now();
123 df_record_batches.reserve(series.batches.len());
124 for batch in series.batches {
125 metrics.num_batches += 1;
126 metrics.num_rows += batch.num_rows();
127
128 let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
129 df_record_batches.push(record_batch.into_df_record_batch());
130 }
131
132 let output_schema = stream_ctx.input.mapper.output_schema();
133 let df_record_batch =
134 concat_batches(output_schema.arrow_schema(), &df_record_batches)
135 .context(ComputeArrowSnafu)
136 .map_err(BoxedError::new)
137 .context(ExternalSnafu)?;
138 df_record_batches.clear();
139 let record_batch =
140 RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
141 metrics.convert_cost += convert_start.elapsed();
142
143 let yield_start = Instant::now();
144 yield record_batch;
145 metrics.yield_cost += yield_start.elapsed();
146
147 part_metrics.merge_metrics(&metrics);
148 }
149 };
150
151 let stream = Box::pin(RecordBatchStreamWrapper::new(
152 self.stream_ctx.input.mapper.output_schema(),
153 Box::pin(stream),
154 ));
155
156 Ok(stream)
157 }
158
159 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
161 let mut rx_list = self.receivers.lock().unwrap();
162 rx_list[partition]
163 .take()
164 .context(ScanMultiTimesSnafu { partition })
165 }
166
167 fn maybe_start_distributor(
169 &self,
170 metrics_set: &ExecutionPlanMetricsSet,
171 metrics_list: &Arc<PartitionMetricsList>,
172 ) {
173 let mut rx_list = self.receivers.lock().unwrap();
174 if !rx_list.is_empty() {
175 return;
176 }
177
178 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
179 let mut distributor = SeriesDistributor {
180 stream_ctx: self.stream_ctx.clone(),
181 semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
182 partitions: self.properties.partitions.clone(),
183 senders,
184 metrics_set: metrics_set.clone(),
185 metrics_list: metrics_list.clone(),
186 };
187 common_runtime::spawn_global(async move {
188 distributor.execute().await;
189 });
190
191 *rx_list = receivers;
192 }
193
194 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
196 let part_num = self.properties.num_partitions();
197 let metrics_set = ExecutionPlanMetricsSet::default();
198 let streams = (0..part_num)
199 .map(|i| self.scan_partition(&metrics_set, i))
200 .collect::<Result<Vec<_>, BoxedError>>()?;
201 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
202 Ok(Box::pin(chained_stream))
203 }
204}
205
206fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
207 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
208 .map(|_| {
209 let (sender, receiver) = mpsc::channel(1);
210 (Some(sender), Some(receiver))
211 })
212 .unzip();
213 (SenderList::new(senders), receivers)
214}
215
216impl RegionScanner for SeriesScan {
217 fn properties(&self) -> &ScannerProperties {
218 &self.properties
219 }
220
221 fn schema(&self) -> SchemaRef {
222 self.stream_ctx.input.mapper.output_schema()
223 }
224
225 fn metadata(&self) -> RegionMetadataRef {
226 self.stream_ctx.input.mapper.metadata().clone()
227 }
228
229 fn scan_partition(
230 &self,
231 metrics_set: &ExecutionPlanMetricsSet,
232 partition: usize,
233 ) -> Result<SendableRecordBatchStream, BoxedError> {
234 self.scan_partition_impl(metrics_set, partition)
235 }
236
237 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
238 self.properties.prepare(request);
239 Ok(())
240 }
241
242 fn has_predicate(&self) -> bool {
243 let predicate = self.stream_ctx.input.predicate();
244 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
245 }
246
247 fn set_logical_region(&mut self, logical_region: bool) {
248 self.properties.set_logical_region(logical_region);
249 }
250}
251
252impl DisplayAs for SeriesScan {
253 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
254 write!(
255 f,
256 "SeriesScan: region={}, ",
257 self.stream_ctx.input.mapper.metadata().region_id
258 )?;
259 match t {
260 DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
261 DisplayFormatType::Verbose => {
262 self.stream_ctx.format_for_explain(true, f)?;
263 self.metrics_list.format_verbose_metrics(f)
264 }
265 }
266 }
267}
268
269impl fmt::Debug for SeriesScan {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 f.debug_struct("SeriesScan")
272 .field("num_ranges", &self.stream_ctx.ranges.len())
273 .finish()
274 }
275}
276
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input of the scanner (test only).
    pub(crate) fn input(&self) -> &ScanInput {
        let Self { stream_ctx, .. } = self;
        &stream_ctx.input
    }
}
284
/// Background task state that scans the region and distributes series to the
/// partition channels.
struct SeriesDistributor {
    /// Shared context of the scan streams.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore passed to the reader builder.
    /// NOTE(review): presumably limits the number of concurrent source scans — confirm.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan, grouped by partition.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set the distributor reports to.
    metrics_set: ExecutionPlanMetricsSet,
    /// Shared metrics list; the distributor registers its metrics at index
    /// `partitions.len()`.
    metrics_list: Arc<PartitionMetricsList>,
}
303
304impl SeriesDistributor {
305 async fn execute(&mut self) {
307 if let Err(e) = self.scan_partitions().await {
308 self.senders.send_error(e).await;
309 }
310 }
311
312 async fn scan_partitions(&mut self) -> Result<()> {
314 let part_metrics = new_partition_metrics(
315 &self.stream_ctx,
316 &self.metrics_set,
317 self.partitions.len(),
318 &self.metrics_list,
319 );
320 part_metrics.on_first_poll();
321
322 let range_builder_list = Arc::new(RangeBuilderList::new(
323 self.stream_ctx.input.num_memtables(),
324 self.stream_ctx.input.num_files(),
325 ));
326 let mut sources = Vec::with_capacity(self.partitions.len());
328 for partition in &self.partitions {
329 sources.reserve(partition.len());
330 for part_range in partition {
331 build_sources(
332 &self.stream_ctx,
333 part_range,
334 false,
335 &part_metrics,
336 range_builder_list.clone(),
337 &mut sources,
338 );
339 }
340 }
341
342 let mut reader =
344 SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
345 .await?;
346 let mut metrics = SeriesDistributorMetrics::default();
347 let mut fetch_start = Instant::now();
348
349 let mut current_series = SeriesBatch::default();
350 while let Some(batch) = reader.next_batch().await? {
351 metrics.scan_cost += fetch_start.elapsed();
352 fetch_start = Instant::now();
353 metrics.num_batches += 1;
354 metrics.num_rows += batch.num_rows();
355
356 debug_assert!(!batch.is_empty());
357 if batch.is_empty() {
358 continue;
359 }
360
361 let Some(last_key) = current_series.current_key() else {
362 current_series.push(batch);
363 continue;
364 };
365
366 if last_key == batch.primary_key() {
367 current_series.push(batch);
368 continue;
369 }
370
371 let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
373 let yield_start = Instant::now();
374 self.senders.send_batch(to_send).await?;
375 metrics.yield_cost += yield_start.elapsed();
376 }
377
378 if !current_series.is_empty() {
379 let yield_start = Instant::now();
380 self.senders.send_batch(current_series).await?;
381 metrics.yield_cost += yield_start.elapsed();
382 }
383
384 metrics.scan_cost += fetch_start.elapsed();
385 metrics.num_series_send_timeout = self.senders.num_timeout;
386 part_metrics.set_distributor_metrics(&metrics);
387
388 part_metrics.on_finish();
389
390 Ok(())
391 }
392}
393
/// Batches that belong to the same series.
///
/// The distributor only pushes a batch when the buffer is empty or the batch
/// has the same primary key as the first buffered batch.
#[derive(Default)]
struct SeriesBatch {
    /// Buffered batches; up to four are stored inline before spilling to the heap.
    batches: SmallVec<[Batch; 4]>,
}
399
400impl SeriesBatch {
401 fn single(batch: Batch) -> Self {
403 Self {
404 batches: smallvec![batch],
405 }
406 }
407
408 fn current_key(&self) -> Option<&[u8]> {
409 self.batches.first().map(|batch| batch.primary_key())
410 }
411
412 fn push(&mut self, batch: Batch) {
413 self.batches.push(batch);
414 }
415
416 fn is_empty(&self) -> bool {
418 self.batches.is_empty()
419 }
420}
421
/// Senders of all partitions with round-robin selection state.
struct SenderList {
    /// Sender of each partition; set to `None` once its channel is found closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` entries in `senders`.
    num_nones: usize,
    /// Index of the next sender to try.
    sender_idx: usize,
    /// Number of send attempts that timed out.
    num_timeout: usize,
}
432
433impl SenderList {
434 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
435 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
436 Self {
437 senders,
438 num_nones,
439 sender_idx: 0,
440 num_timeout: 0,
441 }
442 }
443
444 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
447 for _ in 0..self.senders.len() {
448 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
449
450 let sender_idx = self.fetch_add_sender_idx();
451 let Some(sender) = &self.senders[sender_idx] else {
452 continue;
453 };
454
455 match sender.try_send(Ok(batch)) {
456 Ok(()) => return Ok(None),
457 Err(TrySendError::Full(res)) => {
458 batch = res.unwrap();
460 }
461 Err(TrySendError::Closed(res)) => {
462 self.senders[sender_idx] = None;
463 self.num_nones += 1;
464 batch = res.unwrap();
466 }
467 }
468 }
469
470 Ok(Some(batch))
471 }
472
473 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
475 match self.try_send_batch(batch)? {
477 Some(b) => {
478 batch = b;
480 }
481 None => {
482 return Ok(());
483 }
484 }
485
486 loop {
487 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
488
489 let sender_idx = self.fetch_add_sender_idx();
490 let Some(sender) = &self.senders[sender_idx] else {
491 continue;
492 };
493 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
498 Ok(()) => break,
499 Err(SendTimeoutError::Timeout(res)) => {
500 self.num_timeout += 1;
501 batch = res.unwrap();
503 }
504 Err(SendTimeoutError::Closed(res)) => {
505 self.senders[sender_idx] = None;
506 self.num_nones += 1;
507 batch = res.unwrap();
509 }
510 }
511 }
512
513 Ok(())
514 }
515
516 async fn send_error(&self, error: Error) {
517 let error = Arc::new(error);
518 for sender in self.senders.iter().flatten() {
519 let result = Err(error.clone()).context(ScanSeriesSnafu);
520 let _ = sender.send(result).await;
521 }
522 }
523
524 fn fetch_add_sender_idx(&mut self) -> usize {
525 let sender_idx = self.sender_idx;
526 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
527 sender_idx
528 }
529}
530
531fn new_partition_metrics(
532 stream_ctx: &StreamContext,
533 metrics_set: &ExecutionPlanMetricsSet,
534 partition: usize,
535 metrics_list: &PartitionMetricsList,
536) -> PartitionMetrics {
537 let metrics = PartitionMetrics::new(
538 stream_ctx.input.mapper.metadata().region_id,
539 partition,
540 "SeriesScan",
541 stream_ctx.query_start,
542 metrics_set,
543 );
544
545 metrics_list.set(partition, metrics.clone());
546 metrics
547}