// mito2/read/series_scan.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use common_telemetry::warn;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::schema::SchemaRef;
32use futures::{StreamExt, TryStreamExt};
33use smallvec::{SmallVec, smallvec};
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
38};
39use tokio::sync::Semaphore;
40use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
41use tokio::sync::mpsc::{self, Receiver, Sender};
42
43use crate::error::{
44    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
45    ScanSeriesSnafu, TooManyFilesToReadSnafu,
46};
47use crate::read::pruner::{PartitionPruner, Pruner};
48use crate::read::scan_region::{ScanInput, StreamContext};
49use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
50use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{Batch, ScannerMetrics};
53use crate::sst::parquet::flat_format::primary_key_column_index;
54use crate::sst::parquet::format::PrimaryKeyArray;
55
/// Timeout to send a batch to a sender.
///
/// Keeps the distributor from blocking indefinitely on a partition that is
/// slow to poll; on timeout the batch is retried on the next sender in
/// round-robin order (see `SenderList::send_batch`).
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// List of receivers.
///
/// Each slot is `Some` until the corresponding partition takes its receiver;
/// taking it a second time fails with `ScanMultiTimesSnafu`.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
61
/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output order is always order by `(primary key, time index)` inside every
/// partition.
/// Always returns the same series (primary key) to the same partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Receivers of each partition.
    ///
    /// Empty until `maybe_start_distributor` creates one channel per
    /// partition; each receiver is taken at most once.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets in query and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}
80
impl SeriesScan {
    /// Creates a new [SeriesScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // Create the shared pruner with number of workers equal to CPU cores.
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            // Receivers are created lazily by `maybe_start_distributor`.
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    /// Scans one partition and wraps its batch stream into a record batch
    /// stream in the output schema.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        // Converts the scan batches into record batches for the caller.
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    /// Returns a stream of [ScanBatch]es for `partition`.
    ///
    /// Starts the shared [SeriesDistributor] on first use, then drains the
    /// partition's receiver, recording scan/yield costs per series.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                // Time spent waiting on the channel counts as scan cost.
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver for the partition.
    ///
    /// Fails with `ScanMultiTimesSnafu` if the receiver was already taken,
    /// i.e. the partition is scanned more than once.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor if the receiver list is empty.
    #[tracing::instrument(
        skip(self, metrics_set, metrics_list),
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        // A non-empty list means another partition already started the
        // distributor; the lock makes this check-and-start atomic.
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            pruner: self.pruner.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
        // The distributor runs in the background and pushes whole series to
        // the per-partition channels.
        common_runtime::spawn_global(
            async move {
                distributor.execute().await;
            }
            .instrument(span),
        );

        *rx_list = receivers;
    }

    /// Scans the region and returns a stream.
    ///
    /// Chains the streams of all partitions into a single stream.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scan [`Batch`] in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Checks resource limit for the scanner.
    ///
    /// Fails with `TooManyFilesToReadSnafu` when the total number of file
    /// indices across all partition ranges exceeds
    /// `max_concurrent_scan_files`.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Sum the total number of files across all partitions
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}
312
313fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
314    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
315        .map(|_| {
316            let (sender, receiver) = mpsc::channel(1);
317            (Some(sender), Some(receiver))
318        })
319        .unzip();
320    (SenderList::new(senders), receivers)
321}
322
impl RegionScanner for SeriesScan {
    fn name(&self) -> &str {
        "SeriesScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        // Validate the file limit against the (possibly updated) partition
        // ranges.
        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.is_some()
    }

    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
378
379impl DisplayAs for SeriesScan {
380    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
381        write!(
382            f,
383            "SeriesScan: region={}, ",
384            self.stream_ctx.input.mapper.metadata().region_id
385        )?;
386        match t {
387            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
388                self.stream_ctx.format_for_explain(false, f)
389            }
390            DisplayFormatType::Verbose => {
391                self.stream_ctx.format_for_explain(true, f)?;
392                self.metrics_list.format_verbose_metrics(f)
393            }
394        }
395    }
396}
397
398impl fmt::Debug for SeriesScan {
399    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400        f.debug_struct("SeriesScan")
401            .field("num_ranges", &self.stream_ctx.ranges.len())
402            .finish()
403    }
404}
405
#[cfg(test)]
impl SeriesScan {
    /// Returns the input.
    ///
    /// Test-only accessor for inspecting the scan input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
413
/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Senders of all partitions.
    ///
    /// Series are distributed over these in round-robin order.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor report the metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    metrics_list: Arc<PartitionMetricsList>,
}
434
impl SeriesDistributor {
    /// Executes the distributor.
    ///
    /// Any scan error is broadcast to all partition senders so every
    /// consumer observes the failure.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = if self.stream_ctx.input.flat_format {
            self.scan_partitions_flat().await
        } else {
            self.scan_partitions().await
        };

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts in flat format using FlatSeriesBatchDivider.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Initialize reference counts for all partition ranges.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        // Create PartitionPruner covering all partitions
        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor reports its metrics as an extra partition (index
        // `self.partitions.len()`).
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so scan cost contains
        // build part cost.
        let mut fetch_start = Instant::now();

        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_flat_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a flat reader that merge sources from all parts.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            // Skip empty batches defensively in release builds.
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Use divider to split series
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Send any remaining batch in the divider
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }

    /// Scans all parts.
    ///
    /// Groups consecutive batches with the same primary key into one
    /// [SeriesBatch] and distributes each series over the senders.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions(&mut self) -> Result<()> {
        // Initialize reference counts for all partition ranges.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        // Create PartitionPruner covering all partitions
        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor reports its metrics as an extra partition (index
        // `self.partitions.len()`).
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so scan cost contains
        // build part cost.
        let mut fetch_start = Instant::now();

        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a reader that merge sources from all parts.
        let mut reader = SeqScan::build_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        // Accumulates batches of the current series; flushed whenever the
        // primary key changes.
        let mut current_series = PrimaryKeySeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            // Skip empty batches defensively in release builds.
            if batch.is_empty() {
                fetch_start = Instant::now();
                continue;
            }

            // First batch overall: start the first series.
            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            };

            // Same primary key: keep accumulating the current series.
            if last_key == batch.primary_key() {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            }

            // We find a new series, send the current one.
            let to_send =
                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(to_send))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
            fetch_start = Instant::now();
        }

        // Flush the last series.
        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(current_series))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}
