mito2/read/
series_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-series scan implementation.
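//!
//! [SeriesScan] splits a scan into partitions but reads the region only once:
//! the first polled partition spawns a background [SeriesDistributor] that
//! reads batches sorted by `(primary key, time index)` and routes all batches
//! of one series (primary key) to the channel of a single partition.
//!
//! A minimal usage sketch (assuming a prepared `ScanInput`):
//!
//! ```ignore
//! let scan = SeriesScan::new(input);
//! // Chains the streams of all partitions into one record batch stream.
//! let stream = scan.build_stream().await?;
//! ```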

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Semaphore;

use crate::error::{
    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
    ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{build_sources, SeqScan};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};

/// Timeout to send a batch to a sender.
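/// If a send times out, the distributor keeps the batch and retries the next
/// partition in round-robin order (see `SenderList::send_batch`).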
const SEND_TIMEOUT: Duration = Duration::from_millis(10);

/// List of receivers.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

/// Scans a region and returns the sorted rows of each series in the same partition.
///
/// The output is always ordered by `(primary key, time index)` inside every
/// partition, and the same series (primary key) is always returned to the same
/// partition.
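///
/// The first call to `scan_partition` spawns a shared [SeriesDistributor]
/// task; each partition then only drains its own receiver.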
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only populates this for queries and keeps it empty during
    /// compaction.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesScan {
    /// Creates a new [SeriesScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

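        // Only the first caller actually starts the distributor; later
        // partitions reuse the receivers it created.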
        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.batches.len();
                metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver for the partition.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor if the receiver list is empty.
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        common_runtime::spawn_global(async move {
            distributor.execute().await;
        });

        *rx_list = receivers;
    }

    /// Scans the region and returns a stream.
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scans [`Batch`]es in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Checks the resource limit for the scanner.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Sums the total number of files across all partitions.
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}

fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| {
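            // Bounded channels with capacity 1: the distributor can run at
            // most one series batch ahead of each partition (backpressure).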
            let (sender, receiver) = mpsc::channel(1);
            (Some(sender), Some(receiver))
        })
        .unzip();
    (SenderList::new(senders), receivers)
}

impl RegionScanner for SeriesScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl SeriesScan {
    /// Returns the input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports its metrics as an additional partition. This
    /// may double the scan cost in the [SeriesScan] metrics; use verbose mode
    /// to see the per-partition metrics of the distributor.
    metrics_set: ExecutionPlanMetricsSet,
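    /// Per-partition metrics list shared with the [SeriesScan].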
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesDistributor {
    /// Executes the distributor.
    async fn execute(&mut self) {
        if let Err(e) = self.scan_partitions().await {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts.
    async fn scan_partitions(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();

        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                )
                .await?;
            }
        }

        // Builds a reader that merges sources from all parts.
        let mut reader =
            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
                .await?;
        let mut metrics = SeriesDistributorMetrics::default();
        let mut fetch_start = Instant::now();

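        // The merged reader yields batches sorted by (primary key, time
        // index), so a change of the primary key marks a series boundary.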
        let mut current_series = SeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            fetch_start = Instant::now();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                continue;
            }

            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                continue;
            };

            if last_key == batch.primary_key() {
                current_series.push(batch);
                continue;
            }

            // We found a new series; send the current one.
            let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders.send_batch(to_send).await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders.send_batch(current_series).await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}

/// Batches of the same series.
#[derive(Default)]
pub struct SeriesBatch {
    pub batches: SmallVec<[Batch; 4]>,
}

impl SeriesBatch {
    /// Creates a new [SeriesBatch] from a single [Batch].
    fn single(batch: Batch) -> Self {
        Self {
            batches: smallvec![batch],
        }
    }

    fn current_key(&self) -> Option<&[u8]> {
        self.batches.first().map(|batch| batch.primary_key())
    }

    fn push(&mut self, batch: Batch) {
        self.batches.push(batch);
    }

    /// Returns true if there is no batch.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }
}

/// List of senders.
struct SenderList {
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` senders.
    num_nones: usize,
    /// Index of the next partition to send to.
    sender_idx: usize,
    /// Number of send timeouts.
    num_timeout: usize,
    /// Number of times a sender was full.
    num_full: usize,
}

impl SenderList {
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
            num_full: 0,
        }
    }

    /// Finds a partition and tries to send the batch to it without blocking.
    /// Returns `None` if the batch was sent successfully; otherwise returns
    /// the batch back to the caller.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Finds a partition and sends the batch to it.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Tries to send the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send the batch without blocking.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Adds a timeout so that we don't block on one partition forever
            // and keep sending batches in a round-robin fashion when some
            // partitions don't poll their inputs. This may happen if the plan
            // has a node like a sort merge above the scan, but it is rare
            // when using SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

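    /// Returns the current sender index and advances it, wrapping around in
    /// round-robin order.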
    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}

fn new_partition_metrics(
    stream_ctx: &StreamContext,
    explain_verbose: bool,
    metrics_set: &ExecutionPlanMetricsSet,
    partition: usize,
    metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
    let metrics = PartitionMetrics::new(
        stream_ctx.input.mapper.metadata().region_id,
        partition,
        "SeriesScan",
        stream_ctx.query_start,
        explain_verbose,
        metrics_set,
    );

    metrics_list.set(partition, metrics.clone());
    metrics
}