mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::array::BinaryArray;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::schema::SchemaRef;
30use futures::{StreamExt, TryStreamExt};
31use smallvec::{SmallVec, smallvec};
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::metadata::RegionMetadataRef;
34use store_api::region_engine::{
35    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
36};
37use tokio::sync::Semaphore;
38use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
39use tokio::sync::mpsc::{self, Receiver, Sender};
40
41use crate::error::{
42    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
43    ScanSeriesSnafu, TooManyFilesToReadSnafu,
44};
45use crate::read::range::RangeBuilderList;
46use crate::read::scan_region::{ScanInput, StreamContext};
47use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
48use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
49use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
50use crate::read::{Batch, ScannerMetrics};
51use crate::sst::parquet::flat_format::primary_key_column_index;
52use crate::sst::parquet::format::PrimaryKeyArray;
53
/// Timeout to send a batch to a sender.
///
/// `SenderList::send_batch` waits at most this long on one partition before it
/// retries with the next partition in round-robin order.
const SEND_TIMEOUT: Duration = Duration::from_micros(100);
56
/// List of receivers, indexed by partition.
///
/// A slot becomes `None` once the receiver of that partition has been taken.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
59
/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output order is always order by `(primary key, time index)` inside every
/// partition.
/// Always returns the same series (primary key) to the same partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    ///
    /// The list is empty until the distributor is started by the first scan.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only fills this list when serving a query and keeps it empty
    /// during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}