670
/// Batches of the same series in primary key format.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    /// Batches of the series; up to 4 are stored inline before spilling to
    /// the heap.
    pub batches: SmallVec<[Batch; 4]>,
}
676
677impl PrimaryKeySeriesBatch {
678    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
679    fn single(batch: Batch) -> Self {
680        Self {
681            batches: smallvec![batch],
682        }
683    }
684
685    fn current_key(&self) -> Option<&[u8]> {
686        self.batches.first().map(|batch| batch.primary_key())
687    }
688
689    fn push(&mut self, batch: Batch) {
690        self.batches.push(batch);
691    }
692
693    /// Returns true if there is no batch.
694    fn is_empty(&self) -> bool {
695        self.batches.is_empty()
696    }
697}
698
/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Series stored as primary key format [Batch]es.
    PrimaryKey(PrimaryKeySeriesBatch),
    /// Series stored as flat format [RecordBatch]es.
    Flat(FlatSeriesBatch),
}
705
706impl SeriesBatch {
707    /// Returns the number of batches.
708    pub fn num_batches(&self) -> usize {
709        match self {
710            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
711            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
712        }
713    }
714
715    /// Returns the total number of rows across all batches.
716    pub fn num_rows(&self) -> usize {
717        match self {
718            SeriesBatch::PrimaryKey(primary_key_batch) => {
719                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
720            }
721            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
722        }
723    }
724}
725
/// Batches of the same series in flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Record batches of the series; up to 4 are stored inline before
    /// spilling to the heap.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
731
/// List of senders.
struct SenderList {
    /// Senders of all partitions; a slot becomes `None` once its channel is
    /// observed closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send.
    sender_idx: usize,
    /// Number of timeout.
    num_timeout: usize,
    /// Number of full senders.
    num_full: usize,
}
744
impl SenderList {
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        // Count slots that are unusable from the start.
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
            num_full: 0,
        }
    }

    /// Finds a partition and tries to send the batch to the partition.
    /// Returns None if it sends successfully.
    ///
    /// Tries each sender at most once in round-robin order without blocking;
    /// hands the batch back to the caller if every channel is full or closed.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            // Error out when every receiver has been dropped.
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    // Receiver dropped: retire this sender permanently.
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Finds a partition and sends the batch to the partition.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Sends the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send batch to partition.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        // Fall back to blocking sends with a short timeout per sender.
        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Adds a timeout to avoid blocking indefinitely and sending
            // the batch in a round-robin fashion when some partitions
            // don't poll their inputs. This may happen if we have a
            // node like sort merging. But it is rare when we are using SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    /// Broadcasts the error to every remaining partition, ignoring send
    /// failures since consumers may already be gone.
    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    /// Returns the current sender index and advances it, wrapping around.
    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}
