Skip to main content

mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use common_telemetry::warn;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::schema::SchemaRef;
32use futures::{StreamExt, TryStreamExt};
33use smallvec::SmallVec;
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
38};
39use tokio::sync::Semaphore;
40use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
41use tokio::sync::mpsc::{self, Receiver, Sender};
42
43use crate::error::{
44    Error, InvalidSenderSnafu, JoinSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
45    ScanSeriesSnafu, TooManyFilesToReadSnafu,
46};
47use crate::read::ScannerMetrics;
48use crate::read::pruner::{PartitionPruner, Pruner};
49use crate::read::scan_region::{ScanInput, StreamContext};
50use crate::read::scan_util::{
51    PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
52    compute_parallel_channel_size,
53};
54use crate::read::seq_scan::SeqScan;
55use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
56use crate::sst::parquet::flat_format::primary_key_column_index;
57use crate::sst::parquet::format::PrimaryKeyArray;
58
/// How long a blocking send waits on a single partition (100µs) before the
/// distributor moves on to the next partition in round-robin order.
const SEND_TIMEOUT: Duration = Duration::from_nanos(100_000);
61
62/// List of receivers.
63type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
64
/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output order is always order by `(primary key, time index)` inside every
/// partition.
/// Always returns the same series (primary key) to the same partition.
///
/// A single background [SeriesDistributor] produces series batches and hands them
/// out to per-partition channels; each partition stream consumes one receiver.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Receivers of each partition.
    /// Empty until the distributor is started; afterwards each slot is taken
    /// (set to `None`) by the partition that scans it.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets in query and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}
