mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::array::BinaryArray;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::schema::SchemaRef;
30use futures::{StreamExt, TryStreamExt};
31use smallvec::{SmallVec, smallvec};
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::metadata::RegionMetadataRef;
34use store_api::region_engine::{
35    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
36};
37use tokio::sync::Semaphore;
38use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
39use tokio::sync::mpsc::{self, Receiver, Sender};
40
41use crate::error::{
42    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
43    ScanSeriesSnafu, TooManyFilesToReadSnafu,
44};
45use crate::read::range::RangeBuilderList;
46use crate::read::scan_region::{ScanInput, StreamContext};
47use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
48use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
49use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
50use crate::read::{Batch, ScannerMetrics};
51use crate::sst::parquet::flat_format::primary_key_column_index;
52use crate::sst::parquet::format::PrimaryKeyArray;
53
/// How long a blocking send waits on one partition's channel before the
/// distributor gives up on it and moves on to the next partition.
const SEND_TIMEOUT: Duration = Duration::from_nanos(100_000);
56
57/// List of receivers.
58type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
59
60/// Scans a region and returns sorted rows of a series in the same partition.
61///
62/// The output order is always order by `(primary key, time index)` inside every
63/// partition.
64/// Always returns the same series (primary key) to the same partition.
65pub struct SeriesScan {
66    /// Properties of the scanner.
67    properties: ScannerProperties,
68    /// Context of streams.
69    stream_ctx: Arc<StreamContext>,
70    /// Receivers of each partition.
71    receivers: Mutex<ReceiverList>,
72    /// Metrics for each partition.
73    /// The scanner only sets in query and keeps it empty during compaction.
74    metrics_list: Arc<PartitionMetricsList>,
75}
76
77impl SeriesScan {
78    /// Creates a new [SeriesScan].
79    pub(crate) fn new(input: ScanInput) -> Self {
80        let mut properties = ScannerProperties::default()
81            .with_append_mode(input.append_mode)
82            .with_total_rows(input.total_rows());
83        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
84        properties.partitions = vec![stream_ctx.partition_ranges()];
85
86        Self {
87            properties,
88            stream_ctx,
89            receivers: Mutex::new(Vec::new()),
90            metrics_list: Arc::new(PartitionMetricsList::default()),
91        }
92    }
93
94    fn scan_partition_impl(
95        &self,
96        ctx: &QueryScanContext,
97        metrics_set: &ExecutionPlanMetricsSet,
98        partition: usize,
99    ) -> Result<SendableRecordBatchStream> {
100        let metrics = new_partition_metrics(
101            &self.stream_ctx,
102            ctx.explain_verbose,
103            metrics_set,
104            partition,
105            &self.metrics_list,
106        );
107
108        let batch_stream =
109            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
110
111        let input = &self.stream_ctx.input;
112        let record_batch_stream = ConvertBatchStream::new(
113            batch_stream,
114            input.mapper.clone(),
115            input.cache_strategy.clone(),
116            metrics,
117        );
118
119        Ok(Box::pin(RecordBatchStreamWrapper::new(
120            input.mapper.output_schema(),
121            Box::pin(record_batch_stream),
122        )))
123    }
124
125    fn scan_batch_in_partition(
126        &self,
127        ctx: &QueryScanContext,
128        partition: usize,
129        part_metrics: PartitionMetrics,
130        metrics_set: &ExecutionPlanMetricsSet,
131    ) -> Result<ScanBatchStream> {
132        if ctx.explain_verbose {
133            common_telemetry::info!(
134                "SeriesScan partition {}, region_id: {}",
135                partition,
136                self.stream_ctx.input.region_metadata().region_id
137            );
138        }
139
140        ensure!(
141            partition < self.properties.num_partitions(),
142            PartitionOutOfRangeSnafu {
143                given: partition,
144                all: self.properties.num_partitions(),
145            }
146        );
147
148        self.maybe_start_distributor(metrics_set, &self.metrics_list);
149
150        let mut receiver = self.take_receiver(partition)?;
151        let stream = try_stream! {
152            part_metrics.on_first_poll();
153
154            let mut fetch_start = Instant::now();
155            while let Some(series) = receiver.recv().await {
156                let series = series?;
157
158                let mut metrics = ScannerMetrics::default();
159                metrics.scan_cost += fetch_start.elapsed();
160                fetch_start = Instant::now();
161
162                metrics.num_batches += series.num_batches();
163                metrics.num_rows += series.num_rows();
164
165                let yield_start = Instant::now();
166                yield ScanBatch::Series(series);
167                metrics.yield_cost += yield_start.elapsed();
168
169                part_metrics.merge_metrics(&metrics);
170            }
171
172            part_metrics.on_finish();
173        };
174        Ok(Box::pin(stream))
175    }
176
177    /// Takes the receiver for the partition.
178    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
179        let mut rx_list = self.receivers.lock().unwrap();
180        rx_list[partition]
181            .take()
182            .context(ScanMultiTimesSnafu { partition })
183    }
184
185    /// Starts the distributor if the receiver list is empty.
186    fn maybe_start_distributor(
187        &self,
188        metrics_set: &ExecutionPlanMetricsSet,
189        metrics_list: &Arc<PartitionMetricsList>,
190    ) {
191        let mut rx_list = self.receivers.lock().unwrap();
192        if !rx_list.is_empty() {
193            return;
194        }
195
196        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
197        let mut distributor = SeriesDistributor {
198            stream_ctx: self.stream_ctx.clone(),
199            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
200            partitions: self.properties.partitions.clone(),
201            senders,
202            metrics_set: metrics_set.clone(),
203            metrics_list: metrics_list.clone(),
204        };
205        common_runtime::spawn_global(async move {
206            distributor.execute().await;
207        });
208
209        *rx_list = receivers;
210    }
211
212    /// Scans the region and returns a stream.
213    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
214        let part_num = self.properties.num_partitions();
215        let metrics_set = ExecutionPlanMetricsSet::default();
216        let streams = (0..part_num)
217            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
218            .collect::<Result<Vec<_>, BoxedError>>()?;
219        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
220        Ok(Box::pin(chained_stream))
221    }
222
223    /// Scan [`Batch`] in all partitions one by one.
224    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
225        let metrics_set = ExecutionPlanMetricsSet::new();
226
227        let streams = (0..self.properties.partitions.len())
228            .map(|partition| {
229                let metrics = new_partition_metrics(
230                    &self.stream_ctx,
231                    false,
232                    &metrics_set,
233                    partition,
234                    &self.metrics_list,
235                );
236
237                self.scan_batch_in_partition(
238                    &QueryScanContext::default(),
239                    partition,
240                    metrics,
241                    &metrics_set,
242                )
243            })
244            .collect::<Result<Vec<_>>>()?;
245
246        Ok(Box::pin(futures::stream::iter(streams).flatten()))
247    }
248
249    /// Checks resource limit for the scanner.
250    pub(crate) fn check_scan_limit(&self) -> Result<()> {
251        // Sum the total number of files across all partitions
252        let total_files: usize = self
253            .properties
254            .partitions
255            .iter()
256            .flat_map(|partition| partition.iter())
257            .map(|part_range| {
258                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
259                range_meta.indices.len()
260            })
261            .sum();
262
263        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
264        if total_files > max_concurrent_files {
265            return TooManyFilesToReadSnafu {
266                actual: total_files,
267                max: max_concurrent_files,
268            }
269            .fail();
270        }
271
272        Ok(())
273    }
274}
275
276fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
277    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
278        .map(|_| {
279            let (sender, receiver) = mpsc::channel(1);
280            (Some(sender), Some(receiver))
281        })
282        .unzip();
283    (SenderList::new(senders), receivers)
284}
285
286impl RegionScanner for SeriesScan {
287    fn name(&self) -> &str {
288        "SeriesScan"
289    }
290
291    fn properties(&self) -> &ScannerProperties {
292        &self.properties
293    }
294
295    fn schema(&self) -> SchemaRef {
296        self.stream_ctx.input.mapper.output_schema()
297    }
298
299    fn metadata(&self) -> RegionMetadataRef {
300        self.stream_ctx.input.mapper.metadata().clone()
301    }
302
303    fn scan_partition(
304        &self,
305        ctx: &QueryScanContext,
306        metrics_set: &ExecutionPlanMetricsSet,
307        partition: usize,
308    ) -> Result<SendableRecordBatchStream, BoxedError> {
309        self.scan_partition_impl(ctx, metrics_set, partition)
310            .map_err(BoxedError::new)
311    }
312
313    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
314        self.properties.prepare(request);
315
316        self.check_scan_limit().map_err(BoxedError::new)?;
317
318        Ok(())
319    }
320
321    fn has_predicate_without_region(&self) -> bool {
322        let predicate = self
323            .stream_ctx
324            .input
325            .predicate_group()
326            .predicate_without_region();
327        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
328    }
329
330    fn set_logical_region(&mut self, logical_region: bool) {
331        self.properties.set_logical_region(logical_region);
332    }
333}
334
335impl DisplayAs for SeriesScan {
336    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
337        write!(
338            f,
339            "SeriesScan: region={}, ",
340            self.stream_ctx.input.mapper.metadata().region_id
341        )?;
342        match t {
343            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
344                self.stream_ctx.format_for_explain(false, f)
345            }
346            DisplayFormatType::Verbose => {
347                self.stream_ctx.format_for_explain(true, f)?;
348                self.metrics_list.format_verbose_metrics(f)
349            }
350        }
351    }
352}
353
354impl fmt::Debug for SeriesScan {
355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356        f.debug_struct("SeriesScan")
357            .field("num_ranges", &self.stream_ctx.ranges.len())
358            .finish()
359    }
360}
361
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input held by the stream context (test only).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
369
370/// The distributor scans series and distributes them to different partitions.
371struct SeriesDistributor {
372    /// Context for the scan stream.
373    stream_ctx: Arc<StreamContext>,
374    /// Optional semaphore for limiting the number of concurrent scans.
375    semaphore: Option<Arc<Semaphore>>,
376    /// Partition ranges to scan.
377    partitions: Vec<Vec<PartitionRange>>,
378    /// Senders of all partitions.
379    senders: SenderList,
380    /// Metrics set to report.
381    /// The distributor report the metrics as an additional partition.
382    /// This may double the scan cost of the [SeriesScan] metrics. We can
383    /// get per-partition metrics in verbose mode to see the metrics of the
384    /// distributor.
385    metrics_set: ExecutionPlanMetricsSet,
386    metrics_list: Arc<PartitionMetricsList>,
387}
388
389impl SeriesDistributor {
390    /// Executes the distributor.
391    async fn execute(&mut self) {
392        let result = if self.stream_ctx.input.flat_format {
393            self.scan_partitions_flat().await
394        } else {
395            self.scan_partitions().await
396        };
397
398        if let Err(e) = result {
399            self.senders.send_error(e).await;
400        }
401    }
402
403    /// Scans all parts in flat format using FlatSeriesBatchDivider.
404    async fn scan_partitions_flat(&mut self) -> Result<()> {
405        let part_metrics = new_partition_metrics(
406            &self.stream_ctx,
407            false,
408            &self.metrics_set,
409            self.partitions.len(),
410            &self.metrics_list,
411        );
412        part_metrics.on_first_poll();
413
414        let range_builder_list = Arc::new(RangeBuilderList::new(
415            self.stream_ctx.input.num_memtables(),
416            self.stream_ctx.input.num_files(),
417        ));
418        // Scans all parts.
419        let mut sources = Vec::with_capacity(self.partitions.len());
420        for partition in &self.partitions {
421            sources.reserve(partition.len());
422            for part_range in partition {
423                build_flat_sources(
424                    &self.stream_ctx,
425                    part_range,
426                    false,
427                    &part_metrics,
428                    range_builder_list.clone(),
429                    &mut sources,
430                    self.semaphore.clone(),
431                )
432                .await?;
433            }
434        }
435
436        // Builds a flat reader that merge sources from all parts.
437        let mut reader = SeqScan::build_flat_reader_from_sources(
438            &self.stream_ctx,
439            sources,
440            self.semaphore.clone(),
441            Some(&part_metrics),
442        )
443        .await?;
444        let mut metrics = SeriesDistributorMetrics::default();
445        let mut fetch_start = Instant::now();
446
447        let mut divider = FlatSeriesBatchDivider::default();
448        while let Some(record_batch) = reader.try_next().await? {
449            metrics.scan_cost += fetch_start.elapsed();
450            metrics.num_batches += 1;
451            metrics.num_rows += record_batch.num_rows();
452
453            debug_assert!(record_batch.num_rows() > 0);
454            if record_batch.num_rows() == 0 {
455                fetch_start = Instant::now();
456                continue;
457            }
458
459            // Use divider to split series
460            if let Some(series_batch) = divider.push(record_batch) {
461                let yield_start = Instant::now();
462                self.senders
463                    .send_batch(SeriesBatch::Flat(series_batch))
464                    .await?;
465                metrics.yield_cost += yield_start.elapsed();
466            }
467            fetch_start = Instant::now();
468        }
469
470        // Send any remaining batch in the divider
471        if let Some(series_batch) = divider.finish() {
472            let yield_start = Instant::now();
473            self.senders
474                .send_batch(SeriesBatch::Flat(series_batch))
475                .await?;
476            metrics.yield_cost += yield_start.elapsed();
477        }
478
479        metrics.scan_cost += fetch_start.elapsed();
480        metrics.num_series_send_timeout = self.senders.num_timeout;
481        metrics.num_series_send_full = self.senders.num_full;
482        part_metrics.set_distributor_metrics(&metrics);
483
484        part_metrics.on_finish();
485
486        Ok(())
487    }
488
489    /// Scans all parts.
490    async fn scan_partitions(&mut self) -> Result<()> {
491        let part_metrics = new_partition_metrics(
492            &self.stream_ctx,
493            false,
494            &self.metrics_set,
495            self.partitions.len(),
496            &self.metrics_list,
497        );
498        part_metrics.on_first_poll();
499
500        let range_builder_list = Arc::new(RangeBuilderList::new(
501            self.stream_ctx.input.num_memtables(),
502            self.stream_ctx.input.num_files(),
503        ));
504        // Scans all parts.
505        let mut sources = Vec::with_capacity(self.partitions.len());
506        for partition in &self.partitions {
507            sources.reserve(partition.len());
508            for part_range in partition {
509                build_sources(
510                    &self.stream_ctx,
511                    part_range,
512                    false,
513                    &part_metrics,
514                    range_builder_list.clone(),
515                    &mut sources,
516                    self.semaphore.clone(),
517                )
518                .await?;
519            }
520        }
521
522        // Builds a reader that merge sources from all parts.
523        let mut reader = SeqScan::build_reader_from_sources(
524            &self.stream_ctx,
525            sources,
526            self.semaphore.clone(),
527            Some(&part_metrics),
528        )
529        .await?;
530        let mut metrics = SeriesDistributorMetrics::default();
531        let mut fetch_start = Instant::now();
532
533        let mut current_series = PrimaryKeySeriesBatch::default();
534        while let Some(batch) = reader.next_batch().await? {
535            metrics.scan_cost += fetch_start.elapsed();
536            metrics.num_batches += 1;
537            metrics.num_rows += batch.num_rows();
538
539            debug_assert!(!batch.is_empty());
540            if batch.is_empty() {
541                fetch_start = Instant::now();
542                continue;
543            }
544
545            let Some(last_key) = current_series.current_key() else {
546                current_series.push(batch);
547                fetch_start = Instant::now();
548                continue;
549            };
550
551            if last_key == batch.primary_key() {
552                current_series.push(batch);
553                fetch_start = Instant::now();
554                continue;
555            }
556
557            // We find a new series, send the current one.
558            let to_send =
559                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
560            let yield_start = Instant::now();
561            self.senders
562                .send_batch(SeriesBatch::PrimaryKey(to_send))
563                .await?;
564            metrics.yield_cost += yield_start.elapsed();
565            fetch_start = Instant::now();
566        }
567
568        if !current_series.is_empty() {
569            let yield_start = Instant::now();
570            self.senders
571                .send_batch(SeriesBatch::PrimaryKey(current_series))
572                .await?;
573            metrics.yield_cost += yield_start.elapsed();
574        }
575
576        metrics.scan_cost += fetch_start.elapsed();
577        metrics.num_series_send_timeout = self.senders.num_timeout;
578        metrics.num_series_send_full = self.senders.num_full;
579        part_metrics.set_distributor_metrics(&metrics);
580
581        part_metrics.on_finish();
582
583        Ok(())
584    }
585}
586
587/// Batches of the same series in primary key format.
588#[derive(Default, Debug)]
589pub struct PrimaryKeySeriesBatch {
590    pub batches: SmallVec<[Batch; 4]>,
591}
592
593impl PrimaryKeySeriesBatch {
594    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
595    fn single(batch: Batch) -> Self {
596        Self {
597            batches: smallvec![batch],
598        }
599    }
600
601    fn current_key(&self) -> Option<&[u8]> {
602        self.batches.first().map(|batch| batch.primary_key())
603    }
604
605    fn push(&mut self, batch: Batch) {
606        self.batches.push(batch);
607    }
608
609    /// Returns true if there is no batch.
610    fn is_empty(&self) -> bool {
611        self.batches.is_empty()
612    }
613}
614
615/// Batches of the same series.
616#[derive(Debug)]
617pub enum SeriesBatch {
618    PrimaryKey(PrimaryKeySeriesBatch),
619    Flat(FlatSeriesBatch),
620}
621
622impl SeriesBatch {
623    /// Returns the number of batches.
624    pub fn num_batches(&self) -> usize {
625        match self {
626            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
627            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
628        }
629    }
630
631    /// Returns the total number of rows across all batches.
632    pub fn num_rows(&self) -> usize {
633        match self {
634            SeriesBatch::PrimaryKey(primary_key_batch) => {
635                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
636            }
637            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
638        }
639    }
640}
641
642/// Batches of the same series in flat format.
643#[derive(Default, Debug)]
644pub struct FlatSeriesBatch {
645    pub batches: SmallVec<[RecordBatch; 4]>,
646}
647
648/// List of senders.
649struct SenderList {
650    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
651    /// Number of None senders.
652    num_nones: usize,
653    /// Index of the current partition to send.
654    sender_idx: usize,
655    /// Number of timeout.
656    num_timeout: usize,
657    /// Number of full senders.
658    num_full: usize,
659}
660
661impl SenderList {
662    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
663        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
664        Self {
665            senders,
666            num_nones,
667            sender_idx: 0,
668            num_timeout: 0,
669            num_full: 0,
670        }
671    }
672
673    /// Finds a partition and tries to send the batch to the partition.
674    /// Returns None if it sends successfully.
675    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
676        for _ in 0..self.senders.len() {
677            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
678
679            let sender_idx = self.fetch_add_sender_idx();
680            let Some(sender) = &self.senders[sender_idx] else {
681                continue;
682            };
683
684            match sender.try_send(Ok(batch)) {
685                Ok(()) => return Ok(None),
686                Err(TrySendError::Full(res)) => {
687                    self.num_full += 1;
688                    // Safety: we send Ok.
689                    batch = res.unwrap();
690                }
691                Err(TrySendError::Closed(res)) => {
692                    self.senders[sender_idx] = None;
693                    self.num_nones += 1;
694                    // Safety: we send Ok.
695                    batch = res.unwrap();
696                }
697            }
698        }
699
700        Ok(Some(batch))
701    }
702
703    /// Finds a partition and sends the batch to the partition.
704    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
705        // Sends the batch without blocking first.
706        match self.try_send_batch(batch)? {
707            Some(b) => {
708                // Unable to send batch to partition.
709                batch = b;
710            }
711            None => {
712                return Ok(());
713            }
714        }
715
716        loop {
717            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
718
719            let sender_idx = self.fetch_add_sender_idx();
720            let Some(sender) = &self.senders[sender_idx] else {
721                continue;
722            };
723            // Adds a timeout to avoid blocking indefinitely and sending
724            // the batch in a round-robin fashion when some partitions
725            // don't poll their inputs. This may happen if we have a
726            // node like sort merging. But it is rare when we are using SeriesScan.
727            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
728                Ok(()) => break,
729                Err(SendTimeoutError::Timeout(res)) => {
730                    self.num_timeout += 1;
731                    // Safety: we send Ok.
732                    batch = res.unwrap();
733                }
734                Err(SendTimeoutError::Closed(res)) => {
735                    self.senders[sender_idx] = None;
736                    self.num_nones += 1;
737                    // Safety: we send Ok.
738                    batch = res.unwrap();
739                }
740            }
741        }
742
743        Ok(())
744    }
745
746    async fn send_error(&self, error: Error) {
747        let error = Arc::new(error);
748        for sender in self.senders.iter().flatten() {
749            let result = Err(error.clone()).context(ScanSeriesSnafu);
750            let _ = sender.send(result).await;
751        }
752    }
753
754    fn fetch_add_sender_idx(&mut self) -> usize {
755        let sender_idx = self.sender_idx;
756        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
757        sender_idx
758    }
759}
760
761fn new_partition_metrics(
762    stream_ctx: &StreamContext,
763    explain_verbose: bool,
764    metrics_set: &ExecutionPlanMetricsSet,
765    partition: usize,
766    metrics_list: &PartitionMetricsList,
767) -> PartitionMetrics {
768    let metrics = PartitionMetrics::new(
769        stream_ctx.input.mapper.metadata().region_id,
770        partition,
771        "SeriesScan",
772        stream_ctx.query_start,
773        explain_verbose,
774        metrics_set,
775    );
776
777    metrics_list.set(partition, metrics.clone());
778    metrics
779}
780
781/// A divider to split flat record batches by time series.
782///
783/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
784/// However, a [FlatSeriesBatch] may contain rows from multiple series.
785#[derive(Default)]
786struct FlatSeriesBatchDivider {
787    buffer: FlatSeriesBatch,
788}
789
790impl FlatSeriesBatchDivider {
791    /// Pushes a record batch into the divider.
792    ///
793    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
794    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
795        // If buffer is empty
796        if self.buffer.batches.is_empty() {
797            self.buffer.batches.push(batch);
798            return None;
799        }
800
801        // Gets the primary key column from the incoming batch.
802        let pk_column_idx = primary_key_column_index(batch.num_columns());
803        let batch_pk_column = batch.column(pk_column_idx);
804        let batch_pk_array = batch_pk_column
805            .as_any()
806            .downcast_ref::<PrimaryKeyArray>()
807            .unwrap();
808        let batch_pk_values = batch_pk_array
809            .values()
810            .as_any()
811            .downcast_ref::<BinaryArray>()
812            .unwrap();
813        // Gets the last primary key of the incoming batch.
814        let batch_last_pk =
815            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
816        // Gets the last primary key of the buffer.
817        // Safety: the buffer is not empty.
818        let buffer_last_batch = self.buffer.batches.last().unwrap();
819        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
820        let buffer_pk_array = buffer_pk_column
821            .as_any()
822            .downcast_ref::<PrimaryKeyArray>()
823            .unwrap();
824        let buffer_pk_values = buffer_pk_array
825            .values()
826            .as_any()
827            .downcast_ref::<BinaryArray>()
828            .unwrap();
829        let buffer_last_pk =
830            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
831
832        // If last primary key in the batch is the same as last primary key in the buffer.
833        if batch_last_pk == buffer_last_pk {
834            self.buffer.batches.push(batch);
835            return None;
836        }
837        // Otherwise, the batch must have a different primary key, we find the first offset of the
838        // changed primary key.
839        let batch_pk_keys = batch_pk_array.keys();
840        let pk_indices = batch_pk_keys.values();
841        let mut change_offset = 0;
842        for (i, &key) in pk_indices.iter().enumerate() {
843            let batch_pk = batch_pk_values.value(key as usize);
844
845            if buffer_last_pk != batch_pk {
846                change_offset = i;
847                break;
848            }
849        }
850
851        // Splits the batch at the change offset
852        let (first_part, remaining_part) = if change_offset > 0 {
853            let first_part = batch.slice(0, change_offset);
854            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
855            (Some(first_part), Some(remaining_part))
856        } else {
857            (None, Some(batch))
858        };
859
860        // Creates the result from current buffer + first part of new batch
861        let mut result = std::mem::take(&mut self.buffer);
862        if let Some(first_part) = first_part {
863            result.batches.push(first_part);
864        }
865
866        // Pushes remaining part to the buffer if it exists
867        if let Some(remaining_part) = remaining_part {
868            self.buffer.batches.push(remaining_part);
869        }
870
871        Some(result)
872    }
873
874    /// Returns the final [FlatSeriesBatch].
875    fn finish(&mut self) -> Option<FlatSeriesBatch> {
876        if self.buffer.batches.is_empty() {
877            None
878        } else {
879            Some(std::mem::take(&mut self.buffer))
880        }
881    }
882}
883
884/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
885fn primary_key_at<'a>(
886    primary_key: &PrimaryKeyArray,
887    primary_key_values: &'a BinaryArray,
888    index: usize,
889) -> &'a [u8] {
890    let key = primary_key.keys().value(index);
891    primary_key_values.value(key as usize)
892}
893
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format test batch with one row per element of the input slices.
    ///
    /// Each primary key is written twice: as the decoded `k0` string dictionary
    /// column and as the encoded `__primary_key` binary dictionary column.
    ///
    /// # Panics
    /// Panics if the input slices do not all have the same length.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        // Plain `assert_eq!` (not `debug_assert_eq!`) so malformed fixtures are
        // still rejected with a clear message under `cargo test --release`.
        assert_eq!(sequences.len(), num_rows);
        assert_eq!(op_types.len(), num_rows);
        assert_eq!(fields.len(), num_rows);
        assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the decoded `k0` tag column; every key must be valid UTF-8.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the encoded `__primary_key` binary dictionary column.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema of the test batches: tag, field, time index, then the internal columns.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Returns the row count of each batch, in order.
    fn row_counts<'a>(batches: impl IntoIterator<Item = &'a RecordBatch>) -> Vec<usize> {
        batches.into_iter().map(RecordBatch::num_rows).collect()
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(row_counts(&divider.buffer.batches), vec![2]);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(row_counts(&series_batch.batches), vec![2, 2]);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        // The new batch changes series at its very first row, so the emitted
        // series is exactly the previous buffer and nothing from `batch2`.
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(row_counts(&series_batch.batches), vec![2]);

        // The whole of `batch2` is carried over as the next pending series.
        assert_eq!(row_counts(&divider.buffer.batches), vec![2]);
        let final_batch = divider.finish().unwrap();
        assert_eq!(row_counts(&final_batch.batches), vec![2]);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        // Buffered batch plus the leading `series1` row of `batch2`.
        assert_eq!(row_counts(&series_batch.batches), vec![2, 1]);

        // Only the trailing `series2` row stays buffered.
        assert_eq!(row_counts(&divider.buffer.batches), vec![1]);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        // Both `series1` rows: one from the buffer, one sliced off `batch2`.
        assert_eq!(row_counts(&series_batch.batches), vec![1, 1]);

        // The remainder (series2 x2, series3 x1) is kept together in one slice.
        let final_batch = divider.finish().unwrap();
        assert_eq!(row_counts(&final_batch.batches), vec![3]);
    }

    #[test]
    fn test_finish_drains_buffer() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);
        assert!(divider.push(batch).is_none());

        let series_batch = divider.finish().unwrap();
        assert_eq!(row_counts(&series_batch.batches), vec![1]);

        // A second `finish` must not return the same rows again.
        assert!(divider.finish().is_none());
        assert!(divider.buffer.batches.is_empty());
    }
}