mito2/read/series_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-series scan implementation.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::{self, Instrument};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use tokio::sync::Semaphore;
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::error::{
    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
    ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::{RangeBuilderList, file_range_counts};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;

/// Timeout to send a batch to a sender.
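/// If sending times out, the distributor retries another partition in a
/// round-robin fashion (see [SenderList::send_batch]).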
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// List of receivers.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output is always ordered by `(primary key, time index)` within every
/// partition, and all rows of a series (primary key) are always sent to the
/// same partition.
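///
/// For example, with two partitions, batches for primary keys `k1, k2, k3`
/// could be routed as `k1 -> partition 0`, `k2 -> partition 1`,
/// `k3 -> partition 0` (the distributor picks partitions in a round-robin
/// fashion), but rows of `k1` never span two partitions.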
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only populates this for queries and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesScan {
    /// Creates a new [SeriesScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver for the partition.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor if the receiver list is empty.
    #[tracing::instrument(
        skip(self, metrics_set, metrics_list),
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
        common_runtime::spawn_global(
            async move {
                distributor.execute().await;
            }
            .instrument(span),
        );

        *rx_list = receivers;
    }

    /// Scans the region and returns a stream.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scans [`Batch`]es in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Checks resource limit for the scanner.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Sum the total number of files across all partitions
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}

fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| {
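            // Each channel is bounded with capacity 1, so a slow partition
            // backpressures the distributor instead of buffering series unboundedly.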
            let (sender, receiver) = mpsc::channel(1);
            (Some(sender), Some(receiver))
        })
        .unzip();
    (SenderList::new(senders), receivers)
}

impl RegionScanner for SeriesScan {
    fn name(&self) -> &str {
        "SeriesScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl SeriesScan {
    /// Returns the input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports its metrics as an additional partition.
    /// This may double the scan cost shown in the [SeriesScan] metrics. Use
    /// verbose mode to get per-partition metrics and see the cost of the
    /// distributor itself.
    metrics_set: ExecutionPlanMetricsSet,
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesDistributor {
    /// Executes the distributor.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = if self.stream_ctx.input.flat_format {
            self.scan_partitions_flat().await
        } else {
            self.scan_partitions().await
        };

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts in flat format using FlatSeriesBatchDivider.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
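        // The distributor reports its own metrics under an extra partition index
        // (equal to the number of partitions), separate from the scan partitions.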
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so the scan cost includes the
        // cost of building parts.
        let mut fetch_start = Instant::now();

        let counts = file_range_counts(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
            &self.stream_ctx.ranges,
            self.partitions.iter().flatten(),
        );
        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            counts,
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_flat_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a flat reader that merges sources from all parts.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
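        // The flat reader yields record batches sorted by primary key, so the
        // divider only needs to watch for key changes between consecutive rows
        // to keep every series inside a single FlatSeriesBatch.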
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Use divider to split series
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Send any remaining batch in the divider
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }

    /// Scans all parts.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so the scan cost includes the
        // cost of building parts.
        let mut fetch_start = Instant::now();

        let counts = file_range_counts(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
            &self.stream_ctx.ranges,
            self.partitions.iter().flatten(),
        );
        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            counts,
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a reader that merges sources from all parts.
        let mut reader = SeqScan::build_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut current_series = PrimaryKeySeriesBatch::default();
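        // Batches arrive sorted by primary key, so consecutive batches with the
        // same key belong to one series; flush the accumulated series whenever
        // the key changes.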
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                fetch_start = Instant::now();
                continue;
            }

            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            };

            if last_key == batch.primary_key() {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            }

            // We found a new series, send the current one.
            let to_send =
                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(to_send))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
            fetch_start = Instant::now();
        }

        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(current_series))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}

/// Batches of the same series in primary key format.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    pub batches: SmallVec<[Batch; 4]>,
}

impl PrimaryKeySeriesBatch {
    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
    fn single(batch: Batch) -> Self {
        Self {
            batches: smallvec![batch],
        }
    }

    fn current_key(&self) -> Option<&[u8]> {
        self.batches.first().map(|batch| batch.primary_key())
    }

    fn push(&mut self, batch: Batch) {
        self.batches.push(batch);
    }

    /// Returns true if there is no batch.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }
}

/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
    PrimaryKey(PrimaryKeySeriesBatch),
    Flat(FlatSeriesBatch),
}

impl SeriesBatch {
    /// Returns the number of batches.
    pub fn num_batches(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
        }
    }

    /// Returns the total number of rows across all batches.
    pub fn num_rows(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => {
                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
            }
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
        }
    }
}

/// Batches of the same series in flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    pub batches: SmallVec<[RecordBatch; 4]>,
}

/// List of senders.
struct SenderList {
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` senders.
    num_nones: usize,
    /// Index of the current partition to send to.
    sender_idx: usize,
    /// Number of send timeouts.
    num_timeout: usize,
    /// Number of times a send found the channel full.
    num_full: usize,
}

impl SenderList {
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
            num_full: 0,
        }
    }

    /// Finds a partition and tries to send the batch to it.
    /// Returns `None` if the batch is sent successfully; otherwise returns the batch back.
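    /// For example, with three open senders and `sender_idx == 1`, the batch is
    /// offered to senders 1, 2 and 0 in turn until one accepts it without blocking.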
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Finds a partition and sends the batch to the partition.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Sends the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send batch to partition.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Adds a timeout to avoid blocking indefinitely, so the batch can be
            // retried on other partitions in a round-robin fashion when some
            // partitions don't poll their inputs. This may happen if there is a
            // node like a sort-merge above the scan, but it is rare when using SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}

fn new_partition_metrics(
    stream_ctx: &StreamContext,
    explain_verbose: bool,
    metrics_set: &ExecutionPlanMetricsSet,
    partition: usize,
    metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
    let metrics = PartitionMetrics::new(
        stream_ctx.input.mapper.metadata().region_id,
        partition,
        "SeriesScan",
        stream_ctx.query_start,
        explain_verbose,
        metrics_set,
    );

    metrics_list.set(partition, metrics.clone());
    metrics
}

/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
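///
/// For example, pushing a batch with primary keys `[k1, k1]` and then a batch
/// with `[k1, k2]` yields a [FlatSeriesBatch] holding the rows `[k1, k1]` and
/// `[k1]`, while the `[k2]` row stays buffered until the next key change or
/// [FlatSeriesBatchDivider::finish].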
#[derive(Default)]
struct FlatSeriesBatchDivider {
    buffer: FlatSeriesBatch,
}

impl FlatSeriesBatchDivider {
    /// Pushes a record batch into the divider.
    ///
    /// Returns a [FlatSeriesBatch] once the divider can guarantee that it contains all rows
    /// of its series.
    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
        // If the buffer is empty, just buffer the batch.
        if self.buffer.batches.is_empty() {
            self.buffer.batches.push(batch);
            return None;
        }

        // Gets the primary key column from the incoming batch.
        let pk_column_idx = primary_key_column_index(batch.num_columns());
        let batch_pk_column = batch.column(pk_column_idx);
        let batch_pk_array = batch_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let batch_pk_values = batch_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Gets the last primary key of the incoming batch.
        let batch_last_pk =
            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
        // Gets the last primary key of the buffer.
        // Safety: the buffer is not empty.
        let buffer_last_batch = self.buffer.batches.last().unwrap();
        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
        let buffer_pk_array = buffer_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let buffer_pk_values = buffer_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let buffer_last_pk =
            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);

        // If the last primary key in the batch is the same as the last primary key
        // in the buffer, the whole batch still belongs to the buffered series.
        if batch_last_pk == buffer_last_pk {
            self.buffer.batches.push(batch);
            return None;
        }
        // Otherwise, the batch contains a different primary key; find the first offset
        // where the primary key changes.
        let batch_pk_keys = batch_pk_array.keys();
        let pk_indices = batch_pk_keys.values();
        let mut change_offset = 0;
        for (i, &key) in pk_indices.iter().enumerate() {
            let batch_pk = batch_pk_values.value(key as usize);

            if buffer_last_pk != batch_pk {
                change_offset = i;
                break;
            }
        }

        // Splits the batch at the change offset.
        let (first_part, remaining_part) = if change_offset > 0 {
            let first_part = batch.slice(0, change_offset);
            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
            (Some(first_part), Some(remaining_part))
        } else {
            (None, Some(batch))
        };

        // Creates the result from the current buffer plus the first part of the new batch.
        let mut result = std::mem::take(&mut self.buffer);
        if let Some(first_part) = first_part {
            result.batches.push(first_part);
        }

        // Pushes the remaining part to the buffer if it exists.
        if let Some(remaining_part) = remaining_part {
            self.buffer.batches.push(remaining_part);
        }

        Some(result)
    }

    /// Returns the final [FlatSeriesBatch].
    fn finish(&mut self) -> Option<FlatSeriesBatch> {
        if self.buffer.batches.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
fn primary_key_at<'a>(
    primary_key: &PrimaryKeyArray,
    primary_key_values: &'a BinaryArray,
    index: usize,
) -> &'a [u8] {
    let key = primary_key.keys().value(index);
    primary_key_values.value(key as usize)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_flat_schema() -> SchemaRef {
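        // Column order mirrors the flat scan format: tag and field columns and the
        // time index first, then the internal __primary_key, __sequence and
        // __op_type columns at the end.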
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}