1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::schema::SchemaRef;
28use futures::StreamExt;
29use smallvec::{smallvec, SmallVec};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
33use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
34use tokio::sync::mpsc::{self, Receiver, Sender};
35use tokio::sync::Semaphore;
36
37use crate::error::{
38 Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
39 ScanSeriesSnafu,
40};
41use crate::read::range::RangeBuilderList;
42use crate::read::scan_region::{ScanInput, StreamContext};
43use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
44use crate::read::seq_scan::{build_sources, SeqScan};
45use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
46use crate::read::{Batch, ScannerMetrics};
47
/// Timeout of a single attempt to send a series to one partition channel.
///
/// `SenderList::send_batch` moves on to the next sender when an attempt times out.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);

/// Receivers of all partitions, indexed by partition.
///
/// A slot becomes `None` once its receiver has been taken by a scan.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
53
/// Scanner that scans a region and outputs its data series by series.
///
/// A background `SeriesDistributor` task reads all partition ranges, groups
/// consecutive batches by primary key and sends each group to one of the
/// per-partition channels. Each partition stream of the scanner drains its own
/// channel.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context shared by all streams of this scan.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition. The list stays empty until the distributor
    /// is started.
    receivers: Mutex<ReceiverList>,
    /// Metrics of each partition. The distributor registers its own metrics at
    /// the index equal to its number of partitions.
    metrics_list: Arc<PartitionMetricsList>,
}
70
71impl SeriesScan {
72 pub(crate) fn new(input: ScanInput) -> Self {
74 let mut properties = ScannerProperties::default()
75 .with_append_mode(input.append_mode)
76 .with_total_rows(input.total_rows());
77 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
78 properties.partitions = vec![stream_ctx.partition_ranges()];
79
80 Self {
81 properties,
82 stream_ctx,
83 receivers: Mutex::new(Vec::new()),
84 metrics_list: Arc::new(PartitionMetricsList::default()),
85 }
86 }
87
88 fn scan_partition_impl(
89 &self,
90 metrics_set: &ExecutionPlanMetricsSet,
91 partition: usize,
92 ) -> Result<SendableRecordBatchStream> {
93 let metrics =
94 new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
95
96 let batch_stream = self.scan_batch_in_partition(partition, metrics.clone(), metrics_set)?;
97
98 let input = &self.stream_ctx.input;
99 let record_batch_stream = ConvertBatchStream::new(
100 batch_stream,
101 input.mapper.clone(),
102 input.cache_strategy.clone(),
103 metrics,
104 );
105
106 Ok(Box::pin(RecordBatchStreamWrapper::new(
107 input.mapper.output_schema(),
108 Box::pin(record_batch_stream),
109 )))
110 }
111
112 fn scan_batch_in_partition(
113 &self,
114 partition: usize,
115 part_metrics: PartitionMetrics,
116 metrics_set: &ExecutionPlanMetricsSet,
117 ) -> Result<ScanBatchStream> {
118 ensure!(
119 partition < self.properties.num_partitions(),
120 PartitionOutOfRangeSnafu {
121 given: partition,
122 all: self.properties.num_partitions(),
123 }
124 );
125
126 self.maybe_start_distributor(metrics_set, &self.metrics_list);
127
128 let mut receiver = self.take_receiver(partition)?;
129 let stream = try_stream! {
130 part_metrics.on_first_poll();
131
132 let mut fetch_start = Instant::now();
133 while let Some(series) = receiver.recv().await {
134 let series = series?;
135
136 let mut metrics = ScannerMetrics::default();
137 metrics.scan_cost += fetch_start.elapsed();
138 fetch_start = Instant::now();
139
140 metrics.num_batches += series.batches.len();
141 metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();
142
143 let yield_start = Instant::now();
144 yield ScanBatch::Series(series);
145 metrics.yield_cost += yield_start.elapsed();
146
147 part_metrics.merge_metrics(&metrics);
148 }
149 };
150 Ok(Box::pin(stream))
151 }
152
153 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
155 let mut rx_list = self.receivers.lock().unwrap();
156 rx_list[partition]
157 .take()
158 .context(ScanMultiTimesSnafu { partition })
159 }
160
161 fn maybe_start_distributor(
163 &self,
164 metrics_set: &ExecutionPlanMetricsSet,
165 metrics_list: &Arc<PartitionMetricsList>,
166 ) {
167 let mut rx_list = self.receivers.lock().unwrap();
168 if !rx_list.is_empty() {
169 return;
170 }
171
172 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
173 let mut distributor = SeriesDistributor {
174 stream_ctx: self.stream_ctx.clone(),
175 semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
176 partitions: self.properties.partitions.clone(),
177 senders,
178 metrics_set: metrics_set.clone(),
179 metrics_list: metrics_list.clone(),
180 };
181 common_runtime::spawn_global(async move {
182 distributor.execute().await;
183 });
184
185 *rx_list = receivers;
186 }
187
188 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
190 let part_num = self.properties.num_partitions();
191 let metrics_set = ExecutionPlanMetricsSet::default();
192 let streams = (0..part_num)
193 .map(|i| self.scan_partition(&metrics_set, i))
194 .collect::<Result<Vec<_>, BoxedError>>()?;
195 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
196 Ok(Box::pin(chained_stream))
197 }
198
199 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
201 let metrics_set = ExecutionPlanMetricsSet::new();
202
203 let streams = (0..self.properties.partitions.len())
204 .map(|partition| {
205 let metrics = new_partition_metrics(
206 &self.stream_ctx,
207 &metrics_set,
208 partition,
209 &self.metrics_list,
210 );
211
212 self.scan_batch_in_partition(partition, metrics, &metrics_set)
213 })
214 .collect::<Result<Vec<_>>>()?;
215
216 Ok(Box::pin(futures::stream::iter(streams).flatten()))
217 }
218}
219
220fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
221 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
222 .map(|_| {
223 let (sender, receiver) = mpsc::channel(1);
224 (Some(sender), Some(receiver))
225 })
226 .unzip();
227 (SenderList::new(senders), receivers)
228}
229
230impl RegionScanner for SeriesScan {
231 fn properties(&self) -> &ScannerProperties {
232 &self.properties
233 }
234
235 fn schema(&self) -> SchemaRef {
236 self.stream_ctx.input.mapper.output_schema()
237 }
238
239 fn metadata(&self) -> RegionMetadataRef {
240 self.stream_ctx.input.mapper.metadata().clone()
241 }
242
243 fn scan_partition(
244 &self,
245 metrics_set: &ExecutionPlanMetricsSet,
246 partition: usize,
247 ) -> Result<SendableRecordBatchStream, BoxedError> {
248 self.scan_partition_impl(metrics_set, partition)
249 .map_err(BoxedError::new)
250 }
251
252 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
253 self.properties.prepare(request);
254 Ok(())
255 }
256
257 fn has_predicate(&self) -> bool {
258 let predicate = self.stream_ctx.input.predicate();
259 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
260 }
261
262 fn set_logical_region(&mut self, logical_region: bool) {
263 self.properties.set_logical_region(logical_region);
264 }
265}
266
267impl DisplayAs for SeriesScan {
268 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
269 write!(
270 f,
271 "SeriesScan: region={}, ",
272 self.stream_ctx.input.mapper.metadata().region_id
273 )?;
274 match t {
275 DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
276 DisplayFormatType::Verbose => {
277 self.stream_ctx.format_for_explain(true, f)?;
278 self.metrics_list.format_verbose_metrics(f)
279 }
280 }
281 }
282}
283
284impl fmt::Debug for SeriesScan {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 f.debug_struct("SeriesScan")
287 .field("num_ranges", &self.stream_ctx.ranges.len())
288 .finish()
289 }
290}
291
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input of the stream context (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
299
/// Background worker that scans all partition ranges, groups the batches by
/// series and distributes each series to the partition channels.
struct SeriesDistributor {
    /// Context shared with the scanner that created the distributor.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore handed to `SeqScan::build_reader_from_sources`.
    /// NOTE(review): presumably limits the scan parallelism; confirm against
    /// the reader builder.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan, grouped by partition.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partition channels.
    senders: SenderList,
    /// Metrics set used to create the metrics of the distributor.
    metrics_set: ExecutionPlanMetricsSet,
    /// List in which the metrics of the distributor are registered.
    metrics_list: Arc<PartitionMetricsList>,
}
318
319impl SeriesDistributor {
320 async fn execute(&mut self) {
322 if let Err(e) = self.scan_partitions().await {
323 self.senders.send_error(e).await;
324 }
325 }
326
327 async fn scan_partitions(&mut self) -> Result<()> {
329 let part_metrics = new_partition_metrics(
330 &self.stream_ctx,
331 &self.metrics_set,
332 self.partitions.len(),
333 &self.metrics_list,
334 );
335 part_metrics.on_first_poll();
336
337 let range_builder_list = Arc::new(RangeBuilderList::new(
338 self.stream_ctx.input.num_memtables(),
339 self.stream_ctx.input.num_files(),
340 ));
341 let mut sources = Vec::with_capacity(self.partitions.len());
343 for partition in &self.partitions {
344 sources.reserve(partition.len());
345 for part_range in partition {
346 build_sources(
347 &self.stream_ctx,
348 part_range,
349 false,
350 &part_metrics,
351 range_builder_list.clone(),
352 &mut sources,
353 )
354 .await?;
355 }
356 }
357
358 let mut reader =
360 SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
361 .await?;
362 let mut metrics = SeriesDistributorMetrics::default();
363 let mut fetch_start = Instant::now();
364
365 let mut current_series = SeriesBatch::default();
366 while let Some(batch) = reader.next_batch().await? {
367 metrics.scan_cost += fetch_start.elapsed();
368 fetch_start = Instant::now();
369 metrics.num_batches += 1;
370 metrics.num_rows += batch.num_rows();
371
372 debug_assert!(!batch.is_empty());
373 if batch.is_empty() {
374 continue;
375 }
376
377 let Some(last_key) = current_series.current_key() else {
378 current_series.push(batch);
379 continue;
380 };
381
382 if last_key == batch.primary_key() {
383 current_series.push(batch);
384 continue;
385 }
386
387 let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
389 let yield_start = Instant::now();
390 self.senders.send_batch(to_send).await?;
391 metrics.yield_cost += yield_start.elapsed();
392 }
393
394 if !current_series.is_empty() {
395 let yield_start = Instant::now();
396 self.senders.send_batch(current_series).await?;
397 metrics.yield_cost += yield_start.elapsed();
398 }
399
400 metrics.scan_cost += fetch_start.elapsed();
401 metrics.num_series_send_timeout = self.senders.num_timeout;
402 part_metrics.set_distributor_metrics(&metrics);
403
404 part_metrics.on_finish();
405
406 Ok(())
407 }
408}
409
/// Batches that belong to one series.
#[derive(Default)]
pub struct SeriesBatch {
    /// Batches of the series. The distributor only stores batches that share
    /// one primary key here, in the order the reader returned them.
    pub batches: SmallVec<[Batch; 4]>,
}
415
416impl SeriesBatch {
417 fn single(batch: Batch) -> Self {
419 Self {
420 batches: smallvec![batch],
421 }
422 }
423
424 fn current_key(&self) -> Option<&[u8]> {
425 self.batches.first().map(|batch| batch.primary_key())
426 }
427
428 fn push(&mut self, batch: Batch) {
429 self.batches.push(batch);
430 }
431
432 fn is_empty(&self) -> bool {
434 self.batches.is_empty()
435 }
436}
437
/// List of senders, one slot per partition, used in round-robin order.
struct SenderList {
    /// Senders of all partitions. A slot is set to `None` once its channel is
    /// found closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` slots in `senders`.
    num_nones: usize,
    /// Index of the next sender to try.
    sender_idx: usize,
    /// Number of send attempts that timed out.
    num_timeout: usize,
}
448
449impl SenderList {
450 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
451 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
452 Self {
453 senders,
454 num_nones,
455 sender_idx: 0,
456 num_timeout: 0,
457 }
458 }
459
460 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
463 for _ in 0..self.senders.len() {
464 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
465
466 let sender_idx = self.fetch_add_sender_idx();
467 let Some(sender) = &self.senders[sender_idx] else {
468 continue;
469 };
470
471 match sender.try_send(Ok(batch)) {
472 Ok(()) => return Ok(None),
473 Err(TrySendError::Full(res)) => {
474 batch = res.unwrap();
476 }
477 Err(TrySendError::Closed(res)) => {
478 self.senders[sender_idx] = None;
479 self.num_nones += 1;
480 batch = res.unwrap();
482 }
483 }
484 }
485
486 Ok(Some(batch))
487 }
488
489 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
491 match self.try_send_batch(batch)? {
493 Some(b) => {
494 batch = b;
496 }
497 None => {
498 return Ok(());
499 }
500 }
501
502 loop {
503 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
504
505 let sender_idx = self.fetch_add_sender_idx();
506 let Some(sender) = &self.senders[sender_idx] else {
507 continue;
508 };
509 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
514 Ok(()) => break,
515 Err(SendTimeoutError::Timeout(res)) => {
516 self.num_timeout += 1;
517 batch = res.unwrap();
519 }
520 Err(SendTimeoutError::Closed(res)) => {
521 self.senders[sender_idx] = None;
522 self.num_nones += 1;
523 batch = res.unwrap();
525 }
526 }
527 }
528
529 Ok(())
530 }
531
532 async fn send_error(&self, error: Error) {
533 let error = Arc::new(error);
534 for sender in self.senders.iter().flatten() {
535 let result = Err(error.clone()).context(ScanSeriesSnafu);
536 let _ = sender.send(result).await;
537 }
538 }
539
540 fn fetch_add_sender_idx(&mut self) -> usize {
541 let sender_idx = self.sender_idx;
542 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
543 sender_idx
544 }
545}
546
547fn new_partition_metrics(
548 stream_ctx: &StreamContext,
549 metrics_set: &ExecutionPlanMetricsSet,
550 partition: usize,
551 metrics_list: &PartitionMetricsList,
552) -> PartitionMetrics {
553 let metrics = PartitionMetrics::new(
554 stream_ctx.input.mapper.metadata().region_id,
555 partition,
556 "SeriesScan",
557 stream_ctx.query_start,
558 metrics_set,
559 );
560
561 metrics_list.set(partition, metrics.clone());
562 metrics
563}