83
84impl SeriesScan {
85    /// Creates a new [SeriesScan].
86    pub(crate) fn new(input: ScanInput) -> Self {
87        let mut properties = ScannerProperties::default()
88            .with_append_mode(input.append_mode)
89            .with_total_rows(input.total_rows());
90        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
91        properties.partitions = vec![stream_ctx.partition_ranges()];
92
93        // Create the shared pruner with number of workers equal to CPU cores.
94        let num_workers = common_stat::get_total_cpu_cores().max(1);
95        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
96
97        Self {
98            properties,
99            stream_ctx,
100            pruner,
101            receivers: Mutex::new(Vec::new()),
102            metrics_list: Arc::new(PartitionMetricsList::default()),
103        }
104    }
105
106    #[tracing::instrument(
107        skip_all,
108        fields(
109            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
110            partition = partition
111        )
112    )]
113    fn scan_partition_impl(
114        &self,
115        ctx: &QueryScanContext,
116        metrics_set: &ExecutionPlanMetricsSet,
117        partition: usize,
118    ) -> Result<SendableRecordBatchStream> {
119        let metrics = new_partition_metrics(
120            &self.stream_ctx,
121            ctx.explain_verbose,
122            metrics_set,
123            partition,
124            &self.metrics_list,
125        );
126
127        let batch_stream =
128            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
129
130        let input = &self.stream_ctx.input;
131        let record_batch_stream = ConvertBatchStream::new(
132            batch_stream,
133            input.mapper.clone(),
134            input.cache_strategy.clone(),
135            metrics,
136        );
137
138        Ok(Box::pin(RecordBatchStreamWrapper::new(
139            input.mapper.output_schema(),
140            Box::pin(record_batch_stream),
141        )))
142    }
143
144    #[tracing::instrument(
145        skip_all,
146        fields(
147            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
148            partition = partition
149        )
150    )]
151    fn scan_batch_in_partition(
152        &self,
153        ctx: &QueryScanContext,
154        partition: usize,
155        part_metrics: PartitionMetrics,
156        metrics_set: &ExecutionPlanMetricsSet,
157    ) -> Result<ScanBatchStream> {
158        if ctx.explain_verbose {
159            common_telemetry::info!(
160                "SeriesScan partition {}, region_id: {}",
161                partition,
162                self.stream_ctx.input.region_metadata().region_id
163            );
164        }
165
166        ensure!(
167            partition < self.properties.num_partitions(),
168            PartitionOutOfRangeSnafu {
169                given: partition,
170                all: self.properties.num_partitions(),
171            }
172        );
173
174        self.maybe_start_distributor(metrics_set, &self.metrics_list, ctx.explain_verbose);
175
176        let mut receiver = self.take_receiver(partition)?;
177        let stream = try_stream! {
178            part_metrics.on_first_poll();
179
180            let mut fetch_start = Instant::now();
181            while let Some(series) = receiver.recv().await {
182                let series = series?;
183
184                let mut metrics = ScannerMetrics::default();
185                metrics.scan_cost += fetch_start.elapsed();
186                fetch_start = Instant::now();
187
188                metrics.num_batches += series.num_batches();
189                metrics.num_rows += series.num_rows();
190
191                let yield_start = Instant::now();
192                yield ScanBatch::Series(series);
193                metrics.yield_cost += yield_start.elapsed();
194
195                part_metrics.merge_metrics(&metrics);
196            }
197
198            part_metrics.on_finish();
199        };
200        Ok(Box::pin(stream))
201    }
202
203    /// Takes the receiver for the partition.
204    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
205        let mut rx_list = self.receivers.lock().unwrap();
206        rx_list[partition]
207            .take()
208            .context(ScanMultiTimesSnafu { partition })
209    }
210
211    /// Starts the distributor if the receiver list is empty.
212    #[tracing::instrument(
213        skip(self, metrics_set, metrics_list),
214        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
215    )]
216    fn maybe_start_distributor(
217        &self,
218        metrics_set: &ExecutionPlanMetricsSet,
219        metrics_list: &Arc<PartitionMetricsList>,
220        explain_verbose: bool,
221    ) {
222        let mut rx_list = self.receivers.lock().unwrap();
223        if !rx_list.is_empty() {
224            return;
225        }
226
227        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
228        let mut distributor = SeriesDistributor {
229            stream_ctx: self.stream_ctx.clone(),
230            range_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
231            final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
232            partitions: self.properties.partitions.clone(),
233            pruner: self.pruner.clone(),
234            senders,
235            metrics_set: metrics_set.clone(),
236            metrics_list: metrics_list.clone(),
237            explain_verbose,
238        };
239        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
240        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
241        common_runtime::spawn_query(
242            async move {
243                distributor.execute().await;
244            }
245            .instrument(span),
246        );
247
248        *rx_list = receivers;
249    }
250
251    /// Scans the region and returns a stream.
252    #[tracing::instrument(
253        skip_all,
254        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
255    )]
256    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
257        let part_num = self.properties.num_partitions();
258        let metrics_set = ExecutionPlanMetricsSet::default();
259        let streams = (0..part_num)
260            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
261            .collect::<Result<Vec<_>, BoxedError>>()?;
262        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
263        Ok(Box::pin(chained_stream))
264    }
265
266    /// Scan [`Batch`] in all partitions one by one.
267    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
268        let metrics_set = ExecutionPlanMetricsSet::new();
269
270        let streams = (0..self.properties.partitions.len())
271            .map(|partition| {
272                let metrics = new_partition_metrics(
273                    &self.stream_ctx,
274                    false,
275                    &metrics_set,
276                    partition,
277                    &self.metrics_list,
278                );
279
280                self.scan_batch_in_partition(
281                    &QueryScanContext::default(),
282                    partition,
283                    metrics,
284                    &metrics_set,
285                )
286            })
287            .collect::<Result<Vec<_>>>()?;
288
289        Ok(Box::pin(futures::stream::iter(streams).flatten()))
290    }
291
292    /// Checks resource limit for the scanner.
293    pub(crate) fn check_scan_limit(&self) -> Result<()> {
294        // Sum the total number of files across all partitions
295        let total_files: usize = self
296            .properties
297            .partitions
298            .iter()
299            .flat_map(|partition| partition.iter())
300            .map(|part_range| {
301                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
302                range_meta.indices.len()
303            })
304            .sum();
305
306        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
307        if total_files > max_concurrent_files {
308            return TooManyFilesToReadSnafu {
309                actual: total_files,
310                max: max_concurrent_files,
311            }
312            .fail();
313        }
314
315        Ok(())
316    }
317}
318
319fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
320    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
321        .map(|_| {
322            let (sender, receiver) = mpsc::channel(1);
323            (Some(sender), Some(receiver))
324        })
325        .unzip();
326    (SenderList::new(senders), receivers)
327}
328
329impl RegionScanner for SeriesScan {
330    fn name(&self) -> &str {
331        "SeriesScan"
332    }
333
334    fn properties(&self) -> &ScannerProperties {
335        &self.properties
336    }
337
338    fn schema(&self) -> SchemaRef {
339        self.stream_ctx.input.mapper.output_schema()
340    }
341
342    fn metadata(&self) -> RegionMetadataRef {
343        self.stream_ctx.input.mapper.metadata().clone()
344    }
345
346    fn scan_partition(
347        &self,
348        ctx: &QueryScanContext,
349        metrics_set: &ExecutionPlanMetricsSet,
350        partition: usize,
351    ) -> Result<SendableRecordBatchStream, BoxedError> {
352        self.scan_partition_impl(ctx, metrics_set, partition)
353            .map_err(BoxedError::new)
354    }
355
356    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
357        self.properties.prepare(request);
358
359        self.check_scan_limit().map_err(BoxedError::new)?;
360
361        Ok(())
362    }
363
364    fn has_predicate_without_region(&self) -> bool {
365        let predicate = self
366            .stream_ctx
367            .input
368            .predicate_group()
369            .predicate_without_region();
370        predicate.is_some()
371    }
372
373    fn add_dyn_filter_to_predicate(
374        &mut self,
375        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
376    ) -> Vec<bool> {
377        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
378    }
379
380    fn set_logical_region(&mut self, logical_region: bool) {
381        self.properties.set_logical_region(logical_region);
382    }
383
384    fn set_query_load_region_id(&mut self, region_id: store_api::storage::RegionId) {
385        self.properties.set_query_load_region_id(region_id);
386    }
387
388    fn snapshot_sequence(&self) -> Option<u64> {
389        self.stream_ctx.input.snapshot_sequence
390    }
391}
392
393impl DisplayAs for SeriesScan {
394    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
395        write!(
396            f,
397            "SeriesScan: region={}, ",
398            self.stream_ctx.input.mapper.metadata().region_id
399        )?;
400        match t {
401            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
402                self.stream_ctx.format_for_explain(false, f)
403            }
404            DisplayFormatType::Verbose => {
405                self.stream_ctx.format_for_explain(true, f)?;
406                self.metrics_list.format_verbose_metrics(f)
407            }
408        }
409    }
410}
411
412impl fmt::Debug for SeriesScan {
413    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
414        f.debug_struct("SeriesScan")
415            .field("num_ranges", &self.stream_ctx.ranges.len())
416            .finish()
417    }
418}
419
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input held by the stream context (test only).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
427
/// The distributor scans series and distributes them to different partitions.
///
/// It reads every partition range of the region, merges them, splits the merged
/// output with a [FlatSeriesBatchDivider], and sends each series batch to one
/// partition channel in round-robin order via [SenderList].
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Semaphore for file scanning and range-level merging.
    range_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore for the final merge across all range streams.
    /// Must be separate from `range_semaphore` to avoid deadlock: final merge tasks
    /// hold a permit while waiting for data from range-level merge tasks, which also
    /// need permits to produce data.
    final_merge_semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports the metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    /// List that receives the distributor's partition metrics.
    metrics_list: Arc<PartitionMetricsList>,
    /// Whether to use verbose logging and collect detailed metrics.
    explain_verbose: bool,
}
455
impl SeriesDistributor {
    /// Executes the distributor.
    ///
    /// Runs the scan and, if it fails, forwards the error to every partition whose
    /// sender is still open. The channels themselves close once the distributor
    /// (which owns the senders) is dropped after this returns.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = self.scan_partitions_flat().await;

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts in flat format using FlatSeriesBatchDivider.
    ///
    /// Steps:
    /// 1. Registers all partition ranges with the pruner.
    /// 2. Spawns one task per partition range to build a deduped range stream.
    /// 3. Merges all range streams (without another dedup) into one reader.
    /// 4. Splits the merged output into series batches and sends them to partitions.
    ///
    /// The distributor's metrics are recorded under partition index
    /// `self.partitions.len()`, i.e. one past the last real partition.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Initialize reference counts for all partition ranges.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        // Create PartitionPruner covering all partitions
        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            self.explain_verbose,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so scan cost contains
        // build part cost.
        let mut fetch_start = Instant::now();

