mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::arrow::array::BinaryArray;
29use datatypes::arrow::record_batch::RecordBatch;
30use datatypes::schema::SchemaRef;
31use futures::{StreamExt, TryStreamExt};
32use smallvec::{SmallVec, smallvec};
33use snafu::{OptionExt, ResultExt, ensure};
34use store_api::metadata::RegionMetadataRef;
35use store_api::region_engine::{
36    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
37};
38use tokio::sync::Semaphore;
39use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
40use tokio::sync::mpsc::{self, Receiver, Sender};
41
42use crate::error::{
43    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
44    ScanSeriesSnafu, TooManyFilesToReadSnafu,
45};
46use crate::read::pruner::{PartitionPruner, Pruner};
47use crate::read::scan_region::{ScanInput, StreamContext};
48use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
49use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{Batch, ScannerMetrics};
52use crate::sst::parquet::flat_format::primary_key_column_index;
53use crate::sst::parquet::format::PrimaryKeyArray;
54
/// Timeout for a single attempt to send a batch to a partition sender
/// before moving on to the next partition in round-robin order.
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// List of receivers, one optional receiver per partition.
/// A slot becomes `None` once its receiver has been taken by a partition scan.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
60
/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output order is always order by `(primary key, time index)` inside every
/// partition.
/// Always returns the same series (primary key) to the same partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Receivers of each partition, populated lazily when the distributor starts.
    /// Each receiver can be taken at most once.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets in query and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}