76
77impl SeriesScan {
78    /// Creates a new [SeriesScan].
79    pub(crate) fn new(input: ScanInput) -> Self {
80        let mut properties = ScannerProperties::default()
81            .with_append_mode(input.append_mode)
82            .with_total_rows(input.total_rows());
83        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
84        properties.partitions = vec![stream_ctx.partition_ranges()];
85
86        Self {
87            properties,
88            stream_ctx,
89            receivers: Mutex::new(Vec::new()),
90            metrics_list: Arc::new(PartitionMetricsList::default()),
91        }
92    }
93
94    fn scan_partition_impl(
95        &self,
96        ctx: &QueryScanContext,
97        metrics_set: &ExecutionPlanMetricsSet,
98        partition: usize,
99    ) -> Result<SendableRecordBatchStream> {
100        let metrics = new_partition_metrics(
101            &self.stream_ctx,
102            ctx.explain_verbose,
103            metrics_set,
104            partition,
105            &self.metrics_list,
106        );
107
108        let batch_stream =
109            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
110
111        let input = &self.stream_ctx.input;
112        let record_batch_stream = ConvertBatchStream::new(
113            batch_stream,
114            input.mapper.clone(),
115            input.cache_strategy.clone(),
116            metrics,
117        );
118
119        Ok(Box::pin(RecordBatchStreamWrapper::new(
120            input.mapper.output_schema(),
121            Box::pin(record_batch_stream),
122        )))
123    }
124
    /// Returns the stream of [ScanBatch]es that the distributor sends to `partition`.
    ///
    /// Starts the distributor if it is not started yet, takes the receiver of the
    /// partition and yields every series received from it.
    ///
    /// # Errors
    /// Fails if `partition` is not less than the number of partitions, or if the
    /// receiver of the partition cannot be taken (see `take_receiver`).
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        // Validates the partition before touching the receiver list, which is indexed
        // by partition.
        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            // The loop ends when the channel is closed and drained.
            while let Some(series) = receiver.recv().await {
                // Propagates an error sent by the distributor to the consumer of the stream.
                let series = series?;

                // `scan_cost` is the time spent waiting for the series from the channel.
                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                // `yield_cost` is the time elapsed between yielding the series and resuming.
                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
176
177    /// Takes the receiver for the partition.
178    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
179        let mut rx_list = self.receivers.lock().unwrap();
180        rx_list[partition]
181            .take()
182            .context(ScanMultiTimesSnafu { partition })
183    }
184
185    /// Starts the distributor if the receiver list is empty.
186    fn maybe_start_distributor(
187        &self,
188        metrics_set: &ExecutionPlanMetricsSet,
189        metrics_list: &Arc<PartitionMetricsList>,
190    ) {
191        let mut rx_list = self.receivers.lock().unwrap();
192        if !rx_list.is_empty() {
193            return;
194        }
195
196        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
197        let mut distributor = SeriesDistributor {
198            stream_ctx: self.stream_ctx.clone(),
199            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
200            partitions: self.properties.partitions.clone(),
201            senders,
202            metrics_set: metrics_set.clone(),
203            metrics_list: metrics_list.clone(),
204        };
205        common_runtime::spawn_global(async move {
206            distributor.execute().await;
207        });
208
209        *rx_list = receivers;
210    }
211
212    /// Scans the region and returns a stream.
213    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
214        let part_num = self.properties.num_partitions();
215        let metrics_set = ExecutionPlanMetricsSet::default();
216        let streams = (0..part_num)
217            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
218            .collect::<Result<Vec<_>, BoxedError>>()?;
219        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
220        Ok(Box::pin(chained_stream))
221    }
222
223    /// Scan [`Batch`] in all partitions one by one.
224    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
225        let metrics_set = ExecutionPlanMetricsSet::new();
226
227        let streams = (0..self.properties.partitions.len())
228            .map(|partition| {
229                let metrics = new_partition_metrics(
230                    &self.stream_ctx,
231                    false,
232                    &metrics_set,
233                    partition,
234                    &self.metrics_list,
235                );
236
237                self.scan_batch_in_partition(
238                    &QueryScanContext::default(),
239                    partition,
240                    metrics,
241                    &metrics_set,
242                )
243            })
244            .collect::<Result<Vec<_>>>()?;
245
246        Ok(Box::pin(futures::stream::iter(streams).flatten()))
247    }
248
249    /// Checks resource limit for the scanner.
250    pub(crate) fn check_scan_limit(&self) -> Result<()> {
251        // Sum the total number of files across all partitions
252        let total_files: usize = self
253            .properties
254            .partitions
255            .iter()
256            .flat_map(|partition| partition.iter())
257            .map(|part_range| {
258                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
259                range_meta.indices.len()
260            })
261            .sum();
262
263        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
264        if total_files > max_concurrent_files {
265            return TooManyFilesToReadSnafu {
266                actual: total_files,
267                max: max_concurrent_files,
268            }
269            .fail();
270        }
271
272        Ok(())
273    }
274}
275
276fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
277    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
278        .map(|_| {
279            let (sender, receiver) = mpsc::channel(1);
280            (Some(sender), Some(receiver))
281        })
282        .unzip();
283    (SenderList::new(senders), receivers)
284}
285
286impl RegionScanner for SeriesScan {
287    fn name(&self) -> &str {
288        "SeriesScan"
289    }
290
291    fn properties(&self) -> &ScannerProperties {
292        &self.properties
293    }
294
295    fn schema(&self) -> SchemaRef {
296        self.stream_ctx.input.mapper.output_schema()
297    }
298
299    fn metadata(&self) -> RegionMetadataRef {
300        self.stream_ctx.input.mapper.metadata().clone()
301    }
302
303    fn scan_partition(
304        &self,
305        ctx: &QueryScanContext,
306        metrics_set: &ExecutionPlanMetricsSet,
307        partition: usize,
308    ) -> Result<SendableRecordBatchStream, BoxedError> {
309        self.scan_partition_impl(ctx, metrics_set, partition)
310            .map_err(BoxedError::new)
311    }
312
313    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
314        self.properties.prepare(request);
315
316        self.check_scan_limit().map_err(BoxedError::new)?;
317
318        Ok(())
319    }
320
321    fn has_predicate_without_region(&self) -> bool {
322        let predicate = self
323            .stream_ctx
324            .input
325            .predicate_group()
326            .predicate_without_region();
327        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
328    }
329
330    fn set_logical_region(&mut self, logical_region: bool) {
331        self.properties.set_logical_region(logical_region);
332    }
333}
334
335impl DisplayAs for SeriesScan {
336    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
337        write!(
338            f,
339            "SeriesScan: region={}, ",
340            self.stream_ctx.input.mapper.metadata().region_id
341        )?;
342        match t {
343            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
344                self.stream_ctx.format_for_explain(false, f)
345            }
346            DisplayFormatType::Verbose => {
347                self.stream_ctx.format_for_explain(true, f)?;
348                self.metrics_list.format_verbose_metrics(f)
349            }
350        }
351    }
352}
353
354impl fmt::Debug for SeriesScan {
355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356        f.debug_struct("SeriesScan")
357            .field("num_ranges", &self.stream_ctx.ranges.len())
358            .finish()
359    }
360}
361
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input held by the stream context.
    ///
    /// Only compiled for tests.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
