mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::util::ChainedRecordBatchStream;
25use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::compute::concat_batches;
29use datatypes::schema::SchemaRef;
30use smallvec::{smallvec, SmallVec};
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::metadata::RegionMetadataRef;
33use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
34use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
35use tokio::sync::mpsc::{self, Receiver, Sender};
36use tokio::sync::Semaphore;
37
38use crate::error::{
39    ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
40    ScanMultiTimesSnafu, ScanSeriesSnafu,
41};
42use crate::read::range::RangeBuilderList;
43use crate::read::scan_region::{ScanInput, StreamContext};
44use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
45use crate::read::seq_scan::{build_sources, SeqScan};
46use crate::read::{Batch, ScannerMetrics};
47
/// Timeout to send a batch to a sender.
///
/// `SenderList::send_batch` waits at most this long on one partition's
/// channel before it retries with the next partition.
const SEND_TIMEOUT: Duration = Duration::from_millis(10);

/// List of receivers.
///
/// Slot `i` holds the receiver of partition `i`. A slot becomes `None` after
/// `SeriesScan::take_receiver` hands the receiver out.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
53
/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output order is always order by `(primary key, time index)` inside every
/// partition.
/// Always returns the same series (primary key) to the same partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    ///
    /// The list is empty until the first partition is scanned; at that point
    /// the distributor is started and one receiver per partition is stored
    /// here. Each receiver can be taken only once.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets it in queries and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}