79
80impl SeriesScan {
81    /// Creates a new [SeriesScan].
82    pub(crate) fn new(input: ScanInput) -> Self {
83        let mut properties = ScannerProperties::default()
84            .with_append_mode(input.append_mode)
85            .with_total_rows(input.total_rows());
86        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
87        properties.partitions = vec![stream_ctx.partition_ranges()];
88
89        // Create the shared pruner with number of workers equal to CPU cores.
90        let num_workers = common_stat::get_total_cpu_cores().max(1);
91        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
92
93        Self {
94            properties,
95            stream_ctx,
96            pruner,
97            receivers: Mutex::new(Vec::new()),
98            metrics_list: Arc::new(PartitionMetricsList::default()),
99        }
100    }
101
102    #[tracing::instrument(
103        skip_all,
104        fields(
105            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
106            partition = partition
107        )
108    )]
109    fn scan_partition_impl(
110        &self,
111        ctx: &QueryScanContext,
112        metrics_set: &ExecutionPlanMetricsSet,
113        partition: usize,
114    ) -> Result<SendableRecordBatchStream> {
115        let metrics = new_partition_metrics(
116            &self.stream_ctx,
117            ctx.explain_verbose,
118            metrics_set,
119            partition,
120            &self.metrics_list,
121        );
122
123        let batch_stream =
124            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
125
126        let input = &self.stream_ctx.input;
127        let record_batch_stream = ConvertBatchStream::new(
128            batch_stream,
129            input.mapper.clone(),
130            input.cache_strategy.clone(),
131            metrics,
132        );
133
134        Ok(Box::pin(RecordBatchStreamWrapper::new(
135            input.mapper.output_schema(),
136            Box::pin(record_batch_stream),
137        )))
138    }
139
    /// Returns the stream of [ScanBatch]es for `partition`.
    ///
    /// Validates the partition index, lazily starts the shared series
    /// distributor, then drains this partition's receiver, recording
    /// scan/yield costs into `part_metrics`.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        // Only the first call spawns the distributor; later calls are no-ops.
        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        // Each partition's receiver can only be taken once.
        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            // Each received item is a whole series (or an error forwarded by
            // the distributor).
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                // Time spent waiting on the distributor counts as scan cost.
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
198
199    /// Takes the receiver for the partition.
200    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
201        let mut rx_list = self.receivers.lock().unwrap();
202        rx_list[partition]
203            .take()
204            .context(ScanMultiTimesSnafu { partition })
205    }
206
207    /// Starts the distributor if the receiver list is empty.
208    #[tracing::instrument(
209        skip(self, metrics_set, metrics_list),
210        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
211    )]
212    fn maybe_start_distributor(
213        &self,
214        metrics_set: &ExecutionPlanMetricsSet,
215        metrics_list: &Arc<PartitionMetricsList>,
216    ) {
217        let mut rx_list = self.receivers.lock().unwrap();
218        if !rx_list.is_empty() {
219            return;
220        }
221
222        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
223        let mut distributor = SeriesDistributor {
224            stream_ctx: self.stream_ctx.clone(),
225            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
226            partitions: self.properties.partitions.clone(),
227            pruner: self.pruner.clone(),
228            senders,
229            metrics_set: metrics_set.clone(),
230            metrics_list: metrics_list.clone(),
231        };
232        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
233        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
234        common_runtime::spawn_global(
235            async move {
236                distributor.execute().await;
237            }
238            .instrument(span),
239        );
240
241        *rx_list = receivers;
242    }
243
244    /// Scans the region and returns a stream.
245    #[tracing::instrument(
246        skip_all,
247        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
248    )]
249    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
250        let part_num = self.properties.num_partitions();
251        let metrics_set = ExecutionPlanMetricsSet::default();
252        let streams = (0..part_num)
253            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
254            .collect::<Result<Vec<_>, BoxedError>>()?;
255        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
256        Ok(Box::pin(chained_stream))
257    }
258
259    /// Scan [`Batch`] in all partitions one by one.
260    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
261        let metrics_set = ExecutionPlanMetricsSet::new();
262
263        let streams = (0..self.properties.partitions.len())
264            .map(|partition| {
265                let metrics = new_partition_metrics(
266                    &self.stream_ctx,
267                    false,
268                    &metrics_set,
269                    partition,
270                    &self.metrics_list,
271                );
272
273                self.scan_batch_in_partition(
274                    &QueryScanContext::default(),
275                    partition,
276                    metrics,
277                    &metrics_set,
278                )
279            })
280            .collect::<Result<Vec<_>>>()?;
281
282        Ok(Box::pin(futures::stream::iter(streams).flatten()))
283    }
284
285    /// Checks resource limit for the scanner.
286    pub(crate) fn check_scan_limit(&self) -> Result<()> {
287        // Sum the total number of files across all partitions
288        let total_files: usize = self
289            .properties
290            .partitions
291            .iter()
292            .flat_map(|partition| partition.iter())
293            .map(|part_range| {
294                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
295                range_meta.indices.len()
296            })
297            .sum();
298
299        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
300        if total_files > max_concurrent_files {
301            return TooManyFilesToReadSnafu {
302                actual: total_files,
303                max: max_concurrent_files,
304            }
305            .fail();
306        }
307
308        Ok(())
309    }
310}
311
312fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
313    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
314        .map(|_| {
315            let (sender, receiver) = mpsc::channel(1);
316            (Some(sender), Some(receiver))
317        })
318        .unzip();
319    (SenderList::new(senders), receivers)
320}
321
322impl RegionScanner for SeriesScan {
323    fn name(&self) -> &str {
324        "SeriesScan"
325    }
326
327    fn properties(&self) -> &ScannerProperties {
328        &self.properties
329    }
330
331    fn schema(&self) -> SchemaRef {
332        self.stream_ctx.input.mapper.output_schema()
333    }
334
335    fn metadata(&self) -> RegionMetadataRef {
336        self.stream_ctx.input.mapper.metadata().clone()
337    }
338
339    fn scan_partition(
340        &self,
341        ctx: &QueryScanContext,
342        metrics_set: &ExecutionPlanMetricsSet,
343        partition: usize,
344    ) -> Result<SendableRecordBatchStream, BoxedError> {
345        self.scan_partition_impl(ctx, metrics_set, partition)
346            .map_err(BoxedError::new)
347    }
348
349    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
350        self.properties.prepare(request);
351
352        self.check_scan_limit().map_err(BoxedError::new)?;
353
354        Ok(())
355    }
356
357    fn has_predicate_without_region(&self) -> bool {
358        let predicate = self
359            .stream_ctx
360            .input
361            .predicate_group()
362            .predicate_without_region();
363        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
364    }
365
366    fn set_logical_region(&mut self, logical_region: bool) {
367        self.properties.set_logical_region(logical_region);
368    }
369}
370
371impl DisplayAs for SeriesScan {
372    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
373        write!(
374            f,
375            "SeriesScan: region={}, ",
376            self.stream_ctx.input.mapper.metadata().region_id
377        )?;
378        match t {
379            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
380                self.stream_ctx.format_for_explain(false, f)
381            }
382            DisplayFormatType::Verbose => {
383                self.stream_ctx.format_for_explain(true, f)?;
384                self.metrics_list.format_verbose_metrics(f)
385            }
386        }
387    }
388}
389
390impl fmt::Debug for SeriesScan {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        f.debug_struct("SeriesScan")
393            .field("num_ranges", &self.stream_ctx.ranges.len())
394            .finish()
395    }
396}
397
#[cfg(test)]
impl SeriesScan {
    /// Returns the input (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
405
/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore for limiting the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports the metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    /// Per-partition metrics list shared with the scanner.
    metrics_list: Arc<PartitionMetricsList>,
}
426
427impl SeriesDistributor {
428    /// Executes the distributor.
429    #[tracing::instrument(
430        skip_all,
431        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
432    )]
433    async fn execute(&mut self) {
434        let result = if self.stream_ctx.input.flat_format {
435            self.scan_partitions_flat().await
436        } else {
437            self.scan_partitions().await
438        };
439
440        if let Err(e) = result {
441            self.senders.send_error(e).await;
442        }
443    }
444
    /// Scans all parts in flat format using FlatSeriesBatchDivider.
    ///
    /// Merges all sources into one reader, splits the merged output by series
    /// with [FlatSeriesBatchDivider], and sends each [FlatSeriesBatch] to a
    /// partition sender.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Initialize reference counts for all partition ranges.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        // Create PartitionPruner covering all partitions
        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor reports its metrics as one extra partition (index = len).
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so scan cost contains
        // build part cost.
        let mut fetch_start = Instant::now();

        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_flat_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a flat reader that merge sources from all parts.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            // Empty batches are unexpected; skip them defensively in release builds.
            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Use divider to split series
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Send any remaining batch in the divider
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
550
    /// Scans all parts.
    ///
    /// Reads batches in primary key format, groups consecutive batches that
    /// share the same primary key into a [PrimaryKeySeriesBatch], and sends
    /// each finished series to one of the partition senders.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions(&mut self) -> Result<()> {
        // Initialize reference counts for all partition ranges.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        // Create PartitionPruner covering all partitions
        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor reports its metrics as one extra partition (index = len).
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so scan cost contains
        // build part cost.
        let mut fetch_start = Instant::now();

        // Scans all parts.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a reader that merge sources from all parts.
        let mut reader = SeqScan::build_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        // Batches of the series currently being accumulated.
        let mut current_series = PrimaryKeySeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            // Empty batches are unexpected; skip them defensively in release builds.
            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                fetch_start = Instant::now();
                continue;
            }

            // First non-empty batch starts the first series.
            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            };

            // Same primary key: keep accumulating the current series.
            if last_key == batch.primary_key() {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            }

            // We find a new series, send the current one.
            let to_send =
                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(to_send))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
            fetch_start = Instant::now();
        }

        // Flush the last accumulated series.
        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(current_series))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