369
/// The distributor scans series and distributes them to different partitions.
///
/// It reads all partition ranges through one merged reader and sends every series
/// to one of the partition senders.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports its metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    /// Metrics list shared with the [SeriesScan] that started this distributor.
    metrics_list: Arc<PartitionMetricsList>,
}
388
389impl SeriesDistributor {
390    /// Executes the distributor.
391    async fn execute(&mut self) {
392        let result = if self.stream_ctx.input.flat_format {
393            self.scan_partitions_flat().await
394        } else {
395            self.scan_partitions().await
396        };
397
398        if let Err(e) = result {
399            self.senders.send_error(e).await;
400        }
401    }
402
    /// Scans all parts in flat format using FlatSeriesBatchDivider.
    ///
    /// Builds flat sources for every partition range, merges them into one reader and
    /// feeds the record batches to a [FlatSeriesBatchDivider]. Every [FlatSeriesBatch] the
    /// divider returns is sent to one of the partitions.
    ///
    /// # Errors
    /// Returns the first error from building the sources, building or reading the reader,
    /// or sending a batch.
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Uses `self.partitions.len()` as the partition index of the distributor's metrics.
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();

        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_flat_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a flat reader that merge sources from all parts.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();
        // `fetch_start` is reset after every iteration so `scan_cost` only covers the
        // time spent fetching from the reader (and dividing the previous batch).
        let mut fetch_start = Instant::now();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            // Empty batches are unexpected; release builds skip them instead of
            // pushing them to the divider.
            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Use divider to split series
            if let Some(series_batch) = divider.push(record_batch) {
                // `yield_cost` is the time spent sending the batch to a partition.
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Send any remaining batch in the divider
        if let Some(series_batch) = divider.finish() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        // Accounts for the time since the last reset of `fetch_start`, then copies the
        // counters of the sender list into the distributor metrics.
        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
487
    /// Scans all parts.
    ///
    /// Builds sources for every partition range, merges them into one reader and groups
    /// consecutive batches with the same primary key into a [PrimaryKeySeriesBatch]. A
    /// series is sent to one of the partitions as soon as a batch with a different
    /// primary key arrives; the last series is sent after the reader is exhausted.
    ///
    /// # Errors
    /// Returns the first error from building the sources, building or reading the reader,
    /// or sending a batch.
    async fn scan_partitions(&mut self) -> Result<()> {
        // Uses `self.partitions.len()` as the partition index of the distributor's metrics.
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();

        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a reader that merge sources from all parts.
        let mut reader =
            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
                .await?;
        let mut metrics = SeriesDistributorMetrics::default();
        // `fetch_start` is reset at the end of every iteration so `scan_cost` only covers
        // the time spent fetching the next batch from the reader.
        let mut fetch_start = Instant::now();

        let mut current_series = PrimaryKeySeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            // Empty batches are unexpected; release builds skip them.
            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                fetch_start = Instant::now();
                continue;
            }

            // The current series is empty, so the batch starts a new series.
            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            };

            // The batch belongs to the current series.
            if last_key == batch.primary_key() {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            }

            // We find a new series, send the current one.
            let to_send =
                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
            // `yield_cost` is the time spent sending the series to a partition.
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(to_send))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
            fetch_start = Instant::now();
        }

        // Sends the last series, which no later batch could flush.
        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(current_series))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        // Accounts for the time since the last reset of `fetch_start`, then copies the
        // counters of the sender list into the distributor metrics.
        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
