mito2/read/series_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-series scan implementation.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use tokio::sync::Semaphore;
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::error::{
    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
    ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;

/// Timeout to send a batch to a sender.
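/// A short timeout lets the distributor retry other partitions in a
/// round-robin fashion instead of blocking on one that is slow to poll
/// (see [SenderList::send_batch]).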
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// List of receivers.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output is always ordered by `(primary key, time index)` inside every
/// partition.
/// Rows of the same series (primary key) are always sent to the same partition.
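///
/// For example (illustrative only), with two partitions and series keys
/// `k1, k2, k3`, the distributor may route all rows of `k1` and `k3` to
/// partition 0 and all rows of `k2` to partition 1; the exact assignment is
/// round-robin and depends on which partitions are ready to receive.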
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets this during queries and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesScan {
    /// Creates a new [SeriesScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver for the partition.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor if the receiver list is empty.
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        common_runtime::spawn_global(async move {
            distributor.execute().await;
        });

        *rx_list = receivers;
    }

    /// Scans the region and returns a stream.
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scans [`Batch`]es in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Checks the resource limit for the scanner.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Sums the total number of files across all partitions.
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}

fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| {
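            // A capacity of 1 means the distributor buffers at most one
            // series per partition ahead of the consumer, which provides
            // natural backpressure.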
            let (sender, receiver) = mpsc::channel(1);
            (Some(sender), Some(receiver))
        })
        .unzip();
    (SenderList::new(senders), receivers)
}

impl RegionScanner for SeriesScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl SeriesScan {
    /// Returns the input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports its metrics as an additional partition.
    /// This may double the scan cost in the [SeriesScan] metrics; use
    /// per-partition metrics in verbose mode to see the cost of the
    /// distributor itself.
    metrics_set: ExecutionPlanMetricsSet,
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesDistributor {
    /// Executes the distributor.
    async fn execute(&mut self) {
        let result = if self.stream_ctx.input.flat_format {
            self.scan_partitions_flat().await
        } else {
            self.scan_partitions().await
        };

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts in flat format using [FlatSeriesBatchDivider].
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();

        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_flat_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                )
                .await?;
            }
        }

        // Builds a flat reader that merges sources from all parts.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();
        let mut fetch_start = Instant::now();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Uses the divider to split series.
            if let Some(series_batch) = divider.push(record_batch) {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Sends any remaining batch in the divider.
        if let Some(series_batch) = divider.finish() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }

    /// Scans all parts.
    async fn scan_partitions(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();

        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
        ));
        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                )
                .await?;
            }
        }

        // Builds a reader that merges sources from all parts.
        let mut reader =
            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
                .await?;
        let mut metrics = SeriesDistributorMetrics::default();
        let mut fetch_start = Instant::now();

        let mut current_series = PrimaryKeySeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                fetch_start = Instant::now();
                continue;
            }

            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            };

            if last_key == batch.primary_key() {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            }

            // We found a new series; send the current one.
            let to_send =
                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(to_send))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
            fetch_start = Instant::now();
        }

        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(current_series))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}

/// Batches of the same series in primary key format.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    pub batches: SmallVec<[Batch; 4]>,
}

impl PrimaryKeySeriesBatch {
    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
    fn single(batch: Batch) -> Self {
        Self {
            batches: smallvec![batch],
        }
    }

    fn current_key(&self) -> Option<&[u8]> {
        self.batches.first().map(|batch| batch.primary_key())
    }

    fn push(&mut self, batch: Batch) {
        self.batches.push(batch);
    }

    /// Returns true if there is no batch.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }
}

/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
    PrimaryKey(PrimaryKeySeriesBatch),
    Flat(FlatSeriesBatch),
}

impl SeriesBatch {
    /// Returns the number of batches.
    pub fn num_batches(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
        }
    }

    /// Returns the total number of rows across all batches.
    pub fn num_rows(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => {
                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
            }
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
        }
    }
}

/// Batches of the same series in flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    pub batches: SmallVec<[RecordBatch; 4]>,
}

/// List of senders.
struct SenderList {
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` senders.
    num_nones: usize,
    /// Index of the current partition to send to.
    sender_idx: usize,
    /// Number of send timeouts.
    num_timeout: usize,
    /// Number of times a send found the channel full.
    num_full: usize,
}

impl SenderList {
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
            num_full: 0,
        }
    }

    /// Finds a partition and tries to send the batch to the partition.
    /// Returns None if it sends successfully.
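    /// Returns the batch back if no partition can receive it right now
    /// (every open channel is full).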
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Finds a partition and sends the batch to the partition.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Sends the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send the batch without blocking.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Uses a timeout so we never block indefinitely on one partition
            // and keep sending batches in a round-robin fashion when some
            // partitions don't poll their inputs. This may happen if there is
            // a node like a sort-merge above the scan, but it is rare when
            // using SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}

fn new_partition_metrics(
    stream_ctx: &StreamContext,
    explain_verbose: bool,
    metrics_set: &ExecutionPlanMetricsSet,
    partition: usize,
    metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
    let metrics = PartitionMetrics::new(
        stream_ctx.input.mapper.metadata().region_id,
        partition,
        "SeriesScan",
        stream_ctx.query_start,
        explain_verbose,
        metrics_set,
    );

    metrics_list.set(partition, metrics.clone());
    metrics
}

/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    buffer: FlatSeriesBatch,
}

impl FlatSeriesBatchDivider {
    /// Pushes a record batch into the divider.
    ///
    /// Returns a [FlatSeriesBatch] once it is guaranteed to contain all rows
    /// of every series in it.
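    ///
    /// For example (illustrative), if the buffer ends with rows of series `a`
    /// and we push a batch whose primary keys are `[a, a, b]`, the divider
    /// returns the buffered batches plus the `[a, a]` slice and keeps the
    /// `[b]` slice buffered for the next call.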
    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
        // If the buffer is empty, buffers the batch directly.
        if self.buffer.batches.is_empty() {
            self.buffer.batches.push(batch);
            return None;
        }

        // Gets the primary key column from the incoming batch.
        let pk_column_idx = primary_key_column_index(batch.num_columns());
        let batch_pk_column = batch.column(pk_column_idx);
        let batch_pk_array = batch_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let batch_pk_values = batch_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Gets the last primary key of the incoming batch.
        let batch_last_pk =
            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
        // Gets the last primary key of the buffer.
        // Safety: the buffer is not empty.
        let buffer_last_batch = self.buffer.batches.last().unwrap();
        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
        let buffer_pk_array = buffer_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let buffer_pk_values = buffer_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let buffer_last_pk =
            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);

        // If the last primary key of the batch is the same as the last primary
        // key of the buffer, the whole batch still belongs to the buffered series.
        if batch_last_pk == buffer_last_pk {
            self.buffer.batches.push(batch);
            return None;
        }
        // Otherwise, the batch contains a different primary key. Since batches
        // are sorted by primary key, finds the first offset where the primary
        // key changes.
        let batch_pk_keys = batch_pk_array.keys();
        let pk_indices = batch_pk_keys.values();
        let mut change_offset = 0;
        for (i, &key) in pk_indices.iter().enumerate() {
            let batch_pk = batch_pk_values.value(key as usize);

            if buffer_last_pk != batch_pk {
                change_offset = i;
                break;
            }
        }

        // Splits the batch at the change offset.
        let (first_part, remaining_part) = if change_offset > 0 {
            let first_part = batch.slice(0, change_offset);
            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
            (Some(first_part), Some(remaining_part))
        } else {
            (None, Some(batch))
        };

        // Creates the result from the current buffer plus the first part of the new batch.
        let mut result = std::mem::take(&mut self.buffer);
        if let Some(first_part) = first_part {
            result.batches.push(first_part);
        }

        // Pushes the remaining part to the buffer if it exists.
        if let Some(remaining_part) = remaining_part {
            self.buffer.batches.push(remaining_part);
        }

        Some(result)
    }

    /// Returns the final [FlatSeriesBatch].
    fn finish(&mut self) -> Option<FlatSeriesBatch> {
        if self.buffer.batches.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
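///
/// For example, with dictionary values `[b"a", b"b"]` and keys `[0, 0, 1]`,
/// index 2 resolves to key 1 and returns `b"b"`.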
fn primary_key_at<'a>(
    primary_key: &PrimaryKeyArray,
    primary_key_values: &'a BinaryArray,
    index: usize,
) -> &'a [u8] {
    let key = primary_key.keys().value(index);
    primary_key_values.value(key as usize)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}