mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::array::BinaryArray;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::schema::SchemaRef;
30use futures::{StreamExt, TryStreamExt};
31use smallvec::{SmallVec, smallvec};
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::metadata::RegionMetadataRef;
34use store_api::region_engine::{
35    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
36};
37use tokio::sync::Semaphore;
38use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
39use tokio::sync::mpsc::{self, Receiver, Sender};
40
41use crate::error::{
42    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
43    ScanSeriesSnafu, TooManyFilesToReadSnafu,
44};
45use crate::read::range::RangeBuilderList;
46use crate::read::scan_region::{ScanInput, StreamContext};
47use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
48use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
49use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
50use crate::read::{Batch, ScannerMetrics};
51use crate::sst::parquet::flat_format::primary_key_column_index;
52use crate::sst::parquet::format::PrimaryKeyArray;
53
/// Maximum time the distributor waits on a single partition before trying the
/// next one (100 microseconds).
const SEND_TIMEOUT: Duration = Duration::from_nanos(100_000);
56
57/// List of receivers.
58type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
59
60/// Scans a region and returns sorted rows of a series in the same partition.
61///
62/// The output order is always order by `(primary key, time index)` inside every
63/// partition.
64/// Always returns the same series (primary key) to the same partition.
65pub struct SeriesScan {
66    /// Properties of the scanner.
67    properties: ScannerProperties,
68    /// Context of streams.
69    stream_ctx: Arc<StreamContext>,
70    /// Receivers of each partition.
71    receivers: Mutex<ReceiverList>,
72    /// Metrics for each partition.
73    /// The scanner only sets in query and keeps it empty during compaction.
74    metrics_list: Arc<PartitionMetricsList>,
75}
76
77impl SeriesScan {
78    /// Creates a new [SeriesScan].
79    pub(crate) fn new(input: ScanInput) -> Self {
80        let mut properties = ScannerProperties::default()
81            .with_append_mode(input.append_mode)
82            .with_total_rows(input.total_rows());
83        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
84        properties.partitions = vec![stream_ctx.partition_ranges()];
85
86        Self {
87            properties,
88            stream_ctx,
89            receivers: Mutex::new(Vec::new()),
90            metrics_list: Arc::new(PartitionMetricsList::default()),
91        }
92    }
93
94    fn scan_partition_impl(
95        &self,
96        ctx: &QueryScanContext,
97        metrics_set: &ExecutionPlanMetricsSet,
98        partition: usize,
99    ) -> Result<SendableRecordBatchStream> {
100        let metrics = new_partition_metrics(
101            &self.stream_ctx,
102            ctx.explain_verbose,
103            metrics_set,
104            partition,
105            &self.metrics_list,
106        );
107
108        let batch_stream =
109            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
110
111        let input = &self.stream_ctx.input;
112        let record_batch_stream = ConvertBatchStream::new(
113            batch_stream,
114            input.mapper.clone(),
115            input.cache_strategy.clone(),
116            metrics,
117        );
118
119        Ok(Box::pin(RecordBatchStreamWrapper::new(
120            input.mapper.output_schema(),
121            Box::pin(record_batch_stream),
122        )))
123    }
124
125    fn scan_batch_in_partition(
126        &self,
127        ctx: &QueryScanContext,
128        partition: usize,
129        part_metrics: PartitionMetrics,
130        metrics_set: &ExecutionPlanMetricsSet,
131    ) -> Result<ScanBatchStream> {
132        if ctx.explain_verbose {
133            common_telemetry::info!(
134                "SeriesScan partition {}, region_id: {}",
135                partition,
136                self.stream_ctx.input.region_metadata().region_id
137            );
138        }
139
140        ensure!(
141            partition < self.properties.num_partitions(),
142            PartitionOutOfRangeSnafu {
143                given: partition,
144                all: self.properties.num_partitions(),
145            }
146        );
147
148        self.maybe_start_distributor(metrics_set, &self.metrics_list);
149
150        let mut receiver = self.take_receiver(partition)?;
151        let stream = try_stream! {
152            part_metrics.on_first_poll();
153
154            let mut fetch_start = Instant::now();
155            while let Some(series) = receiver.recv().await {
156                let series = series?;
157
158                let mut metrics = ScannerMetrics::default();
159                metrics.scan_cost += fetch_start.elapsed();
160                fetch_start = Instant::now();
161
162                metrics.num_batches += series.num_batches();
163                metrics.num_rows += series.num_rows();
164
165                let yield_start = Instant::now();
166                yield ScanBatch::Series(series);
167                metrics.yield_cost += yield_start.elapsed();
168
169                part_metrics.merge_metrics(&metrics);
170            }
171
172            part_metrics.on_finish();
173        };
174        Ok(Box::pin(stream))
175    }
176
177    /// Takes the receiver for the partition.
178    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
179        let mut rx_list = self.receivers.lock().unwrap();
180        rx_list[partition]
181            .take()
182            .context(ScanMultiTimesSnafu { partition })
183    }
184
185    /// Starts the distributor if the receiver list is empty.
186    fn maybe_start_distributor(
187        &self,
188        metrics_set: &ExecutionPlanMetricsSet,
189        metrics_list: &Arc<PartitionMetricsList>,
190    ) {
191        let mut rx_list = self.receivers.lock().unwrap();
192        if !rx_list.is_empty() {
193            return;
194        }
195
196        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
197        let mut distributor = SeriesDistributor {
198            stream_ctx: self.stream_ctx.clone(),
199            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
200            partitions: self.properties.partitions.clone(),
201            senders,
202            metrics_set: metrics_set.clone(),
203            metrics_list: metrics_list.clone(),
204        };
205        common_runtime::spawn_global(async move {
206            distributor.execute().await;
207        });
208
209        *rx_list = receivers;
210    }
211
212    /// Scans the region and returns a stream.
213    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
214        let part_num = self.properties.num_partitions();
215        let metrics_set = ExecutionPlanMetricsSet::default();
216        let streams = (0..part_num)
217            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
218            .collect::<Result<Vec<_>, BoxedError>>()?;
219        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
220        Ok(Box::pin(chained_stream))
221    }
222
223    /// Scan [`Batch`] in all partitions one by one.
224    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
225        let metrics_set = ExecutionPlanMetricsSet::new();
226
227        let streams = (0..self.properties.partitions.len())
228            .map(|partition| {
229                let metrics = new_partition_metrics(
230                    &self.stream_ctx,
231                    false,
232                    &metrics_set,
233                    partition,
234                    &self.metrics_list,
235                );
236
237                self.scan_batch_in_partition(
238                    &QueryScanContext::default(),
239                    partition,
240                    metrics,
241                    &metrics_set,
242                )
243            })
244            .collect::<Result<Vec<_>>>()?;
245
246        Ok(Box::pin(futures::stream::iter(streams).flatten()))
247    }
248
249    /// Checks resource limit for the scanner.
250    pub(crate) fn check_scan_limit(&self) -> Result<()> {
251        // Sum the total number of files across all partitions
252        let total_files: usize = self
253            .properties
254            .partitions
255            .iter()
256            .flat_map(|partition| partition.iter())
257            .map(|part_range| {
258                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
259                range_meta.indices.len()
260            })
261            .sum();
262
263        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
264        if total_files > max_concurrent_files {
265            return TooManyFilesToReadSnafu {
266                actual: total_files,
267                max: max_concurrent_files,
268            }
269            .fail();
270        }
271
272        Ok(())
273    }
274}
275
276fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
277    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
278        .map(|_| {
279            let (sender, receiver) = mpsc::channel(1);
280            (Some(sender), Some(receiver))
281        })
282        .unzip();
283    (SenderList::new(senders), receivers)
284}
285
286impl RegionScanner for SeriesScan {
287    fn properties(&self) -> &ScannerProperties {
288        &self.properties
289    }
290
291    fn schema(&self) -> SchemaRef {
292        self.stream_ctx.input.mapper.output_schema()
293    }
294
295    fn metadata(&self) -> RegionMetadataRef {
296        self.stream_ctx.input.mapper.metadata().clone()
297    }
298
299    fn scan_partition(
300        &self,
301        ctx: &QueryScanContext,
302        metrics_set: &ExecutionPlanMetricsSet,
303        partition: usize,
304    ) -> Result<SendableRecordBatchStream, BoxedError> {
305        self.scan_partition_impl(ctx, metrics_set, partition)
306            .map_err(BoxedError::new)
307    }
308
309    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
310        self.properties.prepare(request);
311
312        self.check_scan_limit().map_err(BoxedError::new)?;
313
314        Ok(())
315    }
316
317    fn has_predicate_without_region(&self) -> bool {
318        let predicate = self
319            .stream_ctx
320            .input
321            .predicate_group()
322            .predicate_without_region();
323        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
324    }
325
326    fn set_logical_region(&mut self, logical_region: bool) {
327        self.properties.set_logical_region(logical_region);
328    }
329}
330
331impl DisplayAs for SeriesScan {
332    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
333        write!(
334            f,
335            "SeriesScan: region={}, ",
336            self.stream_ctx.input.mapper.metadata().region_id
337        )?;
338        match t {
339            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
340                self.stream_ctx.format_for_explain(false, f)
341            }
342            DisplayFormatType::Verbose => {
343                self.stream_ctx.format_for_explain(true, f)?;
344                self.metrics_list.format_verbose_metrics(f)
345            }
346        }
347    }
348}
349
350impl fmt::Debug for SeriesScan {
351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352        f.debug_struct("SeriesScan")
353            .field("num_ranges", &self.stream_ctx.ranges.len())
354            .finish()
355    }
356}
357
#[cfg(test)]
impl SeriesScan {
    /// Returns the [ScanInput] the scanner was created from (test only).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
365
366/// The distributor scans series and distributes them to different partitions.
367struct SeriesDistributor {
368    /// Context for the scan stream.
369    stream_ctx: Arc<StreamContext>,
370    /// Optional semaphore for limiting the number of concurrent scans.
371    semaphore: Option<Arc<Semaphore>>,
372    /// Partition ranges to scan.
373    partitions: Vec<Vec<PartitionRange>>,
374    /// Senders of all partitions.
375    senders: SenderList,
376    /// Metrics set to report.
377    /// The distributor report the metrics as an additional partition.
378    /// This may double the scan cost of the [SeriesScan] metrics. We can
379    /// get per-partition metrics in verbose mode to see the metrics of the
380    /// distributor.
381    metrics_set: ExecutionPlanMetricsSet,
382    metrics_list: Arc<PartitionMetricsList>,
383}
384
385impl SeriesDistributor {
386    /// Executes the distributor.
387    async fn execute(&mut self) {
388        let result = if self.stream_ctx.input.flat_format {
389            self.scan_partitions_flat().await
390        } else {
391            self.scan_partitions().await
392        };
393
394        if let Err(e) = result {
395            self.senders.send_error(e).await;
396        }
397    }
398
399    /// Scans all parts in flat format using FlatSeriesBatchDivider.
400    async fn scan_partitions_flat(&mut self) -> Result<()> {
401        let part_metrics = new_partition_metrics(
402            &self.stream_ctx,
403            false,
404            &self.metrics_set,
405            self.partitions.len(),
406            &self.metrics_list,
407        );
408        part_metrics.on_first_poll();
409
410        let range_builder_list = Arc::new(RangeBuilderList::new(
411            self.stream_ctx.input.num_memtables(),
412            self.stream_ctx.input.num_files(),
413        ));
414        // Scans all parts.
415        let mut sources = Vec::with_capacity(self.partitions.len());
416        for partition in &self.partitions {
417            sources.reserve(partition.len());
418            for part_range in partition {
419                build_flat_sources(
420                    &self.stream_ctx,
421                    part_range,
422                    false,
423                    &part_metrics,
424                    range_builder_list.clone(),
425                    &mut sources,
426                    self.semaphore.clone(),
427                )
428                .await?;
429            }
430        }
431
432        // Builds a flat reader that merge sources from all parts.
433        let mut reader = SeqScan::build_flat_reader_from_sources(
434            &self.stream_ctx,
435            sources,
436            self.semaphore.clone(),
437        )
438        .await?;
439        let mut metrics = SeriesDistributorMetrics::default();
440        let mut fetch_start = Instant::now();
441
442        let mut divider = FlatSeriesBatchDivider::default();
443        while let Some(record_batch) = reader.try_next().await? {
444            metrics.scan_cost += fetch_start.elapsed();
445            metrics.num_batches += 1;
446            metrics.num_rows += record_batch.num_rows();
447
448            debug_assert!(record_batch.num_rows() > 0);
449            if record_batch.num_rows() == 0 {
450                fetch_start = Instant::now();
451                continue;
452            }
453
454            // Use divider to split series
455            if let Some(series_batch) = divider.push(record_batch) {
456                let yield_start = Instant::now();
457                self.senders
458                    .send_batch(SeriesBatch::Flat(series_batch))
459                    .await?;
460                metrics.yield_cost += yield_start.elapsed();
461            }
462            fetch_start = Instant::now();
463        }
464
465        // Send any remaining batch in the divider
466        if let Some(series_batch) = divider.finish() {
467            let yield_start = Instant::now();
468            self.senders
469                .send_batch(SeriesBatch::Flat(series_batch))
470                .await?;
471            metrics.yield_cost += yield_start.elapsed();
472        }
473
474        metrics.scan_cost += fetch_start.elapsed();
475        metrics.num_series_send_timeout = self.senders.num_timeout;
476        metrics.num_series_send_full = self.senders.num_full;
477        part_metrics.set_distributor_metrics(&metrics);
478
479        part_metrics.on_finish();
480
481        Ok(())
482    }
483
484    /// Scans all parts.
485    async fn scan_partitions(&mut self) -> Result<()> {
486        let part_metrics = new_partition_metrics(
487            &self.stream_ctx,
488            false,
489            &self.metrics_set,
490            self.partitions.len(),
491            &self.metrics_list,
492        );
493        part_metrics.on_first_poll();
494
495        let range_builder_list = Arc::new(RangeBuilderList::new(
496            self.stream_ctx.input.num_memtables(),
497            self.stream_ctx.input.num_files(),
498        ));
499        // Scans all parts.
500        let mut sources = Vec::with_capacity(self.partitions.len());
501        for partition in &self.partitions {
502            sources.reserve(partition.len());
503            for part_range in partition {
504                build_sources(
505                    &self.stream_ctx,
506                    part_range,
507                    false,
508                    &part_metrics,
509                    range_builder_list.clone(),
510                    &mut sources,
511                    self.semaphore.clone(),
512                )
513                .await?;
514            }
515        }
516
517        // Builds a reader that merge sources from all parts.
518        let mut reader =
519            SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
520                .await?;
521        let mut metrics = SeriesDistributorMetrics::default();
522        let mut fetch_start = Instant::now();
523
524        let mut current_series = PrimaryKeySeriesBatch::default();
525        while let Some(batch) = reader.next_batch().await? {
526            metrics.scan_cost += fetch_start.elapsed();
527            metrics.num_batches += 1;
528            metrics.num_rows += batch.num_rows();
529
530            debug_assert!(!batch.is_empty());
531            if batch.is_empty() {
532                fetch_start = Instant::now();
533                continue;
534            }
535
536            let Some(last_key) = current_series.current_key() else {
537                current_series.push(batch);
538                fetch_start = Instant::now();
539                continue;
540            };
541
542            if last_key == batch.primary_key() {
543                current_series.push(batch);
544                fetch_start = Instant::now();
545                continue;
546            }
547
548            // We find a new series, send the current one.
549            let to_send =
550                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
551            let yield_start = Instant::now();
552            self.senders
553                .send_batch(SeriesBatch::PrimaryKey(to_send))
554                .await?;
555            metrics.yield_cost += yield_start.elapsed();
556            fetch_start = Instant::now();
557        }
558
559        if !current_series.is_empty() {
560            let yield_start = Instant::now();
561            self.senders
562                .send_batch(SeriesBatch::PrimaryKey(current_series))
563                .await?;
564            metrics.yield_cost += yield_start.elapsed();
565        }
566
567        metrics.scan_cost += fetch_start.elapsed();
568        metrics.num_series_send_timeout = self.senders.num_timeout;
569        metrics.num_series_send_full = self.senders.num_full;
570        part_metrics.set_distributor_metrics(&metrics);
571
572        part_metrics.on_finish();
573
574        Ok(())
575    }
576}
577
578/// Batches of the same series in primary key format.
579#[derive(Default, Debug)]
580pub struct PrimaryKeySeriesBatch {
581    pub batches: SmallVec<[Batch; 4]>,
582}
583
584impl PrimaryKeySeriesBatch {
585    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
586    fn single(batch: Batch) -> Self {
587        Self {
588            batches: smallvec![batch],
589        }
590    }
591
592    fn current_key(&self) -> Option<&[u8]> {
593        self.batches.first().map(|batch| batch.primary_key())
594    }
595
596    fn push(&mut self, batch: Batch) {
597        self.batches.push(batch);
598    }
599
600    /// Returns true if there is no batch.
601    fn is_empty(&self) -> bool {
602        self.batches.is_empty()
603    }
604}
605
606/// Batches of the same series.
607#[derive(Debug)]
608pub enum SeriesBatch {
609    PrimaryKey(PrimaryKeySeriesBatch),
610    Flat(FlatSeriesBatch),
611}
612
613impl SeriesBatch {
614    /// Returns the number of batches.
615    pub fn num_batches(&self) -> usize {
616        match self {
617            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
618            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
619        }
620    }
621
622    /// Returns the total number of rows across all batches.
623    pub fn num_rows(&self) -> usize {
624        match self {
625            SeriesBatch::PrimaryKey(primary_key_batch) => {
626                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
627            }
628            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
629        }
630    }
631}
632
633/// Batches of the same series in flat format.
634#[derive(Default, Debug)]
635pub struct FlatSeriesBatch {
636    pub batches: SmallVec<[RecordBatch; 4]>,
637}
638
639/// List of senders.
640struct SenderList {
641    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
642    /// Number of None senders.
643    num_nones: usize,
644    /// Index of the current partition to send.
645    sender_idx: usize,
646    /// Number of timeout.
647    num_timeout: usize,
648    /// Number of full senders.
649    num_full: usize,
650}
651
652impl SenderList {
653    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
654        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
655        Self {
656            senders,
657            num_nones,
658            sender_idx: 0,
659            num_timeout: 0,
660            num_full: 0,
661        }
662    }
663
664    /// Finds a partition and tries to send the batch to the partition.
665    /// Returns None if it sends successfully.
666    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
667        for _ in 0..self.senders.len() {
668            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
669
670            let sender_idx = self.fetch_add_sender_idx();
671            let Some(sender) = &self.senders[sender_idx] else {
672                continue;
673            };
674
675            match sender.try_send(Ok(batch)) {
676                Ok(()) => return Ok(None),
677                Err(TrySendError::Full(res)) => {
678                    self.num_full += 1;
679                    // Safety: we send Ok.
680                    batch = res.unwrap();
681                }
682                Err(TrySendError::Closed(res)) => {
683                    self.senders[sender_idx] = None;
684                    self.num_nones += 1;
685                    // Safety: we send Ok.
686                    batch = res.unwrap();
687                }
688            }
689        }
690
691        Ok(Some(batch))
692    }
693
694    /// Finds a partition and sends the batch to the partition.
695    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
696        // Sends the batch without blocking first.
697        match self.try_send_batch(batch)? {
698            Some(b) => {
699                // Unable to send batch to partition.
700                batch = b;
701            }
702            None => {
703                return Ok(());
704            }
705        }
706
707        loop {
708            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
709
710            let sender_idx = self.fetch_add_sender_idx();
711            let Some(sender) = &self.senders[sender_idx] else {
712                continue;
713            };
714            // Adds a timeout to avoid blocking indefinitely and sending
715            // the batch in a round-robin fashion when some partitions
716            // don't poll their inputs. This may happen if we have a
717            // node like sort merging. But it is rare when we are using SeriesScan.
718            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
719                Ok(()) => break,
720                Err(SendTimeoutError::Timeout(res)) => {
721                    self.num_timeout += 1;
722                    // Safety: we send Ok.
723                    batch = res.unwrap();
724                }
725                Err(SendTimeoutError::Closed(res)) => {
726                    self.senders[sender_idx] = None;
727                    self.num_nones += 1;
728                    // Safety: we send Ok.
729                    batch = res.unwrap();
730                }
731            }
732        }
733
734        Ok(())
735    }
736
737    async fn send_error(&self, error: Error) {
738        let error = Arc::new(error);
739        for sender in self.senders.iter().flatten() {
740            let result = Err(error.clone()).context(ScanSeriesSnafu);
741            let _ = sender.send(result).await;
742        }
743    }
744
745    fn fetch_add_sender_idx(&mut self) -> usize {
746        let sender_idx = self.sender_idx;
747        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
748        sender_idx
749    }
750}
751
752fn new_partition_metrics(
753    stream_ctx: &StreamContext,
754    explain_verbose: bool,
755    metrics_set: &ExecutionPlanMetricsSet,
756    partition: usize,
757    metrics_list: &PartitionMetricsList,
758) -> PartitionMetrics {
759    let metrics = PartitionMetrics::new(
760        stream_ctx.input.mapper.metadata().region_id,
761        partition,
762        "SeriesScan",
763        stream_ctx.query_start,
764        explain_verbose,
765        metrics_set,
766    );
767
768    metrics_list.set(partition, metrics.clone());
769    metrics
770}
771
772/// A divider to split flat record batches by time series.
773///
774/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
775/// However, a [FlatSeriesBatch] may contain rows from multiple series.
776#[derive(Default)]
777struct FlatSeriesBatchDivider {
778    buffer: FlatSeriesBatch,
779}
780
781impl FlatSeriesBatchDivider {
782    /// Pushes a record batch into the divider.
783    ///
784    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
785    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
786        // If buffer is empty
787        if self.buffer.batches.is_empty() {
788            self.buffer.batches.push(batch);
789            return None;
790        }
791
792        // Gets the primary key column from the incoming batch.
793        let pk_column_idx = primary_key_column_index(batch.num_columns());
794        let batch_pk_column = batch.column(pk_column_idx);
795        let batch_pk_array = batch_pk_column
796            .as_any()
797            .downcast_ref::<PrimaryKeyArray>()
798            .unwrap();
799        let batch_pk_values = batch_pk_array
800            .values()
801            .as_any()
802            .downcast_ref::<BinaryArray>()
803            .unwrap();
804        // Gets the last primary key of the incoming batch.
805        let batch_last_pk =
806            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
807        // Gets the last primary key of the buffer.
808        // Safety: the buffer is not empty.
809        let buffer_last_batch = self.buffer.batches.last().unwrap();
810        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
811        let buffer_pk_array = buffer_pk_column
812            .as_any()
813            .downcast_ref::<PrimaryKeyArray>()
814            .unwrap();
815        let buffer_pk_values = buffer_pk_array
816            .values()
817            .as_any()
818            .downcast_ref::<BinaryArray>()
819            .unwrap();
820        let buffer_last_pk =
821            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
822
823        // If last primary key in the batch is the same as last primary key in the buffer.
824        if batch_last_pk == buffer_last_pk {
825            self.buffer.batches.push(batch);
826            return None;
827        }
828        // Otherwise, the batch must have a different primary key, we find the first offset of the
829        // changed primary key.
830        let batch_pk_keys = batch_pk_array.keys();
831        let pk_indices = batch_pk_keys.values();
832        let mut change_offset = 0;
833        for (i, &key) in pk_indices.iter().enumerate() {
834            let batch_pk = batch_pk_values.value(key as usize);
835
836            if buffer_last_pk != batch_pk {
837                change_offset = i;
838                break;
839            }
840        }
841
842        // Splits the batch at the change offset
843        let (first_part, remaining_part) = if change_offset > 0 {
844            let first_part = batch.slice(0, change_offset);
845            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
846            (Some(first_part), Some(remaining_part))
847        } else {
848            (None, Some(batch))
849        };
850
851        // Creates the result from current buffer + first part of new batch
852        let mut result = std::mem::take(&mut self.buffer);
853        if let Some(first_part) = first_part {
854            result.batches.push(first_part);
855        }
856
857        // Pushes remaining part to the buffer if it exists
858        if let Some(remaining_part) = remaining_part {
859            self.buffer.batches.push(remaining_part);
860        }
861
862        Some(result)
863    }
864
865    /// Returns the final [FlatSeriesBatch].
866    fn finish(&mut self) -> Option<FlatSeriesBatch> {
867        if self.buffer.batches.is_empty() {
868            None
869        } else {
870            Some(std::mem::take(&mut self.buffer))
871        }
872    }
873}
874
875/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
876fn primary_key_at<'a>(
877    primary_key: &PrimaryKeyArray,
878    primary_key_values: &'a BinaryArray,
879    index: usize,
880) -> &'a [u8] {
881    let key = primary_key.keys().value(index);
882    primary_key_values.value(key as usize)
883}
884
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a [RecordBatch] matching [build_test_flat_schema], one row per input element.
    ///
    /// `primary_keys` populates both the `k0` tag column (as UTF-8) and the encoded
    /// `__primary_key` column. `fields` populates `field0`.
    ///
    /// # Panics
    /// Panics if the input slices differ in length, or if a primary key is not valid UTF-8.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        // `assert_eq!` rather than `debug_assert_eq!`, so malformed fixtures are reported
        // clearly even when tests are built with `--release`.
        assert_eq!(sequences.len(), num_rows, "sequences length mismatch");
        assert_eq!(op_types.len(), num_rows, "op_types length mismatch");
        assert_eq!(fields.len(), num_rows, "fields length mismatch");
        assert_eq!(primary_keys.len(), num_rows, "primary_keys length mismatch");

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column: a UTF-8 dictionary array with `u32` keys.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column: a binary dictionary array with `u32` keys.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema used by [new_test_record_batch]: tag, field, time index, then internal columns.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Sums the row counts of every record batch in `series_batch`.
    fn total_rows(series_batch: &FlatSeriesBatch) -> usize {
        series_batch.batches.iter().map(|b| b.num_rows()).sum()
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        assert!(divider.push(batch2).is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(total_rows(&series_batch), 4);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        // series2 starts at row 0 of batch2, so all of batch1 is emitted and all of batch2
        // stays buffered.
        assert_eq!(series_batch.batches.len(), 1);
        assert_eq!(series_batch.batches[0].num_rows(), 2);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 2);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(total_rows(&series_batch), 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);

        // `finish` drains the buffer, so a second call has nothing left to return.
        assert!(divider.finish().is_none());
    }
}