        // Builds one deduped stream per partition range, then merges across ranges.
        let build_start = Instant::now();
        let mut tasks = Vec::new();
        for partition in &self.partitions {
            for part_range in partition {
                let stream_ctx = self.stream_ctx.clone();
                let part_range = *part_range;
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                // File scanning and range-level merging share `range_semaphore`.
                let file_scan_semaphore = self.range_semaphore.clone();
                let merge_semaphore = self.range_semaphore.clone();
                tasks.push(common_runtime::spawn_query(async move {
                    SeqScan::build_flat_partition_range_read(
                        &stream_ctx,
                        &part_range,
                        false,
                        &part_metrics,
                        partition_pruner,
                        file_scan_semaphore,
                        merge_semaphore,
                    )
                    .await
                }));
            }
        }
        // Await the build tasks in spawn order. The first `?` surfaces a join
        // failure, the second the build error returned by the task.
        let mut range_streams = Vec::with_capacity(tasks.len());
        let mut estimated_batch_sizes = Vec::with_capacity(tasks.len());
        for task in tasks {
            let (stream, estimated_batch_size) = task.await.context(JoinSnafu)??;
            range_streams.push(stream);
            estimated_batch_sizes.push(estimated_batch_size);
        }
        // The merge channel size is derived from the average estimated batch size.
        let channel_size =
            compute_parallel_channel_size(compute_average_batch_size(estimated_batch_sizes));
        common_telemetry::debug!(
            "SeriesDistributor built {} range_streams, region: {}, build cost: {:?}, channel_size: {}",
            range_streams.len(),
            self.stream_ctx.input.region_metadata().region_id,
            build_start.elapsed(),
            channel_size,
        );