70
71impl SeriesScan {
72    /// Creates a new [SeriesScan].
73    pub(crate) fn new(input: ScanInput) -> Self {
74        let mut properties = ScannerProperties::default()
75            .with_append_mode(input.append_mode)
76            .with_total_rows(input.total_rows());
77        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
78        properties.partitions = vec![stream_ctx.partition_ranges()];
79
80        Self {
81            properties,
82            stream_ctx,
83            receivers: Mutex::new(Vec::new()),
84            metrics_list: Arc::new(PartitionMetricsList::default()),
85        }
86    }
87
88    fn scan_partition_impl(
89        &self,
90        metrics_set: &ExecutionPlanMetricsSet,
91        partition: usize,
92    ) -> Result<SendableRecordBatchStream, BoxedError> {
93        if partition >= self.properties.num_partitions() {
94            return Err(BoxedError::new(
95                PartitionOutOfRangeSnafu {
96                    given: partition,
97                    all: self.properties.num_partitions(),
98                }
99                .build(),
100            ));
101        }
102
103        self.maybe_start_distributor(metrics_set, &self.metrics_list);
104
105        let part_metrics =
106            new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
107        let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
108        let stream_ctx = self.stream_ctx.clone();
109
110        let stream = try_stream! {
111            part_metrics.on_first_poll();
112
113            let cache = &stream_ctx.input.cache_strategy;
114            let mut df_record_batches = Vec::new();
115            let mut fetch_start = Instant::now();
116            while let Some(result) = receiver.recv().await {
117                let mut metrics = ScannerMetrics::default();
118                let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
119                metrics.scan_cost += fetch_start.elapsed();
120                fetch_start = Instant::now();
121
122                let convert_start = Instant::now();
123                df_record_batches.reserve(series.batches.len());
124                for batch in series.batches {
125                    metrics.num_batches += 1;
126                    metrics.num_rows += batch.num_rows();
127
128                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
129                    df_record_batches.push(record_batch.into_df_record_batch());
130                }
131
132                let output_schema = stream_ctx.input.mapper.output_schema();
133                let df_record_batch =
134                    concat_batches(output_schema.arrow_schema(), &df_record_batches)
135                        .context(ComputeArrowSnafu)
136                        .map_err(BoxedError::new)
137                        .context(ExternalSnafu)?;
138                df_record_batches.clear();
139                let record_batch =
140                    RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
141                metrics.convert_cost += convert_start.elapsed();
142
143                let yield_start = Instant::now();
144                yield record_batch;
145                metrics.yield_cost += yield_start.elapsed();
146
147                part_metrics.merge_metrics(&metrics);
148            }
149        };
150
151        let stream = Box::pin(RecordBatchStreamWrapper::new(
152            self.stream_ctx.input.mapper.output_schema(),
153            Box::pin(stream),
154        ));
155
156        Ok(stream)
157    }
158
159    /// Takes the receiver for the partition.
160    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
161        let mut rx_list = self.receivers.lock().unwrap();
162        rx_list[partition]
163            .take()
164            .context(ScanMultiTimesSnafu { partition })
165    }
166
167    /// Starts the distributor if the receiver list is empty.
168    fn maybe_start_distributor(
169        &self,
170        metrics_set: &ExecutionPlanMetricsSet,
171        metrics_list: &Arc<PartitionMetricsList>,
172    ) {
173        let mut rx_list = self.receivers.lock().unwrap();
174        if !rx_list.is_empty() {
175            return;
176        }
177
178        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
179        let mut distributor = SeriesDistributor {
180            stream_ctx: self.stream_ctx.clone(),
181            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
182            partitions: self.properties.partitions.clone(),
183            senders,
184            metrics_set: metrics_set.clone(),
185            metrics_list: metrics_list.clone(),
186        };
187        common_runtime::spawn_global(async move {
188            distributor.execute().await;
189        });
190
191        *rx_list = receivers;
192    }
193
194    /// Scans the region and returns a stream.
195    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
196        let part_num = self.properties.num_partitions();
197        let metrics_set = ExecutionPlanMetricsSet::default();
198        let streams = (0..part_num)
199            .map(|i| self.scan_partition(&metrics_set, i))
200            .collect::<Result<Vec<_>, BoxedError>>()?;
201        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
202        Ok(Box::pin(chained_stream))
203    }
204}
205
206fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
207    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
208        .map(|_| {
209            let (sender, receiver) = mpsc::channel(1);
210            (Some(sender), Some(receiver))
211        })
212        .unzip();
213    (SenderList::new(senders), receivers)
214}
215
216impl RegionScanner for SeriesScan {
217    fn properties(&self) -> &ScannerProperties {
218        &self.properties
219    }
220
221    fn schema(&self) -> SchemaRef {
222        self.stream_ctx.input.mapper.output_schema()
223    }
224
225    fn metadata(&self) -> RegionMetadataRef {
226        self.stream_ctx.input.mapper.metadata().clone()
227    }
228
229    fn scan_partition(
230        &self,
231        metrics_set: &ExecutionPlanMetricsSet,
232        partition: usize,
233    ) -> Result<SendableRecordBatchStream, BoxedError> {
234        self.scan_partition_impl(metrics_set, partition)
235    }
236
237    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
238        self.properties.prepare(request);
239        Ok(())
240    }
241
242    fn has_predicate(&self) -> bool {
243        let predicate = self.stream_ctx.input.predicate();
244        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
245    }
246
247    fn set_logical_region(&mut self, logical_region: bool) {
248        self.properties.set_logical_region(logical_region);
249    }
250}
251
252impl DisplayAs for SeriesScan {
253    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
254        write!(
255            f,
256            "SeriesScan: region={}, ",
257            self.stream_ctx.input.mapper.metadata().region_id
258        )?;
259        match t {
260            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
261            DisplayFormatType::Verbose => {
262                self.stream_ctx.format_for_explain(true, f)?;
263                self.metrics_list.format_verbose_metrics(f)
264            }
265        }
266    }
267}
268
269impl fmt::Debug for SeriesScan {
270    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271        f.debug_struct("SeriesScan")
272            .field("num_ranges", &self.stream_ctx.ranges.len())
273            .finish()
274    }
275}
276
#[cfg(test)]
impl SeriesScan {
    /// Gives tests access to the scan input held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
284
/// The distributor scans series and distributes them to different partitions.
///
/// It merges the sources of all partition ranges into one reader, groups
/// consecutive batches with the same primary key into a series and sends each
/// series to one of the partition senders.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports the metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    /// List of per-partition metrics.
    /// The distributor registers its own metrics at index `partitions.len()`.
    metrics_list: Arc<PartitionMetricsList>,
}
303
304impl SeriesDistributor {
305    /// Executes the distributor.
306    async fn execute(&mut self) {
307        if let Err(e) = self.scan_partitions().await {
308            self.senders.send_error(e).await;
309        }
310    }
311
312    /// Scans all parts.
313    async fn scan_partitions(&mut self) -> Result<()> {
314        let part_metrics = new_partition_metrics(
315            &self.stream_ctx,
316            &self.metrics_set,
317            self.partitions.len(),
318            &self.metrics_list,
319        );
320        part_metrics.on_first_poll();
321
322        let range_builder_list = Arc::new(RangeBuilderList::new(
323            self.stream_ctx.input.num_memtables(),
324            self.stream_ctx.input.num_files(),
325        ));
326        // Scans all parts.
327        let mut sources = Vec::with_capacity(self.partitions.len());
328        for partition in &self.partitions {
329            sources.reserve(partition.len());
330            for part_range in partition {
331                build_sources(
332                    &self.stream_ctx,
333                    part_range,
334                    false,
335                    &part_metrics,
336                    range_builder_list.clone(),
337                    &mut sources,
338                );
339            }
340        }
341
342        // Builds a reader that merge sources from all parts.
343        let mut reader =
344            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
345                .await?;
346        let mut metrics = SeriesDistributorMetrics::default();
347        let mut fetch_start = Instant::now();
348
349        let mut current_series = SeriesBatch::default();
350        while let Some(batch) = reader.next_batch().await? {
351            metrics.scan_cost += fetch_start.elapsed();
352            fetch_start = Instant::now();
353            metrics.num_batches += 1;
354            metrics.num_rows += batch.num_rows();
355
356            debug_assert!(!batch.is_empty());
357            if batch.is_empty() {
358                continue;
359            }
360
361            let Some(last_key) = current_series.current_key() else {
362                current_series.push(batch);
363                continue;
364            };
365
366            if last_key == batch.primary_key() {
367                current_series.push(batch);
368                continue;
369            }
370
371            // We find a new series, send the current one.
372            let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
373            let yield_start = Instant::now();
374            self.senders.send_batch(to_send).await?;
375            metrics.yield_cost += yield_start.elapsed();
376        }
377
378        if !current_series.is_empty() {
379            let yield_start = Instant::now();
380            self.senders.send_batch(current_series).await?;
381            metrics.yield_cost += yield_start.elapsed();
382        }
383
384        metrics.scan_cost += fetch_start.elapsed();
385        metrics.num_series_send_timeout = self.senders.num_timeout;
386        part_metrics.set_distributor_metrics(&metrics);
387
388        part_metrics.on_finish();
389
390        Ok(())
391    }
392}
393
/// Batches of the same series.
///
/// The distributor only pushes a batch when its primary key equals the key
/// of the first batch, so all batches here share one primary key.
#[derive(Default)]
struct SeriesBatch {
    /// Batches of the series, in the order they were read.
    batches: SmallVec<[Batch; 4]>,
}
399
400impl SeriesBatch {
401    /// Creates a new [SeriesBatch] from a single [Batch].
402    fn single(batch: Batch) -> Self {
403        Self {
404            batches: smallvec![batch],
405        }
406    }
407
408    fn current_key(&self) -> Option<&[u8]> {
409        self.batches.first().map(|batch| batch.primary_key())
410    }
411
412    fn push(&mut self, batch: Batch) {
413        self.batches.push(batch);
414    }
415
416    /// Returns true if there is no batch.
417    fn is_empty(&self) -> bool {
418        self.batches.is_empty()
419    }
420}
421
/// List of senders.
///
/// Series are sent to the partitions in a round-robin fashion.
struct SenderList {
    /// Sender of each partition.
    /// A slot is set to `None` once its channel is found closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send.
    sender_idx: usize,
    /// Number of timeouts while sending a batch.
    num_timeout: usize,
}
432
433impl SenderList {
434    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
435        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
436        Self {
437            senders,
438            num_nones,
439            sender_idx: 0,
440            num_timeout: 0,
441        }
442    }
443
444    /// Finds a partition and tries to send the batch to the partition.
445    /// Returns None if it sends successfully.
446    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
447        for _ in 0..self.senders.len() {
448            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
449
450            let sender_idx = self.fetch_add_sender_idx();
451            let Some(sender) = &self.senders[sender_idx] else {
452                continue;
453            };
454
455            match sender.try_send(Ok(batch)) {
456                Ok(()) => return Ok(None),
457                Err(TrySendError::Full(res)) => {
458                    // Safety: we send Ok.
459                    batch = res.unwrap();
460                }
461                Err(TrySendError::Closed(res)) => {
462                    self.senders[sender_idx] = None;
463                    self.num_nones += 1;
464                    // Safety: we send Ok.
465                    batch = res.unwrap();
466                }
467            }
468        }
469
470        Ok(Some(batch))
471    }
472
473    /// Finds a partition and sends the batch to the partition.
474    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
475        // Sends the batch without blocking first.
476        match self.try_send_batch(batch)? {
477            Some(b) => {
478                // Unable to send batch to partition.
479                batch = b;
480            }
481            None => {
482                return Ok(());
483            }
484        }
485
486        loop {
487            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
488
489            let sender_idx = self.fetch_add_sender_idx();
490            let Some(sender) = &self.senders[sender_idx] else {
491                continue;
492            };
493            // Adds a timeout to avoid blocking indefinitely and sending
494            // the batch in a round-robin fashion when some partitions
495            // don't poll their inputs. This may happen if we have a
496            // node like sort merging. But it is rare when we are using SeriesScan.
497            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
498                Ok(()) => break,
499                Err(SendTimeoutError::Timeout(res)) => {
500                    self.num_timeout += 1;
501                    // Safety: we send Ok.
502                    batch = res.unwrap();
503                }
504                Err(SendTimeoutError::Closed(res)) => {
505                    self.senders[sender_idx] = None;
506                    self.num_nones += 1;
507                    // Safety: we send Ok.
508                    batch = res.unwrap();
509                }
510            }
511        }
512
513        Ok(())
514    }
515
516    async fn send_error(&self, error: Error) {
517        let error = Arc::new(error);
518        for sender in self.senders.iter().flatten() {
519            let result = Err(error.clone()).context(ScanSeriesSnafu);
520            let _ = sender.send(result).await;
521        }
522    }
523
524    fn fetch_add_sender_idx(&mut self) -> usize {
525        let sender_idx = self.sender_idx;
526        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
527        sender_idx
528    }
529}
530
531fn new_partition_metrics(
532    stream_ctx: &StreamContext,
533    metrics_set: &ExecutionPlanMetricsSet,
534    partition: usize,
535    metrics_list: &PartitionMetricsList,
536) -> PartitionMetrics {
537    let metrics = PartitionMetrics::new(
538        stream_ctx.input.mapper.metadata().region_id,
539        partition,
540        "SeriesScan",
541        stream_ctx.query_start,
542        metrics_set,
543    );
544
545    metrics_list.set(partition, metrics.clone());
546    metrics
547}