mito2/read/
series_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::schema::SchemaRef;
28use futures::StreamExt;
29use smallvec::{smallvec, SmallVec};
30use snafu::{ensure, OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
33use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
34use tokio::sync::mpsc::{self, Receiver, Sender};
35use tokio::sync::Semaphore;
36
37use crate::error::{
38    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
39    ScanSeriesSnafu,
40};
41use crate::read::range::RangeBuilderList;
42use crate::read::scan_region::{ScanInput, StreamContext};
43use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
44use crate::read::seq_scan::{build_sources, SeqScan};
45use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
46use crate::read::{Batch, ScannerMetrics};
47
/// Timeout of a single attempt to send a batch to one sender.
///
/// When an attempt times out, the sender list counts the timeout and retries
/// with the next sender in round-robin order.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);
50
/// List of receivers, indexed by partition.
///
/// An entry becomes `None` once the receiver of that partition has been taken.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
53
/// Scans a region and returns the sorted rows of each series within a single
/// partition.
///
/// Inside every partition, the output is always ordered by
/// `(primary key, time index)`.
/// All rows of the same series (primary key) are always returned to the same
/// partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    /// The list stays empty until the distributor is started.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets it in queries and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}
70
71impl SeriesScan {
72    /// Creates a new [SeriesScan].
73    pub(crate) fn new(input: ScanInput) -> Self {
74        let mut properties = ScannerProperties::default()
75            .with_append_mode(input.append_mode)
76            .with_total_rows(input.total_rows());
77        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
78        properties.partitions = vec![stream_ctx.partition_ranges()];
79
80        Self {
81            properties,
82            stream_ctx,
83            receivers: Mutex::new(Vec::new()),
84            metrics_list: Arc::new(PartitionMetricsList::default()),
85        }
86    }
87
88    fn scan_partition_impl(
89        &self,
90        metrics_set: &ExecutionPlanMetricsSet,
91        partition: usize,
92    ) -> Result<SendableRecordBatchStream> {
93        let metrics =
94            new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
95
96        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone(), metrics_set)?;
97
98        let input = &self.stream_ctx.input;
99        let record_batch_stream = ConvertBatchStream::new(
100            batch_stream,
101            input.mapper.clone(),
102            input.cache_strategy.clone(),
103            metrics,
104        );
105
106        Ok(Box::pin(RecordBatchStreamWrapper::new(
107            input.mapper.output_schema(),
108            Box::pin(record_batch_stream),
109        )))
110    }
111
112    fn scan_batch_in_partition(
113        &self,
114        partition: usize,
115        part_metrics: PartitionMetrics,
116        metrics_set: &ExecutionPlanMetricsSet,
117    ) -> Result<ScanBatchStream> {
118        ensure!(
119            partition < self.properties.num_partitions(),
120            PartitionOutOfRangeSnafu {
121                given: partition,
122                all: self.properties.num_partitions(),
123            }
124        );
125
126        self.maybe_start_distributor(metrics_set, &self.metrics_list);
127
128        let mut receiver = self.take_receiver(partition)?;
129        let stream = try_stream! {
130            part_metrics.on_first_poll();
131
132            let mut fetch_start = Instant::now();
133            while let Some(series) = receiver.recv().await {
134                let series = series?;
135
136                let mut metrics = ScannerMetrics::default();
137                metrics.scan_cost += fetch_start.elapsed();
138                fetch_start = Instant::now();
139
140                metrics.num_batches += series.batches.len();
141                metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();
142
143                let yield_start = Instant::now();
144                yield ScanBatch::Series(series);
145                metrics.yield_cost += yield_start.elapsed();
146
147                part_metrics.merge_metrics(&metrics);
148            }
149        };
150        Ok(Box::pin(stream))
151    }
152
153    /// Takes the receiver for the partition.
154    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
155        let mut rx_list = self.receivers.lock().unwrap();
156        rx_list[partition]
157            .take()
158            .context(ScanMultiTimesSnafu { partition })
159    }
160
161    /// Starts the distributor if the receiver list is empty.
162    fn maybe_start_distributor(
163        &self,
164        metrics_set: &ExecutionPlanMetricsSet,
165        metrics_list: &Arc<PartitionMetricsList>,
166    ) {
167        let mut rx_list = self.receivers.lock().unwrap();
168        if !rx_list.is_empty() {
169            return;
170        }
171
172        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
173        let mut distributor = SeriesDistributor {
174            stream_ctx: self.stream_ctx.clone(),
175            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
176            partitions: self.properties.partitions.clone(),
177            senders,
178            metrics_set: metrics_set.clone(),
179            metrics_list: metrics_list.clone(),
180        };
181        common_runtime::spawn_global(async move {
182            distributor.execute().await;
183        });
184
185        *rx_list = receivers;
186    }
187
188    /// Scans the region and returns a stream.
189    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
190        let part_num = self.properties.num_partitions();
191        let metrics_set = ExecutionPlanMetricsSet::default();
192        let streams = (0..part_num)
193            .map(|i| self.scan_partition(&metrics_set, i))
194            .collect::<Result<Vec<_>, BoxedError>>()?;
195        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
196        Ok(Box::pin(chained_stream))
197    }
198
199    /// Scan [`Batch`] in all partitions one by one.
200    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
201        let metrics_set = ExecutionPlanMetricsSet::new();
202
203        let streams = (0..self.properties.partitions.len())
204            .map(|partition| {
205                let metrics = new_partition_metrics(
206                    &self.stream_ctx,
207                    &metrics_set,
208                    partition,
209                    &self.metrics_list,
210                );
211
212                self.scan_batch_in_partition(partition, metrics, &metrics_set)
213            })
214            .collect::<Result<Vec<_>>>()?;
215
216        Ok(Box::pin(futures::stream::iter(streams).flatten()))
217    }
218}
219
220fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
221    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
222        .map(|_| {
223            let (sender, receiver) = mpsc::channel(1);
224            (Some(sender), Some(receiver))
225        })
226        .unzip();
227    (SenderList::new(senders), receivers)
228}
229
230impl RegionScanner for SeriesScan {
231    fn properties(&self) -> &ScannerProperties {
232        &self.properties
233    }
234
235    fn schema(&self) -> SchemaRef {
236        self.stream_ctx.input.mapper.output_schema()
237    }
238
239    fn metadata(&self) -> RegionMetadataRef {
240        self.stream_ctx.input.mapper.metadata().clone()
241    }
242
243    fn scan_partition(
244        &self,
245        metrics_set: &ExecutionPlanMetricsSet,
246        partition: usize,
247    ) -> Result<SendableRecordBatchStream, BoxedError> {
248        self.scan_partition_impl(metrics_set, partition)
249            .map_err(BoxedError::new)
250    }
251
252    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
253        self.properties.prepare(request);
254        Ok(())
255    }
256
257    fn has_predicate(&self) -> bool {
258        let predicate = self.stream_ctx.input.predicate();
259        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
260    }
261
262    fn set_logical_region(&mut self, logical_region: bool) {
263        self.properties.set_logical_region(logical_region);
264    }
265}
266
267impl DisplayAs for SeriesScan {
268    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
269        write!(
270            f,
271            "SeriesScan: region={}, ",
272            self.stream_ctx.input.mapper.metadata().region_id
273        )?;
274        match t {
275            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
276            DisplayFormatType::Verbose => {
277                self.stream_ctx.format_for_explain(true, f)?;
278                self.metrics_list.format_verbose_metrics(f)
279            }
280        }
281    }
282}
283
284impl fmt::Debug for SeriesScan {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        f.debug_struct("SeriesScan")
287            .field("num_ranges", &self.stream_ctx.ranges.len())
288            .finish()
289    }
290}
291
#[cfg(test)]
impl SeriesScan {
    /// Returns the input of the scanner.
    /// Only available in tests.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
299
/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports its metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    /// Metrics list in which the distributor registers its partition metrics.
    metrics_list: Arc<PartitionMetricsList>,
}
318
319impl SeriesDistributor {
320    /// Executes the distributor.
321    async fn execute(&mut self) {
322        if let Err(e) = self.scan_partitions().await {
323            self.senders.send_error(e).await;
324        }
325    }
326
327    /// Scans all parts.
328    async fn scan_partitions(&mut self) -> Result<()> {
329        let part_metrics = new_partition_metrics(
330            &self.stream_ctx,
331            &self.metrics_set,
332            self.partitions.len(),
333            &self.metrics_list,
334        );
335        part_metrics.on_first_poll();
336
337        let range_builder_list = Arc::new(RangeBuilderList::new(
338            self.stream_ctx.input.num_memtables(),
339            self.stream_ctx.input.num_files(),
340        ));
341        // Scans all parts.
342        let mut sources = Vec::with_capacity(self.partitions.len());
343        for partition in &self.partitions {
344            sources.reserve(partition.len());
345            for part_range in partition {
346                build_sources(
347                    &self.stream_ctx,
348                    part_range,
349                    false,
350                    &part_metrics,
351                    range_builder_list.clone(),
352                    &mut sources,
353                )
354                .await?;
355            }
356        }
357
358        // Builds a reader that merge sources from all parts.
359        let mut reader =
360            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
361                .await?;
362        let mut metrics = SeriesDistributorMetrics::default();
363        let mut fetch_start = Instant::now();
364
365        let mut current_series = SeriesBatch::default();
366        while let Some(batch) = reader.next_batch().await? {
367            metrics.scan_cost += fetch_start.elapsed();
368            fetch_start = Instant::now();
369            metrics.num_batches += 1;
370            metrics.num_rows += batch.num_rows();
371
372            debug_assert!(!batch.is_empty());
373            if batch.is_empty() {
374                continue;
375            }
376
377            let Some(last_key) = current_series.current_key() else {
378                current_series.push(batch);
379                continue;
380            };
381
382            if last_key == batch.primary_key() {
383                current_series.push(batch);
384                continue;
385            }
386
387            // We find a new series, send the current one.
388            let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
389            let yield_start = Instant::now();
390            self.senders.send_batch(to_send).await?;
391            metrics.yield_cost += yield_start.elapsed();
392        }
393
394        if !current_series.is_empty() {
395            let yield_start = Instant::now();
396            self.senders.send_batch(current_series).await?;
397            metrics.yield_cost += yield_start.elapsed();
398        }
399
400        metrics.scan_cost += fetch_start.elapsed();
401        metrics.num_series_send_timeout = self.senders.num_timeout;
402        part_metrics.set_distributor_metrics(&metrics);
403
404        part_metrics.on_finish();
405
406        Ok(())
407    }
408}
409
/// Batches of the same series.
#[derive(Default)]
pub struct SeriesBatch {
    /// Batches of the series, in the order they were pushed.
    pub batches: SmallVec<[Batch; 4]>,
}
415
416impl SeriesBatch {
417    /// Creates a new [SeriesBatch] from a single [Batch].
418    fn single(batch: Batch) -> Self {
419        Self {
420            batches: smallvec![batch],
421        }
422    }
423
424    fn current_key(&self) -> Option<&[u8]> {
425        self.batches.first().map(|batch| batch.primary_key())
426    }
427
428    fn push(&mut self, batch: Batch) {
429        self.batches.push(batch);
430    }
431
432    /// Returns true if there is no batch.
433    fn is_empty(&self) -> bool {
434        self.batches.is_empty()
435    }
436}
437
/// List of senders, indexed by partition.
struct SenderList {
    /// Senders of each partition.
    /// An entry is set to `None` once its channel is found to be closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the next partition to try to send to.
    sender_idx: usize,
    /// Number of send attempts that timed out.
    num_timeout: usize,
}
448
449impl SenderList {
450    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
451        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
452        Self {
453            senders,
454            num_nones,
455            sender_idx: 0,
456            num_timeout: 0,
457        }
458    }
459
460    /// Finds a partition and tries to send the batch to the partition.
461    /// Returns None if it sends successfully.
462    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
463        for _ in 0..self.senders.len() {
464            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
465
466            let sender_idx = self.fetch_add_sender_idx();
467            let Some(sender) = &self.senders[sender_idx] else {
468                continue;
469            };
470
471            match sender.try_send(Ok(batch)) {
472                Ok(()) => return Ok(None),
473                Err(TrySendError::Full(res)) => {
474                    // Safety: we send Ok.
475                    batch = res.unwrap();
476                }
477                Err(TrySendError::Closed(res)) => {
478                    self.senders[sender_idx] = None;
479                    self.num_nones += 1;
480                    // Safety: we send Ok.
481                    batch = res.unwrap();
482                }
483            }
484        }
485
486        Ok(Some(batch))
487    }
488
489    /// Finds a partition and sends the batch to the partition.
490    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
491        // Sends the batch without blocking first.
492        match self.try_send_batch(batch)? {
493            Some(b) => {
494                // Unable to send batch to partition.
495                batch = b;
496            }
497            None => {
498                return Ok(());
499            }
500        }
501
502        loop {
503            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
504
505            let sender_idx = self.fetch_add_sender_idx();
506            let Some(sender) = &self.senders[sender_idx] else {
507                continue;
508            };
509            // Adds a timeout to avoid blocking indefinitely and sending
510            // the batch in a round-robin fashion when some partitions
511            // don't poll their inputs. This may happen if we have a
512            // node like sort merging. But it is rare when we are using SeriesScan.
513            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
514                Ok(()) => break,
515                Err(SendTimeoutError::Timeout(res)) => {
516                    self.num_timeout += 1;
517                    // Safety: we send Ok.
518                    batch = res.unwrap();
519                }
520                Err(SendTimeoutError::Closed(res)) => {
521                    self.senders[sender_idx] = None;
522                    self.num_nones += 1;
523                    // Safety: we send Ok.
524                    batch = res.unwrap();
525                }
526            }
527        }
528
529        Ok(())
530    }
531
532    async fn send_error(&self, error: Error) {
533        let error = Arc::new(error);
534        for sender in self.senders.iter().flatten() {
535            let result = Err(error.clone()).context(ScanSeriesSnafu);
536            let _ = sender.send(result).await;
537        }
538    }
539
540    fn fetch_add_sender_idx(&mut self) -> usize {
541        let sender_idx = self.sender_idx;
542        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
543        sender_idx
544    }
545}
546
547fn new_partition_metrics(
548    stream_ctx: &StreamContext,
549    metrics_set: &ExecutionPlanMetricsSet,
550    partition: usize,
551    metrics_list: &PartitionMetricsList,
552) -> PartitionMetrics {
553    let metrics = PartitionMetrics::new(
554        stream_ctx.input.mapper.metadata().region_id,
555        partition,
556        "SeriesScan",
557        stream_ctx.query_start,
558        metrics_set,
559    );
560
561    metrics_list.set(partition, metrics.clone());
562    metrics
563}