661}
662
/// Batches of the same series in primary key format.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    /// Buffered batches; callers only push batches sharing the same primary key.
    pub batches: SmallVec<[Batch; 4]>,
}
668
669impl PrimaryKeySeriesBatch {
670    /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch].
671    fn single(batch: Batch) -> Self {
672        Self {
673            batches: smallvec![batch],
674        }
675    }
676
677    fn current_key(&self) -> Option<&[u8]> {
678        self.batches.first().map(|batch| batch.primary_key())
679    }
680
681    fn push(&mut self, batch: Batch) {
682        self.batches.push(batch);
683    }
684
685    /// Returns true if there is no batch.
686    fn is_empty(&self) -> bool {
687        self.batches.is_empty()
688    }
689}
690
/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Batches in primary key format.
    PrimaryKey(PrimaryKeySeriesBatch),
    /// Batches in flat format.
    Flat(FlatSeriesBatch),
}
697
698impl SeriesBatch {
699    /// Returns the number of batches.
700    pub fn num_batches(&self) -> usize {
701        match self {
702            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
703            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
704        }
705    }
706
707    /// Returns the total number of rows across all batches.
708    pub fn num_rows(&self) -> usize {
709        match self {
710            SeriesBatch::PrimaryKey(primary_key_batch) => {
711                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
712            }
713            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
714        }
715    }
716}
717
/// Batches of the same series in flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Buffered record batches for this group of series.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
723
/// List of senders.
struct SenderList {
    /// One sender per partition; a slot becomes `None` once its channel is closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send (advanced round-robin).
    sender_idx: usize,
    /// Number of sends that hit the send timeout.
    num_timeout: usize,
    /// Number of sends that found the channel full.
    num_full: usize,
}
736
737impl SenderList {
738    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
739        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
740        Self {
741            senders,
742            num_nones,
743            sender_idx: 0,
744            num_timeout: 0,
745            num_full: 0,
746        }
747    }
748
    /// Finds a partition and tries to send the batch to the partition.
    /// Returns None if it sends successfully.
    ///
    /// Iterates the senders round-robin: a full channel moves on to the next
    /// sender; a closed channel permanently disables that slot. If every
    /// sender is full, the batch is handed back to the caller.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            // All receivers dropped: the scan can't make progress anymore.
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }
778
    /// Finds a partition and sends the batch to the partition.
    ///
    /// First tries a non-blocking round-robin send; if every channel is full,
    /// falls back to blocking sends bounded by [SEND_TIMEOUT] so one stalled
    /// partition cannot block the distributor forever.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Sends the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send batch to partition.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            // All receivers dropped: the scan can't make progress anymore.
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Adds a timeout to avoid blocking indefinitely and sending
            // the batch in a round-robin fashion when some partitions
            // don't poll their inputs. This may happen if we have a
            // node like sort merging. But it is rare when we are using SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }
