1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::schema::SchemaRef;
28use futures::StreamExt;
29use smallvec::{smallvec, SmallVec};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
36use tokio::sync::mpsc::{self, Receiver, Sender};
37use tokio::sync::Semaphore;
38
39use crate::error::{
40 Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
41 ScanSeriesSnafu, TooManyFilesToReadSnafu,
42};
43use crate::read::range::RangeBuilderList;
44use crate::read::scan_region::{ScanInput, StreamContext};
45use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
46use crate::read::seq_scan::{build_sources, SeqScan};
47use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
48use crate::read::{Batch, ScannerMetrics};
49
/// Timeout of a single waiting attempt to send a series to one partition's channel.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);
52
/// Receiving ends of the per-partition series channels.
///
/// A slot becomes `None` once the receiver of that partition has been taken.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
55
/// Scanner that reads a region and hands out batches grouped by series
/// (batches sharing a primary key) to its output partitions.
pub struct SeriesScan {
    /// Properties of the scanner, including its partition layout.
    properties: ScannerProperties,
    /// Context shared by all streams of this scan.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition; empty until the distributor has been started.
    receivers: Mutex<ReceiverList>,
    /// Metrics registered per partition; printed by the verbose display format.
    metrics_list: Arc<PartitionMetricsList>,
}
72
73impl SeriesScan {
74 pub(crate) fn new(input: ScanInput) -> Self {
76 let mut properties = ScannerProperties::default()
77 .with_append_mode(input.append_mode)
78 .with_total_rows(input.total_rows());
79 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
80 properties.partitions = vec![stream_ctx.partition_ranges()];
81
82 Self {
83 properties,
84 stream_ctx,
85 receivers: Mutex::new(Vec::new()),
86 metrics_list: Arc::new(PartitionMetricsList::default()),
87 }
88 }
89
90 fn scan_partition_impl(
91 &self,
92 ctx: &QueryScanContext,
93 metrics_set: &ExecutionPlanMetricsSet,
94 partition: usize,
95 ) -> Result<SendableRecordBatchStream> {
96 let metrics = new_partition_metrics(
97 &self.stream_ctx,
98 ctx.explain_verbose,
99 metrics_set,
100 partition,
101 &self.metrics_list,
102 );
103
104 let batch_stream =
105 self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
106
107 let input = &self.stream_ctx.input;
108 let record_batch_stream = ConvertBatchStream::new(
109 batch_stream,
110 input.mapper.clone(),
111 input.cache_strategy.clone(),
112 metrics,
113 );
114
115 Ok(Box::pin(RecordBatchStreamWrapper::new(
116 input.mapper.output_schema(),
117 Box::pin(record_batch_stream),
118 )))
119 }
120
121 fn scan_batch_in_partition(
122 &self,
123 ctx: &QueryScanContext,
124 partition: usize,
125 part_metrics: PartitionMetrics,
126 metrics_set: &ExecutionPlanMetricsSet,
127 ) -> Result<ScanBatchStream> {
128 if ctx.explain_verbose {
129 common_telemetry::info!(
130 "SeriesScan partition {}, region_id: {}",
131 partition,
132 self.stream_ctx.input.region_metadata().region_id
133 );
134 }
135
136 ensure!(
137 partition < self.properties.num_partitions(),
138 PartitionOutOfRangeSnafu {
139 given: partition,
140 all: self.properties.num_partitions(),
141 }
142 );
143
144 self.maybe_start_distributor(metrics_set, &self.metrics_list);
145
146 let mut receiver = self.take_receiver(partition)?;
147 let stream = try_stream! {
148 part_metrics.on_first_poll();
149
150 let mut fetch_start = Instant::now();
151 while let Some(series) = receiver.recv().await {
152 let series = series?;
153
154 let mut metrics = ScannerMetrics::default();
155 metrics.scan_cost += fetch_start.elapsed();
156 fetch_start = Instant::now();
157
158 metrics.num_batches += series.batches.len();
159 metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();
160
161 let yield_start = Instant::now();
162 yield ScanBatch::Series(series);
163 metrics.yield_cost += yield_start.elapsed();
164
165 part_metrics.merge_metrics(&metrics);
166 }
167
168 part_metrics.on_finish();
169 };
170 Ok(Box::pin(stream))
171 }
172
173 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
175 let mut rx_list = self.receivers.lock().unwrap();
176 rx_list[partition]
177 .take()
178 .context(ScanMultiTimesSnafu { partition })
179 }
180
181 fn maybe_start_distributor(
183 &self,
184 metrics_set: &ExecutionPlanMetricsSet,
185 metrics_list: &Arc<PartitionMetricsList>,
186 ) {
187 let mut rx_list = self.receivers.lock().unwrap();
188 if !rx_list.is_empty() {
189 return;
190 }
191
192 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
193 let mut distributor = SeriesDistributor {
194 stream_ctx: self.stream_ctx.clone(),
195 semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
196 partitions: self.properties.partitions.clone(),
197 senders,
198 metrics_set: metrics_set.clone(),
199 metrics_list: metrics_list.clone(),
200 };
201 common_runtime::spawn_global(async move {
202 distributor.execute().await;
203 });
204
205 *rx_list = receivers;
206 }
207
208 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
210 let part_num = self.properties.num_partitions();
211 let metrics_set = ExecutionPlanMetricsSet::default();
212 let streams = (0..part_num)
213 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
214 .collect::<Result<Vec<_>, BoxedError>>()?;
215 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
216 Ok(Box::pin(chained_stream))
217 }
218
219 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
221 let metrics_set = ExecutionPlanMetricsSet::new();
222
223 let streams = (0..self.properties.partitions.len())
224 .map(|partition| {
225 let metrics = new_partition_metrics(
226 &self.stream_ctx,
227 false,
228 &metrics_set,
229 partition,
230 &self.metrics_list,
231 );
232
233 self.scan_batch_in_partition(
234 &QueryScanContext::default(),
235 partition,
236 metrics,
237 &metrics_set,
238 )
239 })
240 .collect::<Result<Vec<_>>>()?;
241
242 Ok(Box::pin(futures::stream::iter(streams).flatten()))
243 }
244
245 pub(crate) fn check_scan_limit(&self) -> Result<()> {
247 let total_files: usize = self
249 .properties
250 .partitions
251 .iter()
252 .flat_map(|partition| partition.iter())
253 .map(|part_range| {
254 let range_meta = &self.stream_ctx.ranges[part_range.identifier];
255 range_meta.indices.len()
256 })
257 .sum();
258
259 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
260 if total_files > max_concurrent_files {
261 return TooManyFilesToReadSnafu {
262 actual: total_files,
263 max: max_concurrent_files,
264 }
265 .fail();
266 }
267
268 Ok(())
269 }
270}
271
272fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
273 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
274 .map(|_| {
275 let (sender, receiver) = mpsc::channel(1);
276 (Some(sender), Some(receiver))
277 })
278 .unzip();
279 (SenderList::new(senders), receivers)
280}
281
282impl RegionScanner for SeriesScan {
283 fn properties(&self) -> &ScannerProperties {
284 &self.properties
285 }
286
287 fn schema(&self) -> SchemaRef {
288 self.stream_ctx.input.mapper.output_schema()
289 }
290
291 fn metadata(&self) -> RegionMetadataRef {
292 self.stream_ctx.input.mapper.metadata().clone()
293 }
294
295 fn scan_partition(
296 &self,
297 ctx: &QueryScanContext,
298 metrics_set: &ExecutionPlanMetricsSet,
299 partition: usize,
300 ) -> Result<SendableRecordBatchStream, BoxedError> {
301 self.scan_partition_impl(ctx, metrics_set, partition)
302 .map_err(BoxedError::new)
303 }
304
305 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
306 self.properties.prepare(request);
307
308 self.check_scan_limit().map_err(BoxedError::new)?;
309
310 Ok(())
311 }
312
313 fn has_predicate(&self) -> bool {
314 let predicate = self.stream_ctx.input.predicate();
315 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
316 }
317
318 fn set_logical_region(&mut self, logical_region: bool) {
319 self.properties.set_logical_region(logical_region);
320 }
321}
322
323impl DisplayAs for SeriesScan {
324 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
325 write!(
326 f,
327 "SeriesScan: region={}, ",
328 self.stream_ctx.input.mapper.metadata().region_id
329 )?;
330 match t {
331 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
332 self.stream_ctx.format_for_explain(false, f)
333 }
334 DisplayFormatType::Verbose => {
335 self.stream_ctx.format_for_explain(true, f)?;
336 self.metrics_list.format_verbose_metrics(f)
337 }
338 }
339 }
340}
341
342impl fmt::Debug for SeriesScan {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 f.debug_struct("SeriesScan")
345 .field("num_ranges", &self.stream_ctx.ranges.len())
346 .finish()
347 }
348}
349
// Accessors compiled only for tests.
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
357
/// Task that scans all partition ranges with one reader and distributes the
/// resulting series to the partition senders.
struct SeriesDistributor {
    /// Context shared with the scanner.
    stream_ctx: Arc<StreamContext>,
    /// Semaphore handed to the reader builder.
    // NOTE(review): presumably limits scan parallelism; confirm in
    // `SeqScan::build_reader_from_sources`.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan, grouped by partition.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders used to hand series to the partitions.
    senders: SenderList,
    /// Metrics set used to create the distributor's own partition metrics.
    metrics_set: ExecutionPlanMetricsSet,
    /// List in which the distributor registers its metrics.
    metrics_list: Arc<PartitionMetricsList>,
}
376
377impl SeriesDistributor {
378 async fn execute(&mut self) {
380 if let Err(e) = self.scan_partitions().await {
381 self.senders.send_error(e).await;
382 }
383 }
384
385 async fn scan_partitions(&mut self) -> Result<()> {
387 let part_metrics = new_partition_metrics(
388 &self.stream_ctx,
389 false,
390 &self.metrics_set,
391 self.partitions.len(),
392 &self.metrics_list,
393 );
394 part_metrics.on_first_poll();
395
396 let range_builder_list = Arc::new(RangeBuilderList::new(
397 self.stream_ctx.input.num_memtables(),
398 self.stream_ctx.input.num_files(),
399 ));
400 let mut sources = Vec::with_capacity(self.partitions.len());
402 for partition in &self.partitions {
403 sources.reserve(partition.len());
404 for part_range in partition {
405 build_sources(
406 &self.stream_ctx,
407 part_range,
408 false,
409 &part_metrics,
410 range_builder_list.clone(),
411 &mut sources,
412 )
413 .await?;
414 }
415 }
416
417 let mut reader =
419 SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
420 .await?;
421 let mut metrics = SeriesDistributorMetrics::default();
422 let mut fetch_start = Instant::now();
423
424 let mut current_series = SeriesBatch::default();
425 while let Some(batch) = reader.next_batch().await? {
426 metrics.scan_cost += fetch_start.elapsed();
427 fetch_start = Instant::now();
428 metrics.num_batches += 1;
429 metrics.num_rows += batch.num_rows();
430
431 debug_assert!(!batch.is_empty());
432 if batch.is_empty() {
433 continue;
434 }
435
436 let Some(last_key) = current_series.current_key() else {
437 current_series.push(batch);
438 continue;
439 };
440
441 if last_key == batch.primary_key() {
442 current_series.push(batch);
443 continue;
444 }
445
446 let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
448 let yield_start = Instant::now();
449 self.senders.send_batch(to_send).await?;
450 metrics.yield_cost += yield_start.elapsed();
451 }
452
453 if !current_series.is_empty() {
454 let yield_start = Instant::now();
455 self.senders.send_batch(current_series).await?;
456 metrics.yield_cost += yield_start.elapsed();
457 }
458
459 metrics.scan_cost += fetch_start.elapsed();
460 metrics.num_series_send_timeout = self.senders.num_timeout;
461 metrics.num_series_send_full = self.senders.num_full;
462 part_metrics.set_distributor_metrics(&metrics);
463
464 part_metrics.on_finish();
465
466 Ok(())
467 }
468}
469
/// Batches that belong to one series, i.e. share the same primary key.
#[derive(Default)]
pub struct SeriesBatch {
    /// Batches of the series, in the order they were pushed.
    pub batches: SmallVec<[Batch; 4]>,
}
475
476impl SeriesBatch {
477 fn single(batch: Batch) -> Self {
479 Self {
480 batches: smallvec![batch],
481 }
482 }
483
484 fn current_key(&self) -> Option<&[u8]> {
485 self.batches.first().map(|batch| batch.primary_key())
486 }
487
488 fn push(&mut self, batch: Batch) {
489 self.batches.push(batch);
490 }
491
492 fn is_empty(&self) -> bool {
494 self.batches.is_empty()
495 }
496}
497
/// Senders of all partitions together with the round-robin cursor and send
/// statistics.
struct SenderList {
    /// Sender of each partition; set to `None` once its channel is found closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` entries in `senders`.
    num_nones: usize,
    /// Index of the sender to try next.
    sender_idx: usize,
    /// Number of waiting sends that timed out.
    num_timeout: usize,
    /// Number of non-waiting sends rejected because the channel was full.
    num_full: usize,
}
510
511impl SenderList {
512 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
513 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
514 Self {
515 senders,
516 num_nones,
517 sender_idx: 0,
518 num_timeout: 0,
519 num_full: 0,
520 }
521 }
522
523 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
526 for _ in 0..self.senders.len() {
527 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
528
529 let sender_idx = self.fetch_add_sender_idx();
530 let Some(sender) = &self.senders[sender_idx] else {
531 continue;
532 };
533
534 match sender.try_send(Ok(batch)) {
535 Ok(()) => return Ok(None),
536 Err(TrySendError::Full(res)) => {
537 self.num_full += 1;
538 batch = res.unwrap();
540 }
541 Err(TrySendError::Closed(res)) => {
542 self.senders[sender_idx] = None;
543 self.num_nones += 1;
544 batch = res.unwrap();
546 }
547 }
548 }
549
550 Ok(Some(batch))
551 }
552
553 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
555 match self.try_send_batch(batch)? {
557 Some(b) => {
558 batch = b;
560 }
561 None => {
562 return Ok(());
563 }
564 }
565
566 loop {
567 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
568
569 let sender_idx = self.fetch_add_sender_idx();
570 let Some(sender) = &self.senders[sender_idx] else {
571 continue;
572 };
573 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
578 Ok(()) => break,
579 Err(SendTimeoutError::Timeout(res)) => {
580 self.num_timeout += 1;
581 batch = res.unwrap();
583 }
584 Err(SendTimeoutError::Closed(res)) => {
585 self.senders[sender_idx] = None;
586 self.num_nones += 1;
587 batch = res.unwrap();
589 }
590 }
591 }
592
593 Ok(())
594 }
595
596 async fn send_error(&self, error: Error) {
597 let error = Arc::new(error);
598 for sender in self.senders.iter().flatten() {
599 let result = Err(error.clone()).context(ScanSeriesSnafu);
600 let _ = sender.send(result).await;
601 }
602 }
603
604 fn fetch_add_sender_idx(&mut self) -> usize {
605 let sender_idx = self.sender_idx;
606 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
607 sender_idx
608 }
609}
610
611fn new_partition_metrics(
612 stream_ctx: &StreamContext,
613 explain_verbose: bool,
614 metrics_set: &ExecutionPlanMetricsSet,
615 partition: usize,
616 metrics_list: &PartitionMetricsList,
617) -> PartitionMetrics {
618 let metrics = PartitionMetrics::new(
619 stream_ctx.input.mapper.metadata().region_id,
620 partition,
621 "SeriesScan",
622 stream_ctx.query_start,
623 explain_verbose,
624 metrics_set,
625 );
626
627 metrics_list.set(partition, metrics.clone());
628 metrics
629}