mito2/read/series_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-series scan implementation.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Semaphore;

use crate::error::{
    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
    ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{build_sources, SeqScan};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};

/// Timeout to send a batch to a sender.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);

/// List of receivers.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output is always ordered by `(primary key, time index)` inside every
/// partition, and the same series (primary key) always goes to the same
/// partition.
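///
/// # Example
///
/// A minimal sketch of consuming one partition (not compiled; it assumes an
/// already constructed `scanner: SeriesScan` and a tokio runtime):
///
/// ```ignore
/// use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
/// use futures::TryStreamExt;
/// use store_api::region_engine::{QueryScanContext, RegionScanner};
///
/// let metrics_set = ExecutionPlanMetricsSet::new();
/// // Every series in this stream is sorted by `(primary key, time index)`,
/// // and no other partition observes the same primary key.
/// let mut stream = scanner.scan_partition(&QueryScanContext::default(), &metrics_set, 0)?;
/// while let Some(record_batch) = stream.try_next().await? {
///     println!("{} rows", record_batch.num_rows());
/// }
/// ```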
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets this for queries and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesScan {
    /// Creates a new [SeriesScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.batches.len();
                metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver for the partition.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor if the receiver list is empty.
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        common_runtime::spawn_global(async move {
            distributor.execute().await;
        });

        *rx_list = receivers;
    }

    /// Scans the region and returns a stream.
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scans [`Batch`]es in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Checks the resource limit for the scanner.
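    ///
    /// Sums the number of file ranges referenced by all partition ranges and
    /// fails with `TooManyFilesToRead` when the total exceeds the scan
    /// input's `max_concurrent_scan_files`.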
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Sum the total number of files across all partitions.
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}

fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| {
            let (sender, receiver) = mpsc::channel(1);
            (Some(sender), Some(receiver))
        })
        .unzip();
    (SenderList::new(senders), receivers)
}

impl RegionScanner for SeriesScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl SeriesScan {
    /// Returns the input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports its metrics as an additional partition, which
    /// may double the scan cost in the [SeriesScan] metrics. Use verbose mode
    /// to see per-partition metrics, including the distributor's own.
    metrics_set: ExecutionPlanMetricsSet,
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesDistributor {
    /// Executes the distributor.
    async fn execute(&mut self) {
        if let Err(e) = self.scan_partitions().await {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts.
    async fn scan_partitions(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();

        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                )
                .await?;
            }
        }

        // Builds a reader that merges sources from all parts.
        let mut reader =
            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
                .await?;
        let mut metrics = SeriesDistributorMetrics::default();
        let mut fetch_start = Instant::now();

        let mut current_series = SeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            fetch_start = Instant::now();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                continue;
            }

            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                continue;
            };

            if last_key == batch.primary_key() {
                current_series.push(batch);
                continue;
            }

            // We found a new series; send the current one.
            let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders.send_batch(to_send).await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders.send_batch(current_series).await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}

/// Batches of the same series.
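///
/// # Example
///
/// An illustrative sketch of how the distributor groups consecutive batches
/// (not compiled; `make_batch` is a hypothetical helper that builds a [Batch]
/// with the given primary key):
///
/// ```ignore
/// let mut series = SeriesBatch::single(make_batch(b"key-a"));
/// assert_eq!(series.current_key(), Some(&b"key-a"[..]));
/// // Batches with the same primary key accumulate in the same series.
/// series.push(make_batch(b"key-a"));
/// // A different key starts a new series; the current one is sent away.
/// let next = SeriesBatch::single(make_batch(b"key-b"));
/// ```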
#[derive(Default)]
pub struct SeriesBatch {
    pub batches: SmallVec<[Batch; 4]>,
}

impl SeriesBatch {
    /// Creates a new [SeriesBatch] from a single [Batch].
    fn single(batch: Batch) -> Self {
        Self {
            batches: smallvec![batch],
        }
    }

    /// Returns the primary key of this series, or `None` if the series is empty.
    fn current_key(&self) -> Option<&[u8]> {
        self.batches.first().map(|batch| batch.primary_key())
    }

    /// Appends a batch belonging to the same series.
    fn push(&mut self, batch: Batch) {
        self.batches.push(batch);
    }

    /// Returns true if there is no batch.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }
}

/// List of senders.
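///
/// Batches are distributed in a round-robin fashion: a non-blocking
/// `try_send` pass runs first, then a `send_timeout` loop so that one slow
/// partition cannot stall the distributor indefinitely.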
struct SenderList {
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send.
    sender_idx: usize,
    /// Number of send timeouts.
    num_timeout: usize,
    /// Number of sends that failed because a channel was full.
    num_full: usize,
}

impl SenderList {
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
            num_full: 0,
        }
    }

    /// Tries to send the batch to a partition without blocking.
    /// Returns `None` if the batch was sent, or hands the batch back if every
    /// open sender is currently full.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Finds a partition and sends the batch to it.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Sends the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send batch to partition.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Uses a timeout to avoid blocking indefinitely, so we keep
            // sending batches in a round-robin fashion even when some
            // partitions don't poll their inputs. This may happen under a
            // node such as a sort merge, but it is rare with SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}
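
// A minimal test sketch (added for illustration, not from the original
// source) covering two invariants of the helpers above: an empty
// `SeriesBatch` exposes no key, and `SenderList` hands out sender indices
// in wrapping round-robin order.
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn empty_series_batch_has_no_key() {
        let series = SeriesBatch::default();
        assert!(series.is_empty());
        assert!(series.current_key().is_none());
    }

    #[test]
    fn sender_idx_round_robin_wraps() {
        // Three open senders; the index should cycle 0, 1, 2, 0, ...
        let senders = (0..3).map(|_| Some(mpsc::channel(1).0)).collect();
        let mut list = SenderList::new(senders);
        assert_eq!(list.fetch_add_sender_idx(), 0);
        assert_eq!(list.fetch_add_sender_idx(), 1);
        assert_eq!(list.fetch_add_sender_idx(), 2);
        assert_eq!(list.fetch_add_sender_idx(), 0);
    }
}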

fn new_partition_metrics(
    stream_ctx: &StreamContext,
    explain_verbose: bool,
    metrics_set: &ExecutionPlanMetricsSet,
    partition: usize,
    metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
    let metrics = PartitionMetrics::new(
        stream_ctx.input.mapper.metadata().region_id,
        partition,
        "SeriesScan",
        stream_ctx.query_start,
        explain_verbose,
        metrics_set,
    );

    metrics_list.set(partition, metrics.clone());
    metrics
}