1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::schema::SchemaRef;
28use futures::StreamExt;
29use smallvec::{smallvec, SmallVec};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
36use tokio::sync::mpsc::{self, Receiver, Sender};
37use tokio::sync::Semaphore;
38
39use crate::error::{
40 Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
41 ScanSeriesSnafu, TooManyFilesToReadSnafu,
42};
43use crate::read::range::RangeBuilderList;
44use crate::read::scan_region::{ScanInput, StreamContext};
45use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
46use crate::read::seq_scan::{build_sources, SeqScan};
47use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
48use crate::read::{Batch, ScannerMetrics};
49
/// Maximum time `SenderList::send_batch` waits on a single partition channel
/// before rotating to the next partition (10 milliseconds).
const SEND_TIMEOUT: Duration = Duration::from_micros(10_000);
52
/// Per-partition channel receivers, indexed by partition number.
///
/// A slot is set to `None` once its partition takes the receiver to scan it.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
55
/// Region scanner that hands batches grouped by primary key (one series per
/// [SeriesBatch]) to its partitions.
///
/// A single background distributor reads every partition range and sends each
/// series to one of the partition channels; each partition drains its own channel.
pub struct SeriesScan {
    /// Properties of the scanner, including the partition ranges of each partition.
    properties: ScannerProperties,
    /// Context of the scan, shared with the distributor task.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of the partition channels. Empty until the distributor is started;
    /// a slot becomes `None` after its partition has been scanned.
    receivers: Mutex<ReceiverList>,
    /// Metrics of all partitions (and the distributor), used by verbose explain output.
    metrics_list: Arc<PartitionMetricsList>,
}
72
73impl SeriesScan {
74 pub(crate) fn new(input: ScanInput) -> Self {
76 let mut properties = ScannerProperties::default()
77 .with_append_mode(input.append_mode)
78 .with_total_rows(input.total_rows());
79 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
80 properties.partitions = vec![stream_ctx.partition_ranges()];
81
82 Self {
83 properties,
84 stream_ctx,
85 receivers: Mutex::new(Vec::new()),
86 metrics_list: Arc::new(PartitionMetricsList::default()),
87 }
88 }
89
90 fn scan_partition_impl(
91 &self,
92 ctx: &QueryScanContext,
93 metrics_set: &ExecutionPlanMetricsSet,
94 partition: usize,
95 ) -> Result<SendableRecordBatchStream> {
96 let metrics = new_partition_metrics(
97 &self.stream_ctx,
98 ctx.explain_verbose,
99 metrics_set,
100 partition,
101 &self.metrics_list,
102 );
103
104 let batch_stream =
105 self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
106
107 let input = &self.stream_ctx.input;
108 let record_batch_stream = ConvertBatchStream::new(
109 batch_stream,
110 input.mapper.clone(),
111 input.cache_strategy.clone(),
112 metrics,
113 );
114
115 Ok(Box::pin(RecordBatchStreamWrapper::new(
116 input.mapper.output_schema(),
117 Box::pin(record_batch_stream),
118 )))
119 }
120
121 fn scan_batch_in_partition(
122 &self,
123 ctx: &QueryScanContext,
124 partition: usize,
125 part_metrics: PartitionMetrics,
126 metrics_set: &ExecutionPlanMetricsSet,
127 ) -> Result<ScanBatchStream> {
128 if ctx.explain_verbose {
129 common_telemetry::info!(
130 "SeriesScan partition {}, region_id: {}",
131 partition,
132 self.stream_ctx.input.region_metadata().region_id
133 );
134 }
135
136 ensure!(
137 partition < self.properties.num_partitions(),
138 PartitionOutOfRangeSnafu {
139 given: partition,
140 all: self.properties.num_partitions(),
141 }
142 );
143
144 self.maybe_start_distributor(metrics_set, &self.metrics_list);
145
146 let mut receiver = self.take_receiver(partition)?;
147 let stream = try_stream! {
148 part_metrics.on_first_poll();
149
150 let mut fetch_start = Instant::now();
151 while let Some(series) = receiver.recv().await {
152 let series = series?;
153
154 let mut metrics = ScannerMetrics::default();
155 metrics.scan_cost += fetch_start.elapsed();
156 fetch_start = Instant::now();
157
158 metrics.num_batches += series.batches.len();
159 metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();
160
161 let yield_start = Instant::now();
162 yield ScanBatch::Series(series);
163 metrics.yield_cost += yield_start.elapsed();
164
165 part_metrics.merge_metrics(&metrics);
166 }
167
168 part_metrics.on_finish();
169 };
170 Ok(Box::pin(stream))
171 }
172
173 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
175 let mut rx_list = self.receivers.lock().unwrap();
176 rx_list[partition]
177 .take()
178 .context(ScanMultiTimesSnafu { partition })
179 }
180
181 fn maybe_start_distributor(
183 &self,
184 metrics_set: &ExecutionPlanMetricsSet,
185 metrics_list: &Arc<PartitionMetricsList>,
186 ) {
187 let mut rx_list = self.receivers.lock().unwrap();
188 if !rx_list.is_empty() {
189 return;
190 }
191
192 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
193 let mut distributor = SeriesDistributor {
194 stream_ctx: self.stream_ctx.clone(),
195 semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
196 partitions: self.properties.partitions.clone(),
197 senders,
198 metrics_set: metrics_set.clone(),
199 metrics_list: metrics_list.clone(),
200 };
201 common_runtime::spawn_global(async move {
202 distributor.execute().await;
203 });
204
205 *rx_list = receivers;
206 }
207
208 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
210 let part_num = self.properties.num_partitions();
211 let metrics_set = ExecutionPlanMetricsSet::default();
212 let streams = (0..part_num)
213 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
214 .collect::<Result<Vec<_>, BoxedError>>()?;
215 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
216 Ok(Box::pin(chained_stream))
217 }
218
219 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
221 let metrics_set = ExecutionPlanMetricsSet::new();
222
223 let streams = (0..self.properties.partitions.len())
224 .map(|partition| {
225 let metrics = new_partition_metrics(
226 &self.stream_ctx,
227 false,
228 &metrics_set,
229 partition,
230 &self.metrics_list,
231 );
232
233 self.scan_batch_in_partition(
234 &QueryScanContext::default(),
235 partition,
236 metrics,
237 &metrics_set,
238 )
239 })
240 .collect::<Result<Vec<_>>>()?;
241
242 Ok(Box::pin(futures::stream::iter(streams).flatten()))
243 }
244
245 pub(crate) fn check_scan_limit(&self) -> Result<()> {
247 let total_files: usize = self
249 .properties
250 .partitions
251 .iter()
252 .flat_map(|partition| partition.iter())
253 .map(|part_range| {
254 let range_meta = &self.stream_ctx.ranges[part_range.identifier];
255 range_meta.indices.len()
256 })
257 .sum();
258
259 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
260 if total_files > max_concurrent_files {
261 return TooManyFilesToReadSnafu {
262 actual: total_files,
263 max: max_concurrent_files,
264 }
265 .fail();
266 }
267
268 Ok(())
269 }
270}
271
272fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
273 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
274 .map(|_| {
275 let (sender, receiver) = mpsc::channel(1);
276 (Some(sender), Some(receiver))
277 })
278 .unzip();
279 (SenderList::new(senders), receivers)
280}
281
282impl RegionScanner for SeriesScan {
283 fn properties(&self) -> &ScannerProperties {
284 &self.properties
285 }
286
287 fn schema(&self) -> SchemaRef {
288 self.stream_ctx.input.mapper.output_schema()
289 }
290
291 fn metadata(&self) -> RegionMetadataRef {
292 self.stream_ctx.input.mapper.metadata().clone()
293 }
294
295 fn scan_partition(
296 &self,
297 ctx: &QueryScanContext,
298 metrics_set: &ExecutionPlanMetricsSet,
299 partition: usize,
300 ) -> Result<SendableRecordBatchStream, BoxedError> {
301 self.scan_partition_impl(ctx, metrics_set, partition)
302 .map_err(BoxedError::new)
303 }
304
305 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
306 self.properties.prepare(request);
307
308 self.check_scan_limit().map_err(BoxedError::new)?;
309
310 Ok(())
311 }
312
313 fn has_predicate(&self) -> bool {
314 let predicate = self.stream_ctx.input.predicate();
315 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
316 }
317
318 fn set_logical_region(&mut self, logical_region: bool) {
319 self.properties.set_logical_region(logical_region);
320 }
321}
322
323impl DisplayAs for SeriesScan {
324 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
325 write!(
326 f,
327 "SeriesScan: region={}, ",
328 self.stream_ctx.input.mapper.metadata().region_id
329 )?;
330 match t {
331 DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
332 DisplayFormatType::Verbose => {
333 self.stream_ctx.format_for_explain(true, f)?;
334 self.metrics_list.format_verbose_metrics(f)
335 }
336 }
337 }
338}
339
340impl fmt::Debug for SeriesScan {
341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342 f.debug_struct("SeriesScan")
343 .field("num_ranges", &self.stream_ctx.ranges.len())
344 .finish()
345 }
346}
347
#[cfg(test)]
impl SeriesScan {
    /// Test-only accessor for the scan input.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
355
/// Background task that reads every partition range through a single reader and
/// sends the output, grouped by primary key, to the partition channels.
struct SeriesDistributor {
    /// Context of the scan.
    stream_ctx: Arc<StreamContext>,
    /// Semaphore passed to `SeqScan::build_reader_from_sources` (presumably bounds
    /// reader concurrency — confirm).
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to read, grouped by partition.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of the partition channels.
    senders: SenderList,
    /// Metrics set used to create the distributor's own partition metrics.
    metrics_set: ExecutionPlanMetricsSet,
    /// Metrics of all partitions; the distributor registers itself in the slot
    /// after the last partition.
    metrics_list: Arc<PartitionMetricsList>,
}
374
375impl SeriesDistributor {
376 async fn execute(&mut self) {
378 if let Err(e) = self.scan_partitions().await {
379 self.senders.send_error(e).await;
380 }
381 }
382
383 async fn scan_partitions(&mut self) -> Result<()> {
385 let part_metrics = new_partition_metrics(
386 &self.stream_ctx,
387 false,
388 &self.metrics_set,
389 self.partitions.len(),
390 &self.metrics_list,
391 );
392 part_metrics.on_first_poll();
393
394 let range_builder_list = Arc::new(RangeBuilderList::new(
395 self.stream_ctx.input.num_memtables(),
396 self.stream_ctx.input.num_files(),
397 ));
398 let mut sources = Vec::with_capacity(self.partitions.len());
400 for partition in &self.partitions {
401 sources.reserve(partition.len());
402 for part_range in partition {
403 build_sources(
404 &self.stream_ctx,
405 part_range,
406 false,
407 &part_metrics,
408 range_builder_list.clone(),
409 &mut sources,
410 )
411 .await?;
412 }
413 }
414
415 let mut reader =
417 SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
418 .await?;
419 let mut metrics = SeriesDistributorMetrics::default();
420 let mut fetch_start = Instant::now();
421
422 let mut current_series = SeriesBatch::default();
423 while let Some(batch) = reader.next_batch().await? {
424 metrics.scan_cost += fetch_start.elapsed();
425 fetch_start = Instant::now();
426 metrics.num_batches += 1;
427 metrics.num_rows += batch.num_rows();
428
429 debug_assert!(!batch.is_empty());
430 if batch.is_empty() {
431 continue;
432 }
433
434 let Some(last_key) = current_series.current_key() else {
435 current_series.push(batch);
436 continue;
437 };
438
439 if last_key == batch.primary_key() {
440 current_series.push(batch);
441 continue;
442 }
443
444 let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
446 let yield_start = Instant::now();
447 self.senders.send_batch(to_send).await?;
448 metrics.yield_cost += yield_start.elapsed();
449 }
450
451 if !current_series.is_empty() {
452 let yield_start = Instant::now();
453 self.senders.send_batch(current_series).await?;
454 metrics.yield_cost += yield_start.elapsed();
455 }
456
457 metrics.scan_cost += fetch_start.elapsed();
458 metrics.num_series_send_timeout = self.senders.num_timeout;
459 metrics.num_series_send_full = self.senders.num_full;
460 part_metrics.set_distributor_metrics(&metrics);
461
462 part_metrics.on_finish();
463
464 Ok(())
465 }
466}
467
/// Batches handed to a partition as one unit. The distributor only groups
/// consecutive batches that share the same primary key.
#[derive(Default)]
pub struct SeriesBatch {
    /// Batches of the series, in the order the reader produced them.
    pub batches: SmallVec<[Batch; 4]>,
}
473
474impl SeriesBatch {
475 fn single(batch: Batch) -> Self {
477 Self {
478 batches: smallvec![batch],
479 }
480 }
481
482 fn current_key(&self) -> Option<&[u8]> {
483 self.batches.first().map(|batch| batch.primary_key())
484 }
485
486 fn push(&mut self, batch: Batch) {
487 self.batches.push(batch);
488 }
489
490 fn is_empty(&self) -> bool {
492 self.batches.is_empty()
493 }
494}
495
/// Round-robin dispatcher over the partition channel senders.
struct SenderList {
    /// Senders indexed by partition; a slot becomes `None` once its channel is closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` slots in `senders`.
    num_nones: usize,
    /// Index of the next sender to try.
    sender_idx: usize,
    /// Number of `send_timeout` attempts that timed out.
    num_timeout: usize,
    /// Number of `try_send` attempts that found the channel full.
    num_full: usize,
}
508
509impl SenderList {
510 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
511 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
512 Self {
513 senders,
514 num_nones,
515 sender_idx: 0,
516 num_timeout: 0,
517 num_full: 0,
518 }
519 }
520
521 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
524 for _ in 0..self.senders.len() {
525 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
526
527 let sender_idx = self.fetch_add_sender_idx();
528 let Some(sender) = &self.senders[sender_idx] else {
529 continue;
530 };
531
532 match sender.try_send(Ok(batch)) {
533 Ok(()) => return Ok(None),
534 Err(TrySendError::Full(res)) => {
535 self.num_full += 1;
536 batch = res.unwrap();
538 }
539 Err(TrySendError::Closed(res)) => {
540 self.senders[sender_idx] = None;
541 self.num_nones += 1;
542 batch = res.unwrap();
544 }
545 }
546 }
547
548 Ok(Some(batch))
549 }
550
551 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
553 match self.try_send_batch(batch)? {
555 Some(b) => {
556 batch = b;
558 }
559 None => {
560 return Ok(());
561 }
562 }
563
564 loop {
565 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
566
567 let sender_idx = self.fetch_add_sender_idx();
568 let Some(sender) = &self.senders[sender_idx] else {
569 continue;
570 };
571 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
576 Ok(()) => break,
577 Err(SendTimeoutError::Timeout(res)) => {
578 self.num_timeout += 1;
579 batch = res.unwrap();
581 }
582 Err(SendTimeoutError::Closed(res)) => {
583 self.senders[sender_idx] = None;
584 self.num_nones += 1;
585 batch = res.unwrap();
587 }
588 }
589 }
590
591 Ok(())
592 }
593
594 async fn send_error(&self, error: Error) {
595 let error = Arc::new(error);
596 for sender in self.senders.iter().flatten() {
597 let result = Err(error.clone()).context(ScanSeriesSnafu);
598 let _ = sender.send(result).await;
599 }
600 }
601
602 fn fetch_add_sender_idx(&mut self) -> usize {
603 let sender_idx = self.sender_idx;
604 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
605 sender_idx
606 }
607}
608
609fn new_partition_metrics(
610 stream_ctx: &StreamContext,
611 explain_verbose: bool,
612 metrics_set: &ExecutionPlanMetricsSet,
613 partition: usize,
614 metrics_list: &PartitionMetricsList,
615) -> PartitionMetrics {
616 let metrics = PartitionMetrics::new(
617 stream_ctx.input.mapper.metadata().region_id,
618 partition,
619 "SeriesScan",
620 stream_ctx.query_start,
621 explain_verbose,
622 metrics_set,
623 );
624
625 metrics_list.set(partition, metrics.clone());
626 metrics
627}