821
822    async fn send_error(&self, error: Error) {
823        let error = Arc::new(error);
824        for sender in self.senders.iter().flatten() {
825            let result = Err(error.clone()).context(ScanSeriesSnafu);
826            let _ = sender.send(result).await;
827        }
828    }
829
830    fn fetch_add_sender_idx(&mut self) -> usize {
831        let sender_idx = self.sender_idx;
832        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
833        sender_idx
834    }
835}
836
837fn new_partition_metrics(
838    stream_ctx: &StreamContext,
839    explain_verbose: bool,
840    metrics_set: &ExecutionPlanMetricsSet,
841    partition: usize,
842    metrics_list: &PartitionMetricsList,
843) -> PartitionMetrics {
844    let metrics = PartitionMetrics::new(
845        stream_ctx.input.mapper.metadata().region_id,
846        partition,
847        "SeriesScan",
848        stream_ctx.query_start,
849        explain_verbose,
850        metrics_set,
851    );
852
853    metrics_list.set(partition, metrics.clone());
854    metrics
855}
856
/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Batches buffered until a pushed record batch ends with a different primary key.
    buffer: FlatSeriesBatch,
}
865
866impl FlatSeriesBatchDivider {
867    /// Pushes a record batch into the divider.
868    ///
869    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
870    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
871        // If buffer is empty
872        if self.buffer.batches.is_empty() {
873            self.buffer.batches.push(batch);
874            return None;
875        }
876
877        // Gets the primary key column from the incoming batch.
878        let pk_column_idx = primary_key_column_index(batch.num_columns());
879        let batch_pk_column = batch.column(pk_column_idx);
880        let batch_pk_array = batch_pk_column
881            .as_any()
882            .downcast_ref::<PrimaryKeyArray>()
883            .unwrap();
884        let batch_pk_values = batch_pk_array
885            .values()
886            .as_any()
887            .downcast_ref::<BinaryArray>()
888            .unwrap();
889        // Gets the last primary key of the incoming batch.
890        let batch_last_pk =
891            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
892        // Gets the last primary key of the buffer.
893        // Safety: the buffer is not empty.
894        let buffer_last_batch = self.buffer.batches.last().unwrap();
895        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
896        let buffer_pk_array = buffer_pk_column
897            .as_any()
898            .downcast_ref::<PrimaryKeyArray>()
899            .unwrap();
900        let buffer_pk_values = buffer_pk_array
901            .values()
902            .as_any()
903            .downcast_ref::<BinaryArray>()
904            .unwrap();
905        let buffer_last_pk =
906            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
907
908        // If last primary key in the batch is the same as last primary key in the buffer.
909        if batch_last_pk == buffer_last_pk {
910            self.buffer.batches.push(batch);
911            return None;
912        }
913        // Otherwise, the batch must have a different primary key, we find the first offset of the
914        // changed primary key.
915        let batch_pk_keys = batch_pk_array.keys();
916        let pk_indices = batch_pk_keys.values();
917        let mut change_offset = 0;
918        for (i, &key) in pk_indices.iter().enumerate() {
919            let batch_pk = batch_pk_values.value(key as usize);
920
921            if buffer_last_pk != batch_pk {
922                change_offset = i;
923                break;
924            }
925        }
926
927        // Splits the batch at the change offset
928        let (first_part, remaining_part) = if change_offset > 0 {
929            let first_part = batch.slice(0, change_offset);
930            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
931            (Some(first_part), Some(remaining_part))
932        } else {
933            (None, Some(batch))
934        };
935
936        // Creates the result from current buffer + first part of new batch
937        let mut result = std::mem::take(&mut self.buffer);
938        if let Some(first_part) = first_part {
939            result.batches.push(first_part);
940        }
941
942        // Pushes remaining part to the buffer if it exists
943        if let Some(remaining_part) = remaining_part {
944            self.buffer.batches.push(remaining_part);
945        }
946
947        Some(result)
948    }
949
950    /// Returns the final [FlatSeriesBatch].
951    fn finish(&mut self) -> Option<FlatSeriesBatch> {
952        if self.buffer.batches.is_empty() {
953            None
954        } else {
955            Some(std::mem::take(&mut self.buffer))
956        }
957    }
958}
959
960/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
961fn primary_key_at<'a>(
962    primary_key: &PrimaryKeyArray,
963    primary_key_values: &'a BinaryArray,
964    index: usize,
965) -> &'a [u8] {
966    let key = primary_key.keys().value(index);
967    primary_key_values.value(key as usize)
968}
969
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format [RecordBatch] for tests, one row per slice element.
    /// All input slices must have the same length.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        // Column order must match `build_test_flat_schema()`:
        // tag, field, time index, __primary_key, __sequence, __op_type.
        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column: a `UInt32 -> Utf8` dictionary with one
    /// entry per row. Inputs must be valid UTF-8.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column: a `UInt32 -> Binary` dictionary,
    /// i.e. the column the divider downcasts and splits on.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema of the flat test batches: one tag, one field, the time index,
    /// then the internal `__primary_key`, `__sequence` and `__op_type` columns.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    // `finish()` on a fresh divider yields nothing, and the first pushed batch
    // is always buffered rather than emitted.
    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    // Consecutive batches of the same series accumulate in the buffer and are
    // only emitted together by `finish()`.
    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    // A batch that starts an entirely new series flushes the buffered series
    // and replaces the buffer with the new batch.
    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    // A series boundary in the middle of a batch splits it: the leading rows
    // join the emitted series, the trailing rows stay buffered.
    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    // Only the first series boundary inside a batch splits it; the remainder
    // (which may itself span several series) stays buffered as one piece.
    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        // The remainder holds series2 and series3 together: a FlatSeriesBatch
        // may contain multiple series.
        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}