        // Each partition range stream is already deduped, so skip dedup here.
        // Use a separate semaphore for the final merge to avoid deadlock with
        // range-level merge tasks that share the range_semaphore.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            range_streams,
            self.final_merge_semaphore.clone(),
            Some(&part_metrics),
            true,
            channel_size,
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            // Time spent waiting for the reader since the last send.
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Use divider to split series
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            // Reset after sending so send time is not counted as scan time.
            fetch_start = Instant::now();
        }

        // Send any remaining batch in the divider
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        // NOTE(review): on an early `?` return above, `on_finish` is not reached.
        part_metrics.on_finish();

        Ok(())
    }
}
604
/// Batches of the same series.
///
/// Produced by the distributor and consumed by partition streams; currently the
/// only representation is the flat format.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Series rows stored as flat-format record batches.
    Flat(FlatSeriesBatch),
}
610
611impl SeriesBatch {
612    /// Returns the number of batches.
613    pub fn num_batches(&self) -> usize {
614        match self {
615            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
616        }
617    }
618
619    /// Returns the total number of rows across all batches.
620    pub fn num_rows(&self) -> usize {
621        match self {
622            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
623        }
624    }
625}
626
/// Batches of the same series in flat format.
///
/// When produced by [FlatSeriesBatchDivider], all rows of each series it contains
/// are in this batch, though it may contain more than one series.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Record batches in arrival order; up to four are stored inline.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
632
/// List of senders.
///
/// Distributes series batches across partitions in round-robin order and tracks
/// partitions whose receivers have gone away.
struct SenderList {
    /// One slot per partition; `None` once the partition's channel is closed
    /// (or if it was created without a sender).
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send (advanced round-robin).
    sender_idx: usize,
    /// Number of timeout.
    /// Counts `send_timeout` calls that timed out while waiting on a partition.
    num_timeout: usize,
    /// Number of full senders.
    /// Counts non-blocking sends that found a partition channel full.
    num_full: usize,
}
645
646impl SenderList {
647    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
648        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
649        Self {
650            senders,
651            num_nones,
652            sender_idx: 0,
653            num_timeout: 0,
654            num_full: 0,
655        }
656    }
657
658    /// Finds a partition and tries to send the batch to the partition.
659    /// Returns None if it sends successfully.
660    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
661        for _ in 0..self.senders.len() {
662            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
663
664            let sender_idx = self.fetch_add_sender_idx();
665            let Some(sender) = &self.senders[sender_idx] else {
666                continue;
667            };
668
669            match sender.try_send(Ok(batch)) {
670                Ok(()) => return Ok(None),
671                Err(TrySendError::Full(res)) => {
672                    self.num_full += 1;
673                    // Safety: we send Ok.
674                    batch = res.unwrap();
675                }
676                Err(TrySendError::Closed(res)) => {
677                    self.senders[sender_idx] = None;
678                    self.num_nones += 1;
679                    // Safety: we send Ok.
680                    batch = res.unwrap();
681                }
682            }
683        }
684
685        Ok(Some(batch))
686    }
687
688    /// Finds a partition and sends the batch to the partition.
689    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
690        // Sends the batch without blocking first.
691        match self.try_send_batch(batch)? {
692            Some(b) => {
693                // Unable to send batch to partition.
694                batch = b;
695            }
696            None => {
697                return Ok(());
698            }
699        }
700
701        loop {
702            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
703
704            let sender_idx = self.fetch_add_sender_idx();
705            let Some(sender) = &self.senders[sender_idx] else {
706                continue;
707            };
708            // Adds a timeout to avoid blocking indefinitely and sending
709            // the batch in a round-robin fashion when some partitions
710            // don't poll their inputs. This may happen if we have a
711            // node like sort merging. But it is rare when we are using SeriesScan.
712            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
713                Ok(()) => break,
714                Err(SendTimeoutError::Timeout(res)) => {
715                    self.num_timeout += 1;
716                    // Safety: we send Ok.
717                    batch = res.unwrap();
718                }
719                Err(SendTimeoutError::Closed(res)) => {
720                    self.senders[sender_idx] = None;
721                    self.num_nones += 1;
722                    // Safety: we send Ok.
723                    batch = res.unwrap();
724                }
725            }
726        }
727
728        Ok(())
729    }
730
731    async fn send_error(&self, error: Error) {
732        let error = Arc::new(error);
733        for sender in self.senders.iter().flatten() {
734            let result = Err(error.clone()).context(ScanSeriesSnafu);
735            let _ = sender.send(result).await;
736        }
737    }
738
739    fn fetch_add_sender_idx(&mut self) -> usize {
740        let sender_idx = self.sender_idx;
741        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
742        sender_idx
743    }
744}
745
746fn new_partition_metrics(
747    stream_ctx: &StreamContext,
748    explain_verbose: bool,
749    metrics_set: &ExecutionPlanMetricsSet,
750    partition: usize,
751    metrics_list: &PartitionMetricsList,
752) -> PartitionMetrics {
753    let metrics = PartitionMetrics::new(
754        stream_ctx.input.mapper.metadata().region_id,
755        partition,
756        "SeriesScan",
757        stream_ctx.query_start,
758        explain_verbose,
759        metrics_set,
760    );
761
762    metrics_list.set(partition, metrics.clone());
763    metrics
764}
765
/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
///
/// Input batches are expected to arrive sorted by primary key, so rows of one
/// series are contiguous across batches.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Batches buffered until the series of their last primary key is known to be
    /// complete.
    buffer: FlatSeriesBatch,
}
774
775impl FlatSeriesBatchDivider {
776    /// Pushes a record batch into the divider.
777    ///
778    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
779    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
780        // If buffer is empty
781        if self.buffer.batches.is_empty() {
782            self.buffer.batches.push(batch);
783            return None;
784        }
785
786        // Gets the primary key column from the incoming batch.
787        let pk_column_idx = primary_key_column_index(batch.num_columns());
788        let batch_pk_column = batch.column(pk_column_idx);
789        let batch_pk_array = batch_pk_column
790            .as_any()
791            .downcast_ref::<PrimaryKeyArray>()
792            .unwrap();
793        let batch_pk_values = batch_pk_array
794            .values()
795            .as_any()
796            .downcast_ref::<BinaryArray>()
797            .unwrap();
798        // Gets the last primary key of the incoming batch.
799        let batch_last_pk =
800            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
801        // Gets the last primary key of the buffer.
802        // Safety: the buffer is not empty.
803        let buffer_last_batch = self.buffer.batches.last().unwrap();
804        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
805        let buffer_pk_array = buffer_pk_column
806            .as_any()
807            .downcast_ref::<PrimaryKeyArray>()
808            .unwrap();
809        let buffer_pk_values = buffer_pk_array
810            .values()
811            .as_any()
812            .downcast_ref::<BinaryArray>()
813            .unwrap();
814        let buffer_last_pk =
815            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
816
817        // If last primary key in the batch is the same as last primary key in the buffer.
818        if batch_last_pk == buffer_last_pk {
819            self.buffer.batches.push(batch);
820            return None;
821        }
822        // Otherwise, the batch must have a different primary key, we find the first offset of the
823        // changed primary key.
824        let batch_pk_keys = batch_pk_array.keys();
825        let pk_indices = batch_pk_keys.values();
826        let mut change_offset = 0;
827        for (i, &key) in pk_indices.iter().enumerate() {
828            let batch_pk = batch_pk_values.value(key as usize);
829
830            if buffer_last_pk != batch_pk {
831                change_offset = i;
832                break;
833            }
834        }
835
836        // Splits the batch at the change offset
837        let (first_part, remaining_part) = if change_offset > 0 {
838            let first_part = batch.slice(0, change_offset);
839            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
840            (Some(first_part), Some(remaining_part))
841        } else {
842            (None, Some(batch))
843        };
844
845        // Creates the result from current buffer + first part of new batch
846        let mut result = std::mem::take(&mut self.buffer);
847        if let Some(first_part) = first_part {
848            result.batches.push(first_part);
849        }
850
851        // Pushes remaining part to the buffer if it exists
852        if let Some(remaining_part) = remaining_part {
853            self.buffer.batches.push(remaining_part);
854        }
855
856        Some(result)
857    }
858
859    /// Returns the final [FlatSeriesBatch].
860    fn finish(&mut self) -> Option<FlatSeriesBatch> {
861        if self.buffer.batches.is_empty() {
862            None
863        } else {
864            Some(std::mem::take(&mut self.buffer))
865        }
866    }
867}
868
869/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
870fn primary_key_at<'a>(
871    primary_key: &PrimaryKeyArray,
872    primary_key_values: &'a BinaryArray,
873    index: usize,
874) -> &'a [u8] {
875    let key = primary_key.keys().value(index);
876    primary_key_values.value(key as usize)
877}
878
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format test batch with columns `k0`, `field0`, `ts`, `__primary_key`,
    /// `__sequence`, `__op_type`, one row per element of the input slices.
    ///
    /// # Panics
    ///
    /// Panics if the input slices differ in length or a primary key is not valid UTF-8.
    /// The length checks are real assertions rather than `debug_assert!`s, so a malformed
    /// fixture still fails at the fixture in release-mode test runs.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        assert_eq!(sequences.len(), num_rows);
        assert_eq!(op_types.len(), num_rows);
        assert_eq!(fields.len(), num_rows);
        assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column: the primary keys as a UTF-8 dictionary array.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column: the primary keys as a binary dictionary array.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Returns the schema matching the columns produced by [new_test_record_batch].
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Returns the `__primary_key` bytes of row `row` in `batch`, as an owned vector.
    fn pk_at_row(batch: &RecordBatch, row: usize) -> Vec<u8> {
        let pk_idx = batch.schema().index_of("__primary_key").unwrap();
        let pk_array = batch
            .column(pk_idx)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let pk_values = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        primary_key_at(pk_array, pk_values, row).to_vec()
    }

    #[test]
    fn test_primary_key_at() {
        // The dictionary builder deduplicates values, so rows 0 and 2 share a key.
        let array = build_test_pk_array(&[b"a", b"b", b"a"]);
        let pk_array = array.as_any().downcast_ref::<PrimaryKeyArray>().unwrap();
        let pk_values = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        assert_eq!(primary_key_at(pk_array, pk_values, 0), b"a".as_slice());
        assert_eq!(primary_key_at(pk_array, pk_values, 1), b"b".as_slice());
        assert_eq!(primary_key_at(pk_array, pk_values, 2), b"a".as_slice());
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);

        // `finish` hands back the buffered batch and leaves the divider empty.
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 1);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert!(divider.finish().is_none());
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(pk_at_row(&series_batch.batches[0], 0), b"series1");

        // The new batch starts a new series, so it is buffered whole.
        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 2);
        assert_eq!(pk_at_row(&divider.buffer.batches[0], 0), b"series2");
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);
        // The sliced head of the second batch still belongs to the first series.
        assert_eq!(pk_at_row(&series_batch.batches[1], 0), b"series1");

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
        assert_eq!(pk_at_row(&divider.buffer.batches[0], 0), b"series2");
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        assert!(divider.push(batch1).is_none());
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        // Everything after the first change point stays buffered as a single slice.
        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
        assert_eq!(pk_at_row(&final_batch.batches[0], 0), b"series2");
        assert_eq!(pk_at_row(&final_batch.batches[0], 2), b"series3");
    }
}