580}
581
/// Batches of the same series in primary key format.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    /// Batches of the series, in the order they were pushed.
    pub batches: SmallVec<[Batch; 4]>,
}
587
588impl PrimaryKeySeriesBatch {
589    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
590    fn single(batch: Batch) -> Self {
591        Self {
592            batches: smallvec![batch],
593        }
594    }
595
596    fn current_key(&self) -> Option<&[u8]> {
597        self.batches.first().map(|batch| batch.primary_key())
598    }
599
600    fn push(&mut self, batch: Batch) {
601        self.batches.push(batch);
602    }
603
604    /// Returns true if there is no batch.
605    fn is_empty(&self) -> bool {
606        self.batches.is_empty()
607    }
608}
609
/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Batches in primary key format.
    PrimaryKey(PrimaryKeySeriesBatch),
    /// Record batches in flat format.
    Flat(FlatSeriesBatch),
}
616
617impl SeriesBatch {
618    /// Returns the number of batches.
619    pub fn num_batches(&self) -> usize {
620        match self {
621            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
622            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
623        }
624    }
625
626    /// Returns the total number of rows across all batches.
627    pub fn num_rows(&self) -> usize {
628        match self {
629            SeriesBatch::PrimaryKey(primary_key_batch) => {
630                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
631            }
632            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
633        }
634    }
635}
636
/// Batches of the same series in flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Record batches in the order they were pushed.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
642
/// List of senders.
///
/// Batches are offered to the senders in round-robin order.
struct SenderList {
    /// Senders of all partitions. A slot is set to `None` once its channel is closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send.
    sender_idx: usize,
    /// Number of timeout.
    /// Incremented each time a blocking send to a partition times out.
    num_timeout: usize,
    /// Number of full senders.
    /// Incremented each time a non-blocking send finds the channel full.
    num_full: usize,
}
655
656impl SenderList {
657    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
658        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
659        Self {
660            senders,
661            num_nones,
662            sender_idx: 0,
663            num_timeout: 0,
664            num_full: 0,
665        }
666    }
667
668    /// Finds a partition and tries to send the batch to the partition.
669    /// Returns None if it sends successfully.
670    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
671        for _ in 0..self.senders.len() {
672            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
673
674            let sender_idx = self.fetch_add_sender_idx();
675            let Some(sender) = &self.senders[sender_idx] else {
676                continue;
677            };
678
679            match sender.try_send(Ok(batch)) {
680                Ok(()) => return Ok(None),
681                Err(TrySendError::Full(res)) => {
682                    self.num_full += 1;
683                    // Safety: we send Ok.
684                    batch = res.unwrap();
685                }
686                Err(TrySendError::Closed(res)) => {
687                    self.senders[sender_idx] = None;
688                    self.num_nones += 1;
689                    // Safety: we send Ok.
690                    batch = res.unwrap();
691                }
692            }
693        }
694
695        Ok(Some(batch))
696    }
697
698    /// Finds a partition and sends the batch to the partition.
699    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
700        // Sends the batch without blocking first.
701        match self.try_send_batch(batch)? {
702            Some(b) => {
703                // Unable to send batch to partition.
704                batch = b;
705            }
706            None => {
707                return Ok(());
708            }
709        }
710
711        loop {
712            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
713
714            let sender_idx = self.fetch_add_sender_idx();
715            let Some(sender) = &self.senders[sender_idx] else {
716                continue;
717            };
718            // Adds a timeout to avoid blocking indefinitely and sending
719            // the batch in a round-robin fashion when some partitions
720            // don't poll their inputs. This may happen if we have a
721            // node like sort merging. But it is rare when we are using SeriesScan.
722            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
723                Ok(()) => break,
724                Err(SendTimeoutError::Timeout(res)) => {
725                    self.num_timeout += 1;
726                    // Safety: we send Ok.
727                    batch = res.unwrap();
728                }
729                Err(SendTimeoutError::Closed(res)) => {
730                    self.senders[sender_idx] = None;
731                    self.num_nones += 1;
732                    // Safety: we send Ok.
733                    batch = res.unwrap();
734                }
735            }
736        }
737
738        Ok(())
739    }
740
741    async fn send_error(&self, error: Error) {
742        let error = Arc::new(error);
743        for sender in self.senders.iter().flatten() {
744            let result = Err(error.clone()).context(ScanSeriesSnafu);
745            let _ = sender.send(result).await;
746        }
747    }
748
749    fn fetch_add_sender_idx(&mut self) -> usize {
750        let sender_idx = self.sender_idx;
751        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
752        sender_idx
753    }
754}
755
756fn new_partition_metrics(
757    stream_ctx: &StreamContext,
758    explain_verbose: bool,
759    metrics_set: &ExecutionPlanMetricsSet,
760    partition: usize,
761    metrics_list: &PartitionMetricsList,
762) -> PartitionMetrics {
763    let metrics = PartitionMetrics::new(
764        stream_ctx.input.mapper.metadata().region_id,
765        partition,
766        "SeriesScan",
767        stream_ctx.query_start,
768        explain_verbose,
769        metrics_set,
770    );
771
772    metrics_list.set(partition, metrics.clone());
773    metrics
774}
775
/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Record batches held back because more rows of their last series may still arrive.
    buffer: FlatSeriesBatch,
}
784
785impl FlatSeriesBatchDivider {
786    /// Pushes a record batch into the divider.
787    ///
788    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
789    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
790        // If buffer is empty
791        if self.buffer.batches.is_empty() {
792            self.buffer.batches.push(batch);
793            return None;
794        }
795
796        // Gets the primary key column from the incoming batch.
797        let pk_column_idx = primary_key_column_index(batch.num_columns());
798        let batch_pk_column = batch.column(pk_column_idx);
799        let batch_pk_array = batch_pk_column
800            .as_any()
801            .downcast_ref::<PrimaryKeyArray>()
802            .unwrap();
803        let batch_pk_values = batch_pk_array
804            .values()
805            .as_any()
806            .downcast_ref::<BinaryArray>()
807            .unwrap();
808        // Gets the last primary key of the incoming batch.
809        let batch_last_pk =
810            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
811        // Gets the last primary key of the buffer.
812        // Safety: the buffer is not empty.
813        let buffer_last_batch = self.buffer.batches.last().unwrap();
814        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
815        let buffer_pk_array = buffer_pk_column
816            .as_any()
817            .downcast_ref::<PrimaryKeyArray>()
818            .unwrap();
819        let buffer_pk_values = buffer_pk_array
820            .values()
821            .as_any()
822            .downcast_ref::<BinaryArray>()
823            .unwrap();
824        let buffer_last_pk =
825            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
826
827        // If last primary key in the batch is the same as last primary key in the buffer.
828        if batch_last_pk == buffer_last_pk {
829            self.buffer.batches.push(batch);
830            return None;
831        }
832        // Otherwise, the batch must have a different primary key, we find the first offset of the
833        // changed primary key.
834        let batch_pk_keys = batch_pk_array.keys();
835        let pk_indices = batch_pk_keys.values();
836        let mut change_offset = 0;
837        for (i, &key) in pk_indices.iter().enumerate() {
838            let batch_pk = batch_pk_values.value(key as usize);
839
840            if buffer_last_pk != batch_pk {
841                change_offset = i;
842                break;
843            }
844        }
845
846        // Splits the batch at the change offset
847        let (first_part, remaining_part) = if change_offset > 0 {
848            let first_part = batch.slice(0, change_offset);
849            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
850            (Some(first_part), Some(remaining_part))
851        } else {
852            (None, Some(batch))
853        };
854
855        // Creates the result from current buffer + first part of new batch
856        let mut result = std::mem::take(&mut self.buffer);
857        if let Some(first_part) = first_part {
858            result.batches.push(first_part);
859        }
860
861        // Pushes remaining part to the buffer if it exists
862        if let Some(remaining_part) = remaining_part {
863            self.buffer.batches.push(remaining_part);
864        }
865
866        Some(result)
867    }
868
869    /// Returns the final [FlatSeriesBatch].
870    fn finish(&mut self) -> Option<FlatSeriesBatch> {
871        if self.buffer.batches.is_empty() {
872            None
873        } else {
874            Some(std::mem::take(&mut self.buffer))
875        }
876    }
877}
878
879/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
880fn primary_key_at<'a>(
881    primary_key: &PrimaryKeyArray,
882    primary_key_values: &'a BinaryArray,
883    index: usize,
884) -> &'a [u8] {
885    let key = primary_key.keys().value(index);
886    primary_key_values.value(key as usize)
887}
888
889#[cfg(test)]
890mod tests {
891    use std::sync::Arc;
892
893    use api::v1::OpType;
894    use datatypes::arrow::array::{
895        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
896        TimestampMillisecondArray, UInt8Array, UInt64Array,
897    };
898    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
899    use datatypes::arrow::record_batch::RecordBatch;
900
901    use super::*;
902
903    fn new_test_record_batch(
904        primary_keys: &[&[u8]],
905        timestamps: &[i64],
906        sequences: &[u64],
907        op_types: &[OpType],
908        fields: &[u64],
909    ) -> RecordBatch {
910        let num_rows = timestamps.len();
911        debug_assert_eq!(sequences.len(), num_rows);
912        debug_assert_eq!(op_types.len(), num_rows);
913        debug_assert_eq!(fields.len(), num_rows);
914        debug_assert_eq!(primary_keys.len(), num_rows);
915
916        let columns: Vec<ArrayRef> = vec![
917            build_test_pk_string_dict_array(primary_keys),
918            Arc::new(Int64Array::from_iter(
919                fields.iter().map(|v| Some(*v as i64)),
920            )),
921            Arc::new(TimestampMillisecondArray::from_iter_values(
922                timestamps.iter().copied(),
923            )),
924            build_test_pk_array(primary_keys),
925            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
926            Arc::new(UInt8Array::from_iter_values(
927                op_types.iter().map(|v| *v as u8),
928            )),
929        ];
930
931        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
932    }
933
934    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
935        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
936        for &pk in primary_keys {
937            let pk_str = std::str::from_utf8(pk).unwrap();
938            builder.append(pk_str).unwrap();
939        }
940        Arc::new(builder.finish())
941    }
942
943    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
944        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
945        for &pk in primary_keys {
946            builder.append(pk).unwrap();
947        }
948        Arc::new(builder.finish())
949    }
950
951    fn build_test_flat_schema() -> SchemaRef {
952        let fields = vec![
953            Field::new(
954                "k0",
955                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
956                false,
957            ),
958            Field::new("field0", DataType::Int64, true),
959            Field::new(
960                "ts",
961                DataType::Timestamp(TimeUnit::Millisecond, None),
962                false,
963            ),
964            Field::new(
965                "__primary_key",
966                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
967                false,
968            ),
969            Field::new("__sequence", DataType::UInt64, false),
970            Field::new("__op_type", DataType::UInt8, false),
971        ];
972        Arc::new(Schema::new(fields))
973    }
974
975    #[test]
976    fn test_empty_buffer_first_push() {
977        let mut divider = FlatSeriesBatchDivider::default();
978        let result = divider.finish();
979        assert!(result.is_none());
980
981        let mut divider = FlatSeriesBatchDivider::default();
982        let batch = new_test_record_batch(
983            &[b"series1", b"series1"],
984            &[1000, 2000],
985            &[1, 2],
986            &[OpType::Put, OpType::Put],
987            &[10, 20],
988        );
989        let result = divider.push(batch);
990        assert!(result.is_none());
991        assert_eq!(divider.buffer.batches.len(), 1);
992    }
993
994    #[test]
995    fn test_same_series_accumulation() {
996        let mut divider = FlatSeriesBatchDivider::default();
997
998        let batch1 = new_test_record_batch(
999            &[b"series1", b"series1"],
1000            &[1000, 2000],
1001            &[1, 2],
1002            &[OpType::Put, OpType::Put],
1003            &[10, 20],
1004        );
1005
1006        let batch2 = new_test_record_batch(
1007            &[b"series1", b"series1"],
1008            &[3000, 4000],
1009            &[3, 4],
1010            &[OpType::Put, OpType::Put],
1011            &[30, 40],
1012        );
1013
1014        divider.push(batch1);
1015        let result = divider.push(batch2);
1016        assert!(result.is_none());
1017        let series_batch = divider.finish().unwrap();
1018        assert_eq!(series_batch.batches.len(), 2);
1019    }
1020
1021    #[test]
1022    fn test_series_boundary_detection() {
1023        let mut divider = FlatSeriesBatchDivider::default();
1024
1025        let batch1 = new_test_record_batch(
1026            &[b"series1", b"series1"],
1027            &[1000, 2000],
1028            &[1, 2],
1029            &[OpType::Put, OpType::Put],
1030            &[10, 20],
1031        );
1032
1033        let batch2 = new_test_record_batch(
1034            &[b"series2", b"series2"],
1035            &[3000, 4000],
1036            &[3, 4],
1037            &[OpType::Put, OpType::Put],
1038            &[30, 40],
1039        );
1040
1041        divider.push(batch1);
1042        let series_batch = divider.push(batch2).unwrap();
1043        assert_eq!(series_batch.batches.len(), 1);
1044
1045        assert_eq!(divider.buffer.batches.len(), 1);
1046    }
1047
1048    #[test]
1049    fn test_series_boundary_within_batch() {
1050        let mut divider = FlatSeriesBatchDivider::default();
1051
1052        let batch1 = new_test_record_batch(
1053            &[b"series1", b"series1"],
1054            &[1000, 2000],
1055            &[1, 2],
1056            &[OpType::Put, OpType::Put],
1057            &[10, 20],
1058        );
1059
1060        let batch2 = new_test_record_batch(
1061            &[b"series1", b"series2"],
1062            &[3000, 4000],
1063            &[3, 4],
1064            &[OpType::Put, OpType::Put],
1065            &[30, 40],
1066        );
1067
1068        divider.push(batch1);
1069        let series_batch = divider.push(batch2).unwrap();
1070        assert_eq!(series_batch.batches.len(), 2);
1071        assert_eq!(series_batch.batches[0].num_rows(), 2);
1072        assert_eq!(series_batch.batches[1].num_rows(), 1);
1073
1074        assert_eq!(divider.buffer.batches.len(), 1);
1075        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
1076    }
1077
1078    #[test]
1079    fn test_series_splitting() {
1080        let mut divider = FlatSeriesBatchDivider::default();
1081
1082        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);
1083
1084        let batch2 = new_test_record_batch(
1085            &[b"series1", b"series2", b"series2", b"series3"],
1086            &[2000, 3000, 4000, 5000],
1087            &[2, 3, 4, 5],
1088            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1089            &[20, 30, 40, 50],
1090        );
1091
1092        divider.push(batch1);
1093        let series_batch = divider.push(batch2).unwrap();
1094        assert_eq!(series_batch.batches.len(), 2);
1095
1096        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
1097        assert_eq!(total_rows, 2);
1098
1099        let final_batch = divider.finish().unwrap();
1100        assert_eq!(final_batch.batches.len(), 1);
1101        assert_eq!(final_batch.batches[0].num_rows(), 3);
1102    }
1103}