mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::arrow::array::BinaryArray;
29use datatypes::arrow::record_batch::RecordBatch;
30use datatypes::schema::SchemaRef;
31use futures::{StreamExt, TryStreamExt};
32use smallvec::{SmallVec, smallvec};
33use snafu::{OptionExt, ResultExt, ensure};
34use store_api::metadata::RegionMetadataRef;
35use store_api::region_engine::{
36    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
37};
38use tokio::sync::Semaphore;
39use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
40use tokio::sync::mpsc::{self, Receiver, Sender};
41
42use crate::error::{
43    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
44    ScanSeriesSnafu, TooManyFilesToReadSnafu,
45};
46use crate::read::range::RangeBuilderList;
47use crate::read::scan_region::{ScanInput, StreamContext};
48use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
49use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{Batch, ScannerMetrics};
52use crate::sst::parquet::flat_format::primary_key_column_index;
53use crate::sst::parquet::format::PrimaryKeyArray;
54
55/// Timeout to send a batch to a sender.
56const SEND_TIMEOUT: Duration = Duration::from_micros(100);
57
58/// List of receivers.
59type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
60
61/// Scans a region and returns sorted rows of a series in the same partition.
62///
63/// The output order is always order by `(primary key, time index)` inside every
64/// partition.
65/// Always returns the same series (primary key) to the same partition.
66pub struct SeriesScan {
67    /// Properties of the scanner.
68    properties: ScannerProperties,
69    /// Context of streams.
70    stream_ctx: Arc<StreamContext>,
71    /// Receivers of each partition.
72    receivers: Mutex<ReceiverList>,
73    /// Metrics for each partition.
74    /// The scanner only sets in query and keeps it empty during compaction.
75    metrics_list: Arc<PartitionMetricsList>,
76}
77
78impl SeriesScan {
79    /// Creates a new [SeriesScan].
80    pub(crate) fn new(input: ScanInput) -> Self {
81        let mut properties = ScannerProperties::default()
82            .with_append_mode(input.append_mode)
83            .with_total_rows(input.total_rows());
84        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
85        properties.partitions = vec![stream_ctx.partition_ranges()];
86
87        Self {
88            properties,
89            stream_ctx,
90            receivers: Mutex::new(Vec::new()),
91            metrics_list: Arc::new(PartitionMetricsList::default()),
92        }
93    }
94
95    #[tracing::instrument(
96        skip_all,
97        fields(
98            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
99            partition = partition
100        )
101    )]
102    fn scan_partition_impl(
103        &self,
104        ctx: &QueryScanContext,
105        metrics_set: &ExecutionPlanMetricsSet,
106        partition: usize,
107    ) -> Result<SendableRecordBatchStream> {
108        let metrics = new_partition_metrics(
109            &self.stream_ctx,
110            ctx.explain_verbose,
111            metrics_set,
112            partition,
113            &self.metrics_list,
114        );
115
116        let batch_stream =
117            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
118
119        let input = &self.stream_ctx.input;
120        let record_batch_stream = ConvertBatchStream::new(
121            batch_stream,
122            input.mapper.clone(),
123            input.cache_strategy.clone(),
124            metrics,
125        );
126
127        Ok(Box::pin(RecordBatchStreamWrapper::new(
128            input.mapper.output_schema(),
129            Box::pin(record_batch_stream),
130        )))
131    }
132
133    #[tracing::instrument(
134        skip_all,
135        fields(
136            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
137            partition = partition
138        )
139    )]
140    fn scan_batch_in_partition(
141        &self,
142        ctx: &QueryScanContext,
143        partition: usize,
144        part_metrics: PartitionMetrics,
145        metrics_set: &ExecutionPlanMetricsSet,
146    ) -> Result<ScanBatchStream> {
147        if ctx.explain_verbose {
148            common_telemetry::info!(
149                "SeriesScan partition {}, region_id: {}",
150                partition,
151                self.stream_ctx.input.region_metadata().region_id
152            );
153        }
154
155        ensure!(
156            partition < self.properties.num_partitions(),
157            PartitionOutOfRangeSnafu {
158                given: partition,
159                all: self.properties.num_partitions(),
160            }
161        );
162
163        self.maybe_start_distributor(metrics_set, &self.metrics_list);
164
165        let mut receiver = self.take_receiver(partition)?;
166        let stream = try_stream! {
167            part_metrics.on_first_poll();
168
169            let mut fetch_start = Instant::now();
170            while let Some(series) = receiver.recv().await {
171                let series = series?;
172
173                let mut metrics = ScannerMetrics::default();
174                metrics.scan_cost += fetch_start.elapsed();
175                fetch_start = Instant::now();
176
177                metrics.num_batches += series.num_batches();
178                metrics.num_rows += series.num_rows();
179
180                let yield_start = Instant::now();
181                yield ScanBatch::Series(series);
182                metrics.yield_cost += yield_start.elapsed();
183
184                part_metrics.merge_metrics(&metrics);
185            }
186
187            part_metrics.on_finish();
188        };
189        Ok(Box::pin(stream))
190    }
191
192    /// Takes the receiver for the partition.
193    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
194        let mut rx_list = self.receivers.lock().unwrap();
195        rx_list[partition]
196            .take()
197            .context(ScanMultiTimesSnafu { partition })
198    }
199
200    /// Starts the distributor if the receiver list is empty.
201    #[tracing::instrument(
202        skip(self, metrics_set, metrics_list),
203        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
204    )]
205    fn maybe_start_distributor(
206        &self,
207        metrics_set: &ExecutionPlanMetricsSet,
208        metrics_list: &Arc<PartitionMetricsList>,
209    ) {
210        let mut rx_list = self.receivers.lock().unwrap();
211        if !rx_list.is_empty() {
212            return;
213        }
214
215        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
216        let mut distributor = SeriesDistributor {
217            stream_ctx: self.stream_ctx.clone(),
218            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
219            partitions: self.properties.partitions.clone(),
220            senders,
221            metrics_set: metrics_set.clone(),
222            metrics_list: metrics_list.clone(),
223        };
224        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
225        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
226        common_runtime::spawn_global(
227            async move {
228                distributor.execute().await;
229            }
230            .instrument(span),
231        );
232
233        *rx_list = receivers;
234    }
235
236    /// Scans the region and returns a stream.
237    #[tracing::instrument(
238        skip_all,
239        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
240    )]
241    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
242        let part_num = self.properties.num_partitions();
243        let metrics_set = ExecutionPlanMetricsSet::default();
244        let streams = (0..part_num)
245            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
246            .collect::<Result<Vec<_>, BoxedError>>()?;
247        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
248        Ok(Box::pin(chained_stream))
249    }
250
251    /// Scan [`Batch`] in all partitions one by one.
252    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
253        let metrics_set = ExecutionPlanMetricsSet::new();
254
255        let streams = (0..self.properties.partitions.len())
256            .map(|partition| {
257                let metrics = new_partition_metrics(
258                    &self.stream_ctx,
259                    false,
260                    &metrics_set,
261                    partition,
262                    &self.metrics_list,
263                );
264
265                self.scan_batch_in_partition(
266                    &QueryScanContext::default(),
267                    partition,
268                    metrics,
269                    &metrics_set,
270                )
271            })
272            .collect::<Result<Vec<_>>>()?;
273
274        Ok(Box::pin(futures::stream::iter(streams).flatten()))
275    }
276
277    /// Checks resource limit for the scanner.
278    pub(crate) fn check_scan_limit(&self) -> Result<()> {
279        // Sum the total number of files across all partitions
280        let total_files: usize = self
281            .properties
282            .partitions
283            .iter()
284            .flat_map(|partition| partition.iter())
285            .map(|part_range| {
286                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
287                range_meta.indices.len()
288            })
289            .sum();
290
291        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
292        if total_files > max_concurrent_files {
293            return TooManyFilesToReadSnafu {
294                actual: total_files,
295                max: max_concurrent_files,
296            }
297            .fail();
298        }
299
300        Ok(())
301    }
302}
303
304fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
305    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
306        .map(|_| {
307            let (sender, receiver) = mpsc::channel(1);
308            (Some(sender), Some(receiver))
309        })
310        .unzip();
311    (SenderList::new(senders), receivers)
312}
313
314impl RegionScanner for SeriesScan {
315    fn name(&self) -> &str {
316        "SeriesScan"
317    }
318
319    fn properties(&self) -> &ScannerProperties {
320        &self.properties
321    }
322
323    fn schema(&self) -> SchemaRef {
324        self.stream_ctx.input.mapper.output_schema()
325    }
326
327    fn metadata(&self) -> RegionMetadataRef {
328        self.stream_ctx.input.mapper.metadata().clone()
329    }
330
331    fn scan_partition(
332        &self,
333        ctx: &QueryScanContext,
334        metrics_set: &ExecutionPlanMetricsSet,
335        partition: usize,
336    ) -> Result<SendableRecordBatchStream, BoxedError> {
337        self.scan_partition_impl(ctx, metrics_set, partition)
338            .map_err(BoxedError::new)
339    }
340
341    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
342        self.properties.prepare(request);
343
344        self.check_scan_limit().map_err(BoxedError::new)?;
345
346        Ok(())
347    }
348
349    fn has_predicate_without_region(&self) -> bool {
350        let predicate = self
351            .stream_ctx
352            .input
353            .predicate_group()
354            .predicate_without_region();
355        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
356    }
357
358    fn set_logical_region(&mut self, logical_region: bool) {
359        self.properties.set_logical_region(logical_region);
360    }
361}
362
363impl DisplayAs for SeriesScan {
364    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
365        write!(
366            f,
367            "SeriesScan: region={}, ",
368            self.stream_ctx.input.mapper.metadata().region_id
369        )?;
370        match t {
371            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
372                self.stream_ctx.format_for_explain(false, f)
373            }
374            DisplayFormatType::Verbose => {
375                self.stream_ctx.format_for_explain(true, f)?;
376                self.metrics_list.format_verbose_metrics(f)
377            }
378        }
379    }
380}
381
382impl fmt::Debug for SeriesScan {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        f.debug_struct("SeriesScan")
385            .field("num_ranges", &self.stream_ctx.ranges.len())
386            .finish()
387    }
388}
389
#[cfg(test)]
impl SeriesScan {
    /// Test-only accessor for the [ScanInput] this scanner was built from.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
397
398/// The distributor scans series and distributes them to different partitions.
399struct SeriesDistributor {
400    /// Context for the scan stream.
401    stream_ctx: Arc<StreamContext>,
402    /// Optional semaphore for limiting the number of concurrent scans.
403    semaphore: Option<Arc<Semaphore>>,
404    /// Partition ranges to scan.
405    partitions: Vec<Vec<PartitionRange>>,
406    /// Senders of all partitions.
407    senders: SenderList,
408    /// Metrics set to report.
409    /// The distributor report the metrics as an additional partition.
410    /// This may double the scan cost of the [SeriesScan] metrics. We can
411    /// get per-partition metrics in verbose mode to see the metrics of the
412    /// distributor.
413    metrics_set: ExecutionPlanMetricsSet,
414    metrics_list: Arc<PartitionMetricsList>,
415}
416
417impl SeriesDistributor {
418    /// Executes the distributor.
419    #[tracing::instrument(
420        skip_all,
421        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
422    )]
423    async fn execute(&mut self) {
424        let result = if self.stream_ctx.input.flat_format {
425            self.scan_partitions_flat().await
426        } else {
427            self.scan_partitions().await
428        };
429
430        if let Err(e) = result {
431            self.senders.send_error(e).await;
432        }
433    }
434
435    /// Scans all parts in flat format using FlatSeriesBatchDivider.
436    #[tracing::instrument(
437        skip_all,
438        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
439    )]
440    async fn scan_partitions_flat(&mut self) -> Result<()> {
441        let part_metrics = new_partition_metrics(
442            &self.stream_ctx,
443            false,
444            &self.metrics_set,
445            self.partitions.len(),
446            &self.metrics_list,
447        );
448        part_metrics.on_first_poll();
449        // Start fetch time before building sources so scan cost contains
450        // build part cost.
451        let mut fetch_start = Instant::now();
452
453        let range_builder_list = Arc::new(RangeBuilderList::new(
454            self.stream_ctx.input.num_memtables(),
455            self.stream_ctx.input.num_files(),
456        ));
457        // Scans all parts.
458        let mut sources = Vec::with_capacity(self.partitions.len());
459        for partition in &self.partitions {
460            sources.reserve(partition.len());
461            for part_range in partition {
462                build_flat_sources(
463                    &self.stream_ctx,
464                    part_range,
465                    false,
466                    &part_metrics,
467                    range_builder_list.clone(),
468                    &mut sources,
469                    self.semaphore.clone(),
470                )
471                .await?;
472            }
473        }
474
475        // Builds a flat reader that merge sources from all parts.
476        let mut reader = SeqScan::build_flat_reader_from_sources(
477            &self.stream_ctx,
478            sources,
479            self.semaphore.clone(),
480            Some(&part_metrics),
481        )
482        .await?;
483        let mut metrics = SeriesDistributorMetrics::default();
484
485        let mut divider = FlatSeriesBatchDivider::default();
486        while let Some(record_batch) = reader.try_next().await? {
487            metrics.scan_cost += fetch_start.elapsed();
488            metrics.num_batches += 1;
489            metrics.num_rows += record_batch.num_rows();
490
491            debug_assert!(record_batch.num_rows() > 0);
492            if record_batch.num_rows() == 0 {
493                fetch_start = Instant::now();
494                continue;
495            }
496
497            // Use divider to split series
498            let divider_start = Instant::now();
499            let series_batch = divider.push(record_batch);
500            metrics.divider_cost += divider_start.elapsed();
501            if let Some(series_batch) = series_batch {
502                let yield_start = Instant::now();
503                self.senders
504                    .send_batch(SeriesBatch::Flat(series_batch))
505                    .await?;
506                metrics.yield_cost += yield_start.elapsed();
507            }
508            fetch_start = Instant::now();
509        }
510
511        // Send any remaining batch in the divider
512        let divider_start = Instant::now();
513        let series_batch = divider.finish();
514        metrics.divider_cost += divider_start.elapsed();
515        if let Some(series_batch) = series_batch {
516            let yield_start = Instant::now();
517            self.senders
518                .send_batch(SeriesBatch::Flat(series_batch))
519                .await?;
520            metrics.yield_cost += yield_start.elapsed();
521        }
522
523        metrics.scan_cost += fetch_start.elapsed();
524        metrics.num_series_send_timeout = self.senders.num_timeout;
525        metrics.num_series_send_full = self.senders.num_full;
526        part_metrics.set_distributor_metrics(&metrics);
527
528        part_metrics.on_finish();
529
530        Ok(())
531    }
532
533    /// Scans all parts.
534    #[tracing::instrument(
535        skip_all,
536        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
537    )]
538    async fn scan_partitions(&mut self) -> Result<()> {
539        let part_metrics = new_partition_metrics(
540            &self.stream_ctx,
541            false,
542            &self.metrics_set,
543            self.partitions.len(),
544            &self.metrics_list,
545        );
546        part_metrics.on_first_poll();
547        // Start fetch time before building sources so scan cost contains
548        // build part cost.
549        let mut fetch_start = Instant::now();
550
551        let range_builder_list = Arc::new(RangeBuilderList::new(
552            self.stream_ctx.input.num_memtables(),
553            self.stream_ctx.input.num_files(),
554        ));
555        // Scans all parts.
556        let mut sources = Vec::with_capacity(self.partitions.len());
557        for partition in &self.partitions {
558            sources.reserve(partition.len());
559            for part_range in partition {
560                build_sources(
561                    &self.stream_ctx,
562                    part_range,
563                    false,
564                    &part_metrics,
565                    range_builder_list.clone(),
566                    &mut sources,
567                    self.semaphore.clone(),
568                )
569                .await?;
570            }
571        }
572
573        // Builds a reader that merge sources from all parts.
574        let mut reader = SeqScan::build_reader_from_sources(
575            &self.stream_ctx,
576            sources,
577            self.semaphore.clone(),
578            Some(&part_metrics),
579        )
580        .await?;
581        let mut metrics = SeriesDistributorMetrics::default();
582
583        let mut current_series = PrimaryKeySeriesBatch::default();
584        while let Some(batch) = reader.next_batch().await? {
585            metrics.scan_cost += fetch_start.elapsed();
586            metrics.num_batches += 1;
587            metrics.num_rows += batch.num_rows();
588
589            debug_assert!(!batch.is_empty());
590            if batch.is_empty() {
591                fetch_start = Instant::now();
592                continue;
593            }
594
595            let Some(last_key) = current_series.current_key() else {
596                current_series.push(batch);
597                fetch_start = Instant::now();
598                continue;
599            };
600
601            if last_key == batch.primary_key() {
602                current_series.push(batch);
603                fetch_start = Instant::now();
604                continue;
605            }
606
607            // We find a new series, send the current one.
608            let to_send =
609                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
610            let yield_start = Instant::now();
611            self.senders
612                .send_batch(SeriesBatch::PrimaryKey(to_send))
613                .await?;
614            metrics.yield_cost += yield_start.elapsed();
615            fetch_start = Instant::now();
616        }
617
618        if !current_series.is_empty() {
619            let yield_start = Instant::now();
620            self.senders
621                .send_batch(SeriesBatch::PrimaryKey(current_series))
622                .await?;
623            metrics.yield_cost += yield_start.elapsed();
624        }
625
626        metrics.scan_cost += fetch_start.elapsed();
627        metrics.num_series_send_timeout = self.senders.num_timeout;
628        metrics.num_series_send_full = self.senders.num_full;
629        part_metrics.set_distributor_metrics(&metrics);
630
631        part_metrics.on_finish();
632
633        Ok(())
634    }
635}
636
637/// Batches of the same series in primary key format.
638#[derive(Default, Debug)]
639pub struct PrimaryKeySeriesBatch {
640    pub batches: SmallVec<[Batch; 4]>,
641}
642
643impl PrimaryKeySeriesBatch {
644    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
645    fn single(batch: Batch) -> Self {
646        Self {
647            batches: smallvec![batch],
648        }
649    }
650
651    fn current_key(&self) -> Option<&[u8]> {
652        self.batches.first().map(|batch| batch.primary_key())
653    }
654
655    fn push(&mut self, batch: Batch) {
656        self.batches.push(batch);
657    }
658
659    /// Returns true if there is no batch.
660    fn is_empty(&self) -> bool {
661        self.batches.is_empty()
662    }
663}
664
665/// Batches of the same series.
666#[derive(Debug)]
667pub enum SeriesBatch {
668    PrimaryKey(PrimaryKeySeriesBatch),
669    Flat(FlatSeriesBatch),
670}
671
672impl SeriesBatch {
673    /// Returns the number of batches.
674    pub fn num_batches(&self) -> usize {
675        match self {
676            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
677            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
678        }
679    }
680
681    /// Returns the total number of rows across all batches.
682    pub fn num_rows(&self) -> usize {
683        match self {
684            SeriesBatch::PrimaryKey(primary_key_batch) => {
685                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
686            }
687            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
688        }
689    }
690}
691
692/// Batches of the same series in flat format.
693#[derive(Default, Debug)]
694pub struct FlatSeriesBatch {
695    pub batches: SmallVec<[RecordBatch; 4]>,
696}
697
698/// List of senders.
699struct SenderList {
700    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
701    /// Number of None senders.
702    num_nones: usize,
703    /// Index of the current partition to send.
704    sender_idx: usize,
705    /// Number of timeout.
706    num_timeout: usize,
707    /// Number of full senders.
708    num_full: usize,
709}
710
711impl SenderList {
712    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
713        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
714        Self {
715            senders,
716            num_nones,
717            sender_idx: 0,
718            num_timeout: 0,
719            num_full: 0,
720        }
721    }
722
723    /// Finds a partition and tries to send the batch to the partition.
724    /// Returns None if it sends successfully.
725    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
726        for _ in 0..self.senders.len() {
727            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
728
729            let sender_idx = self.fetch_add_sender_idx();
730            let Some(sender) = &self.senders[sender_idx] else {
731                continue;
732            };
733
734            match sender.try_send(Ok(batch)) {
735                Ok(()) => return Ok(None),
736                Err(TrySendError::Full(res)) => {
737                    self.num_full += 1;
738                    // Safety: we send Ok.
739                    batch = res.unwrap();
740                }
741                Err(TrySendError::Closed(res)) => {
742                    self.senders[sender_idx] = None;
743                    self.num_nones += 1;
744                    // Safety: we send Ok.
745                    batch = res.unwrap();
746                }
747            }
748        }
749
750        Ok(Some(batch))
751    }
752
753    /// Finds a partition and sends the batch to the partition.
754    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
755        // Sends the batch without blocking first.
756        match self.try_send_batch(batch)? {
757            Some(b) => {
758                // Unable to send batch to partition.
759                batch = b;
760            }
761            None => {
762                return Ok(());
763            }
764        }
765
766        loop {
767            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
768
769            let sender_idx = self.fetch_add_sender_idx();
770            let Some(sender) = &self.senders[sender_idx] else {
771                continue;
772            };
773            // Adds a timeout to avoid blocking indefinitely and sending
774            // the batch in a round-robin fashion when some partitions
775            // don't poll their inputs. This may happen if we have a
776            // node like sort merging. But it is rare when we are using SeriesScan.
777            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
778                Ok(()) => break,
779                Err(SendTimeoutError::Timeout(res)) => {
780                    self.num_timeout += 1;
781                    // Safety: we send Ok.
782                    batch = res.unwrap();
783                }
784                Err(SendTimeoutError::Closed(res)) => {
785                    self.senders[sender_idx] = None;
786                    self.num_nones += 1;
787                    // Safety: we send Ok.
788                    batch = res.unwrap();
789                }
790            }
791        }
792
793        Ok(())
794    }
795
796    async fn send_error(&self, error: Error) {
797        let error = Arc::new(error);
798        for sender in self.senders.iter().flatten() {
799            let result = Err(error.clone()).context(ScanSeriesSnafu);
800            let _ = sender.send(result).await;
801        }
802    }
803
804    fn fetch_add_sender_idx(&mut self) -> usize {
805        let sender_idx = self.sender_idx;
806        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
807        sender_idx
808    }
809}
810
811fn new_partition_metrics(
812    stream_ctx: &StreamContext,
813    explain_verbose: bool,
814    metrics_set: &ExecutionPlanMetricsSet,
815    partition: usize,
816    metrics_list: &PartitionMetricsList,
817) -> PartitionMetrics {
818    let metrics = PartitionMetrics::new(
819        stream_ctx.input.mapper.metadata().region_id,
820        partition,
821        "SeriesScan",
822        stream_ctx.query_start,
823        explain_verbose,
824        metrics_set,
825    );
826
827    metrics_list.set(partition, metrics.clone());
828    metrics
829}
830
831/// A divider to split flat record batches by time series.
832///
833/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
834/// However, a [FlatSeriesBatch] may contain rows from multiple series.
835#[derive(Default)]
836struct FlatSeriesBatchDivider {
837    buffer: FlatSeriesBatch,
838}
839
840impl FlatSeriesBatchDivider {
841    /// Pushes a record batch into the divider.
842    ///
843    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
844    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
845        // If buffer is empty
846        if self.buffer.batches.is_empty() {
847            self.buffer.batches.push(batch);
848            return None;
849        }
850
851        // Gets the primary key column from the incoming batch.
852        let pk_column_idx = primary_key_column_index(batch.num_columns());
853        let batch_pk_column = batch.column(pk_column_idx);
854        let batch_pk_array = batch_pk_column
855            .as_any()
856            .downcast_ref::<PrimaryKeyArray>()
857            .unwrap();
858        let batch_pk_values = batch_pk_array
859            .values()
860            .as_any()
861            .downcast_ref::<BinaryArray>()
862            .unwrap();
863        // Gets the last primary key of the incoming batch.
864        let batch_last_pk =
865            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
866        // Gets the last primary key of the buffer.
867        // Safety: the buffer is not empty.
868        let buffer_last_batch = self.buffer.batches.last().unwrap();
869        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
870        let buffer_pk_array = buffer_pk_column
871            .as_any()
872            .downcast_ref::<PrimaryKeyArray>()
873            .unwrap();
874        let buffer_pk_values = buffer_pk_array
875            .values()
876            .as_any()
877            .downcast_ref::<BinaryArray>()
878            .unwrap();
879        let buffer_last_pk =
880            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
881
882        // If last primary key in the batch is the same as last primary key in the buffer.
883        if batch_last_pk == buffer_last_pk {
884            self.buffer.batches.push(batch);
885            return None;
886        }
887        // Otherwise, the batch must have a different primary key, we find the first offset of the
888        // changed primary key.
889        let batch_pk_keys = batch_pk_array.keys();
890        let pk_indices = batch_pk_keys.values();
891        let mut change_offset = 0;
892        for (i, &key) in pk_indices.iter().enumerate() {
893            let batch_pk = batch_pk_values.value(key as usize);
894
895            if buffer_last_pk != batch_pk {
896                change_offset = i;
897                break;
898            }
899        }
900
901        // Splits the batch at the change offset
902        let (first_part, remaining_part) = if change_offset > 0 {
903            let first_part = batch.slice(0, change_offset);
904            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
905            (Some(first_part), Some(remaining_part))
906        } else {
907            (None, Some(batch))
908        };
909
910        // Creates the result from current buffer + first part of new batch
911        let mut result = std::mem::take(&mut self.buffer);
912        if let Some(first_part) = first_part {
913            result.batches.push(first_part);
914        }
915
916        // Pushes remaining part to the buffer if it exists
917        if let Some(remaining_part) = remaining_part {
918            self.buffer.batches.push(remaining_part);
919        }
920
921        Some(result)
922    }
923
924    /// Returns the final [FlatSeriesBatch].
925    fn finish(&mut self) -> Option<FlatSeriesBatch> {
926        if self.buffer.batches.is_empty() {
927            None
928        } else {
929            Some(std::mem::take(&mut self.buffer))
930        }
931    }
932}
933
934/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
935fn primary_key_at<'a>(
936    primary_key: &PrimaryKeyArray,
937    primary_key_values: &'a BinaryArray,
938    index: usize,
939) -> &'a [u8] {
940    let key = primary_key.keys().value(index);
941    primary_key_values.value(key as usize)
942}
943
#[cfg(test)]
mod tests {
    // Unit tests for `FlatSeriesBatchDivider`. Each test builds small flat-format record
    // batches and checks how pushed rows are grouped into returned and buffered batches.

    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a record batch matching `build_test_flat_schema`, one row per element.
    ///
    /// `primary_keys` populates both the `k0` string-dictionary column and the
    /// `__primary_key` binary-dictionary column, so each key must be valid UTF-8. All slices
    /// must have the same length (checked with `debug_assert_eq!`). `fields` values are
    /// cast to `i64` for the `field0` column.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        // Column order must match `build_test_flat_schema`.
        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column: a `UInt32`-keyed string dictionary of the primary keys.
    /// Panics if a key is not valid UTF-8.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column: a `UInt32`-keyed binary dictionary of the keys.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema of the test batches: tag `k0`, field `field0`, time index `ts`, then the
    /// internal `__primary_key`, `__sequence` and `__op_type` columns.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    // `finish` on a fresh divider yields nothing; the first push is only buffered.
    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    // A batch ending with the same series as the buffer keeps accumulating; both batches
    // come out together from `finish`.
    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    // A batch that starts with a new series flushes the buffer unchanged and becomes the
    // new buffer as a whole (no split needed).
    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    // The leading `series1` row of the second batch is flushed together with the buffer;
    // the trailing `series2` row stays buffered.
    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    // Only the leading `series1` row is split off the second batch. The `series2` and
    // `series3` rows stay together in one buffered slice, because the divider does not
    // separate series inside the remaining part.
    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}