844
845fn new_partition_metrics(
846    stream_ctx: &StreamContext,
847    explain_verbose: bool,
848    metrics_set: &ExecutionPlanMetricsSet,
849    partition: usize,
850    metrics_list: &PartitionMetricsList,
851) -> PartitionMetrics {
852    let metrics = PartitionMetrics::new(
853        stream_ctx.input.mapper.metadata().region_id,
854        partition,
855        "SeriesScan",
856        stream_ctx.query_start,
857        explain_verbose,
858        metrics_set,
859    );
860
861    metrics_list.set(partition, metrics.clone());
862    metrics
863}
864
/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    // Batches buffered until a series boundary is observed.
    buffer: FlatSeriesBatch,
}
873
874impl FlatSeriesBatchDivider {
875    /// Pushes a record batch into the divider.
876    ///
877    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
878    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
879        // If buffer is empty
880        if self.buffer.batches.is_empty() {
881            self.buffer.batches.push(batch);
882            return None;
883        }
884
885        // Gets the primary key column from the incoming batch.
886        let pk_column_idx = primary_key_column_index(batch.num_columns());
887        let batch_pk_column = batch.column(pk_column_idx);
888        let batch_pk_array = batch_pk_column
889            .as_any()
890            .downcast_ref::<PrimaryKeyArray>()
891            .unwrap();
892        let batch_pk_values = batch_pk_array
893            .values()
894            .as_any()
895            .downcast_ref::<BinaryArray>()
896            .unwrap();
897        // Gets the last primary key of the incoming batch.
898        let batch_last_pk =
899            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
900        // Gets the last primary key of the buffer.
901        // Safety: the buffer is not empty.
902        let buffer_last_batch = self.buffer.batches.last().unwrap();
903        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
904        let buffer_pk_array = buffer_pk_column
905            .as_any()
906            .downcast_ref::<PrimaryKeyArray>()
907            .unwrap();
908        let buffer_pk_values = buffer_pk_array
909            .values()
910            .as_any()
911            .downcast_ref::<BinaryArray>()
912            .unwrap();
913        let buffer_last_pk =
914            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
915
916        // If last primary key in the batch is the same as last primary key in the buffer.
917        if batch_last_pk == buffer_last_pk {
918            self.buffer.batches.push(batch);
919            return None;
920        }
921        // Otherwise, the batch must have a different primary key, we find the first offset of the
922        // changed primary key.
923        let batch_pk_keys = batch_pk_array.keys();
924        let pk_indices = batch_pk_keys.values();
925        let mut change_offset = 0;
926        for (i, &key) in pk_indices.iter().enumerate() {
927            let batch_pk = batch_pk_values.value(key as usize);
928
929            if buffer_last_pk != batch_pk {
930                change_offset = i;
931                break;
932            }
933        }
934
935        // Splits the batch at the change offset
936        let (first_part, remaining_part) = if change_offset > 0 {
937            let first_part = batch.slice(0, change_offset);
938            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
939            (Some(first_part), Some(remaining_part))
940        } else {
941            (None, Some(batch))
942        };
943
944        // Creates the result from current buffer + first part of new batch
945        let mut result = std::mem::take(&mut self.buffer);
946        if let Some(first_part) = first_part {
947            result.batches.push(first_part);
948        }
949
950        // Pushes remaining part to the buffer if it exists
951        if let Some(remaining_part) = remaining_part {
952            self.buffer.batches.push(remaining_part);
953        }
954
955        Some(result)
956    }
957
958    /// Returns the final [FlatSeriesBatch].
959    fn finish(&mut self) -> Option<FlatSeriesBatch> {
960        if self.buffer.batches.is_empty() {
961            None
962        } else {
963            Some(std::mem::take(&mut self.buffer))
964        }
965    }
966}
967
968/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
969fn primary_key_at<'a>(
970    primary_key: &PrimaryKeyArray,
971    primary_key_values: &'a BinaryArray,
972    index: usize,
973) -> &'a [u8] {
974    let key = primary_key.keys().value(index);
975    primary_key_values.value(key as usize)
976}
977
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format test batch with a tag column (`k0`), a field
    /// column, a timestamp column and the internal `__primary_key`,
    /// `__sequence` and `__op_type` columns.
    ///
    /// All input slices must have the same length; `primary_keys` supplies
    /// both the string tag value and the encoded primary key of each row.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        // Column order must match `build_test_flat_schema()`.
        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column as a string dictionary array.
    /// Assumes every test primary key is valid UTF-8 (they are ASCII here).
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column as a binary dictionary array,
    /// matching the layout the divider downcasts to.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the flat schema used by the test batches: one tag, one field,
    /// a millisecond timestamp and the three internal columns.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    // A fresh divider yields nothing, and the first push is always buffered
    // (its last series may continue in the next batch).
    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    // Consecutive batches of the same series accumulate in the buffer and
    // are emitted together by `finish`.
    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    // A batch that starts a new series flushes the buffered series and
    // becomes the new buffer content.
    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    // A series boundary inside a batch splits it: the leading rows join the
    // emitted series batch, the trailing rows stay buffered.
    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    // Only the first boundary splits the batch; series2 and series3 stay in
    // one buffered slice (a FlatSeriesBatch may hold multiple series).
    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}