Skip to main content

mito2/read/
series_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Per-series scan implementation.
16
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use common_telemetry::warn;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::schema::SchemaRef;
32use futures::{StreamExt, TryStreamExt};
33use smallvec::SmallVec;
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
38};
39use tokio::sync::Semaphore;
40use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
41use tokio::sync::mpsc::{self, Receiver, Sender};
42
43use crate::error::{
44    Error, InvalidSenderSnafu, JoinSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
45    ScanSeriesSnafu, TooManyFilesToReadSnafu,
46};
47use crate::read::ScannerMetrics;
48use crate::read::pruner::{PartitionPruner, Pruner};
49use crate::read::scan_region::{ScanInput, StreamContext};
50use crate::read::scan_util::{
51    PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
52    compute_parallel_channel_size,
53};
54use crate::read::seq_scan::SeqScan;
55use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
56use crate::sst::parquet::flat_format::primary_key_column_index;
57use crate::sst::parquet::format::PrimaryKeyArray;
58
/// How long a single partition's channel is waited on before the sender gives
/// up on it and tries the next partition (100 microseconds).
const SEND_TIMEOUT: Duration = Duration::from_nanos(100_000);
61
62/// List of receivers.
63type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
64
/// Scans a region and returns all rows of a series from a single partition.
///
/// A background [SeriesDistributor] reads the region, splits its sorted output
/// into series batches and hands them to the partitions in round-robin order.
/// The output order is always `(primary key, time index)` inside every
/// partition, and all rows of one series (primary key) are always returned by
/// the same partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Receivers of each partition.
    ///
    /// Empty until the distributor is started; a slot is set to `None` once the
    /// partition stream has taken its receiver.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    ///
    /// NOTE(review): presumably meant to be populated only for queries and left
    /// empty during compaction, but every scan path in this file registers
    /// metrics through `new_partition_metrics` — confirm against callers.
    metrics_list: Arc<PartitionMetricsList>,
}
83
84impl SeriesScan {
85    /// Creates a new [SeriesScan].
86    pub(crate) fn new(input: ScanInput) -> Self {
87        let mut properties = ScannerProperties::default()
88            .with_append_mode(input.append_mode)
89            .with_total_rows(input.total_rows());
90        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
91        properties.partitions = vec![stream_ctx.partition_ranges()];
92
93        // Create the shared pruner with number of workers equal to CPU cores.
94        let num_workers = common_stat::get_total_cpu_cores().max(1);
95        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
96
97        Self {
98            properties,
99            stream_ctx,
100            pruner,
101            receivers: Mutex::new(Vec::new()),
102            metrics_list: Arc::new(PartitionMetricsList::default()),
103        }
104    }
105
106    #[tracing::instrument(
107        skip_all,
108        fields(
109            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
110            partition = partition
111        )
112    )]
113    fn scan_partition_impl(
114        &self,
115        ctx: &QueryScanContext,
116        metrics_set: &ExecutionPlanMetricsSet,
117        partition: usize,
118    ) -> Result<SendableRecordBatchStream> {
119        let metrics = new_partition_metrics(
120            &self.stream_ctx,
121            ctx.explain_verbose,
122            metrics_set,
123            partition,
124            &self.metrics_list,
125        );
126
127        let batch_stream =
128            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
129
130        let input = &self.stream_ctx.input;
131        let record_batch_stream = ConvertBatchStream::new(
132            batch_stream,
133            input.mapper.clone(),
134            input.cache_strategy.clone(),
135            metrics,
136        );
137
138        Ok(Box::pin(RecordBatchStreamWrapper::new(
139            input.mapper.output_schema(),
140            Box::pin(record_batch_stream),
141        )))
142    }
143
144    #[tracing::instrument(
145        skip_all,
146        fields(
147            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
148            partition = partition
149        )
150    )]
151    fn scan_batch_in_partition(
152        &self,
153        ctx: &QueryScanContext,
154        partition: usize,
155        part_metrics: PartitionMetrics,
156        metrics_set: &ExecutionPlanMetricsSet,
157    ) -> Result<ScanBatchStream> {
158        if ctx.explain_verbose {
159            common_telemetry::info!(
160                "SeriesScan partition {}, region_id: {}",
161                partition,
162                self.stream_ctx.input.region_metadata().region_id
163            );
164        }
165
166        ensure!(
167            partition < self.properties.num_partitions(),
168            PartitionOutOfRangeSnafu {
169                given: partition,
170                all: self.properties.num_partitions(),
171            }
172        );
173
174        self.maybe_start_distributor(metrics_set, &self.metrics_list, ctx.explain_verbose);
175
176        let mut receiver = self.take_receiver(partition)?;
177        let stream = try_stream! {
178            part_metrics.on_first_poll();
179
180            let mut fetch_start = Instant::now();
181            while let Some(series) = receiver.recv().await {
182                let series = series?;
183
184                let mut metrics = ScannerMetrics::default();
185                metrics.scan_cost += fetch_start.elapsed();
186                fetch_start = Instant::now();
187
188                metrics.num_batches += series.num_batches();
189                metrics.num_rows += series.num_rows();
190
191                let yield_start = Instant::now();
192                yield ScanBatch::Series(series);
193                metrics.yield_cost += yield_start.elapsed();
194
195                part_metrics.merge_metrics(&metrics);
196            }
197
198            part_metrics.on_finish();
199        };
200        Ok(Box::pin(stream))
201    }
202
203    /// Takes the receiver for the partition.
204    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
205        let mut rx_list = self.receivers.lock().unwrap();
206        rx_list[partition]
207            .take()
208            .context(ScanMultiTimesSnafu { partition })
209    }
210
211    /// Starts the distributor if the receiver list is empty.
212    #[tracing::instrument(
213        skip(self, metrics_set, metrics_list),
214        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
215    )]
216    fn maybe_start_distributor(
217        &self,
218        metrics_set: &ExecutionPlanMetricsSet,
219        metrics_list: &Arc<PartitionMetricsList>,
220        explain_verbose: bool,
221    ) {
222        let mut rx_list = self.receivers.lock().unwrap();
223        if !rx_list.is_empty() {
224            return;
225        }
226
227        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
228        let mut distributor = SeriesDistributor {
229            stream_ctx: self.stream_ctx.clone(),
230            range_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
231            final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
232            partitions: self.properties.partitions.clone(),
233            pruner: self.pruner.clone(),
234            senders,
235            metrics_set: metrics_set.clone(),
236            metrics_list: metrics_list.clone(),
237            explain_verbose,
238        };
239        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
240        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
241        common_runtime::spawn_global(
242            async move {
243                distributor.execute().await;
244            }
245            .instrument(span),
246        );
247
248        *rx_list = receivers;
249    }
250
251    /// Scans the region and returns a stream.
252    #[tracing::instrument(
253        skip_all,
254        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
255    )]
256    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
257        let part_num = self.properties.num_partitions();
258        let metrics_set = ExecutionPlanMetricsSet::default();
259        let streams = (0..part_num)
260            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
261            .collect::<Result<Vec<_>, BoxedError>>()?;
262        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
263        Ok(Box::pin(chained_stream))
264    }
265
266    /// Scan [`Batch`] in all partitions one by one.
267    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
268        let metrics_set = ExecutionPlanMetricsSet::new();
269
270        let streams = (0..self.properties.partitions.len())
271            .map(|partition| {
272                let metrics = new_partition_metrics(
273                    &self.stream_ctx,
274                    false,
275                    &metrics_set,
276                    partition,
277                    &self.metrics_list,
278                );
279
280                self.scan_batch_in_partition(
281                    &QueryScanContext::default(),
282                    partition,
283                    metrics,
284                    &metrics_set,
285                )
286            })
287            .collect::<Result<Vec<_>>>()?;
288
289        Ok(Box::pin(futures::stream::iter(streams).flatten()))
290    }
291
292    /// Checks resource limit for the scanner.
293    pub(crate) fn check_scan_limit(&self) -> Result<()> {
294        // Sum the total number of files across all partitions
295        let total_files: usize = self
296            .properties
297            .partitions
298            .iter()
299            .flat_map(|partition| partition.iter())
300            .map(|part_range| {
301                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
302                range_meta.indices.len()
303            })
304            .sum();
305
306        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
307        if total_files > max_concurrent_files {
308            return TooManyFilesToReadSnafu {
309                actual: total_files,
310                max: max_concurrent_files,
311            }
312            .fail();
313        }
314
315        Ok(())
316    }
317}
318
319fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
320    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
321        .map(|_| {
322            let (sender, receiver) = mpsc::channel(1);
323            (Some(sender), Some(receiver))
324        })
325        .unzip();
326    (SenderList::new(senders), receivers)
327}
328
329impl RegionScanner for SeriesScan {
330    fn name(&self) -> &str {
331        "SeriesScan"
332    }
333
334    fn properties(&self) -> &ScannerProperties {
335        &self.properties
336    }
337
338    fn schema(&self) -> SchemaRef {
339        self.stream_ctx.input.mapper.output_schema()
340    }
341
342    fn metadata(&self) -> RegionMetadataRef {
343        self.stream_ctx.input.mapper.metadata().clone()
344    }
345
346    fn scan_partition(
347        &self,
348        ctx: &QueryScanContext,
349        metrics_set: &ExecutionPlanMetricsSet,
350        partition: usize,
351    ) -> Result<SendableRecordBatchStream, BoxedError> {
352        self.scan_partition_impl(ctx, metrics_set, partition)
353            .map_err(BoxedError::new)
354    }
355
356    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
357        self.properties.prepare(request);
358
359        self.check_scan_limit().map_err(BoxedError::new)?;
360
361        Ok(())
362    }
363
364    fn has_predicate_without_region(&self) -> bool {
365        let predicate = self
366            .stream_ctx
367            .input
368            .predicate_group()
369            .predicate_without_region();
370        predicate.is_some()
371    }
372
373    fn add_dyn_filter_to_predicate(
374        &mut self,
375        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
376    ) -> Vec<bool> {
377        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
378    }
379
380    fn set_logical_region(&mut self, logical_region: bool) {
381        self.properties.set_logical_region(logical_region);
382    }
383
384    fn snapshot_sequence(&self) -> Option<u64> {
385        self.stream_ctx.input.snapshot_sequence
386    }
387}
388
389impl DisplayAs for SeriesScan {
390    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
391        write!(
392            f,
393            "SeriesScan: region={}, ",
394            self.stream_ctx.input.mapper.metadata().region_id
395        )?;
396        match t {
397            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
398                self.stream_ctx.format_for_explain(false, f)
399            }
400            DisplayFormatType::Verbose => {
401                self.stream_ctx.format_for_explain(true, f)?;
402                self.metrics_list.format_verbose_metrics(f)
403            }
404        }
405    }
406}
407
408impl fmt::Debug for SeriesScan {
409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410        f.debug_struct("SeriesScan")
411            .field("num_ranges", &self.stream_ctx.ranges.len())
412            .finish()
413    }
414}
415
#[cfg(test)]
impl SeriesScan {
    /// Test-only accessor for the [ScanInput] held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
423
/// The distributor scans series and distributes them to different partitions.
///
/// It is spawned as a background task by [SeriesScan] and sends every
/// [SeriesBatch] to one partition through its [SenderList].
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Semaphore for file scanning and range-level merging.
    range_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore for the final merge across all range streams.
    /// Must be separate from `range_semaphore` to avoid deadlock: final merge tasks
    /// hold a permit while waiting for data from range-level merge tasks, which also
    /// need permits to produce data.
    final_merge_semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor reports its metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. The
    /// per-partition metrics in verbose mode show the metrics of the
    /// distributor separately.
    metrics_set: ExecutionPlanMetricsSet,
    /// List the distributor registers its own partition metrics into.
    metrics_list: Arc<PartitionMetricsList>,
    /// Whether to use verbose logging and collect detailed metrics.
    explain_verbose: bool,
}
451
impl SeriesDistributor {
    /// Executes the distributor.
    ///
    /// Runs [SeriesDistributor::scan_partitions_flat]. On failure, the error is
    /// sent to every sender slot that has not been marked closed.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = self.scan_partitions_flat().await;

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all parts in flat format using FlatSeriesBatchDivider.
    ///
    /// 1. Builds one deduplicated stream per partition range, each in its own
    ///    spawned task.
    /// 2. Merges all range streams into a single reader.
    /// 3. Splits the merged batches into series batches and sends each one to a
    ///    partition.
    ///
    /// # Errors
    /// Returns an error if a build task fails or panics, if reading fails, or if
    /// no partition receiver is left to send to.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Initialize reference counts for all partition ranges.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        // Create PartitionPruner covering all partitions
        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor's metrics use the partition index one past the last
        // real partition.
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            self.explain_verbose,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so scan cost contains
        // build part cost.
        let mut fetch_start = Instant::now();

        // Builds one deduped stream per partition range, then merges across ranges.
        let build_start = Instant::now();
        let mut tasks = Vec::new();
        for partition in &self.partitions {
            for part_range in partition {
                let stream_ctx = self.stream_ctx.clone();
                let part_range = *part_range;
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                // File scanning and range-level merging share `range_semaphore`.
                let file_scan_semaphore = self.range_semaphore.clone();
                let merge_semaphore = self.range_semaphore.clone();
                tasks.push(common_runtime::spawn_global(async move {
                    SeqScan::build_flat_partition_range_read(
                        &stream_ctx,
                        &part_range,
                        false,
                        &part_metrics,
                        partition_pruner,
                        file_scan_semaphore,
                        merge_semaphore,
                    )
                    .await
                }));
            }
        }
        // Awaits the build tasks in order; the first failure is returned.
        let mut range_streams = Vec::with_capacity(tasks.len());
        let mut estimated_batch_sizes = Vec::with_capacity(tasks.len());
        for task in tasks {
            let (stream, estimated_batch_size) = task.await.context(JoinSnafu)??;
            range_streams.push(stream);
            estimated_batch_sizes.push(estimated_batch_size);
        }
        let channel_size =
            compute_parallel_channel_size(compute_average_batch_size(estimated_batch_sizes));
        common_telemetry::debug!(
            "SeriesDistributor built {} range_streams, region: {}, build cost: {:?}, channel_size: {}",
            range_streams.len(),
            self.stream_ctx.input.region_metadata().region_id,
            build_start.elapsed(),
            channel_size,
        );

        // Each partition range stream is already deduped, so skip dedup here.
        // Use a separate semaphore for the final merge to avoid deadlock with
        // range-level merge tasks that share the range_semaphore.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            range_streams,
            self.final_merge_semaphore.clone(),
            Some(&part_metrics),
            true,
            channel_size,
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            // Counted before the empty check, so empty batches add to `num_batches`.
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Use divider to split series
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Send any remaining batch in the divider
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}
600
/// Batches of the same series.
///
/// This is the unit the distributor sends to a partition; every series it
/// contains is contained in full.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Record batches in the flat format.
    Flat(FlatSeriesBatch),
}
606
607impl SeriesBatch {
608    /// Returns the number of batches.
609    pub fn num_batches(&self) -> usize {
610        match self {
611            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
612        }
613    }
614
615    /// Returns the total number of rows across all batches.
616    pub fn num_rows(&self) -> usize {
617        match self {
618            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
619        }
620    }
621}
622
/// Batches of the same series in flat format.
///
/// Produced by `FlatSeriesBatchDivider`: it may hold rows of several series,
/// but all rows of each series it holds.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Record batches in the order they were read; up to four are stored inline.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
628
/// List of senders.
///
/// Batches are sent to partitions in round-robin order. A slot is set to
/// `None` once its receiver is found to be closed.
struct SenderList {
    /// One sender per partition; `None` once the partition's receiver is closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send.
    sender_idx: usize,
    /// Number of `send_timeout` attempts that timed out.
    num_timeout: usize,
    /// Number of `try_send` attempts that found the channel full.
    num_full: usize,
}
641
642impl SenderList {
643    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
644        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
645        Self {
646            senders,
647            num_nones,
648            sender_idx: 0,
649            num_timeout: 0,
650            num_full: 0,
651        }
652    }
653
654    /// Finds a partition and tries to send the batch to the partition.
655    /// Returns None if it sends successfully.
656    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
657        for _ in 0..self.senders.len() {
658            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
659
660            let sender_idx = self.fetch_add_sender_idx();
661            let Some(sender) = &self.senders[sender_idx] else {
662                continue;
663            };
664
665            match sender.try_send(Ok(batch)) {
666                Ok(()) => return Ok(None),
667                Err(TrySendError::Full(res)) => {
668                    self.num_full += 1;
669                    // Safety: we send Ok.
670                    batch = res.unwrap();
671                }
672                Err(TrySendError::Closed(res)) => {
673                    self.senders[sender_idx] = None;
674                    self.num_nones += 1;
675                    // Safety: we send Ok.
676                    batch = res.unwrap();
677                }
678            }
679        }
680
681        Ok(Some(batch))
682    }
683
684    /// Finds a partition and sends the batch to the partition.
685    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
686        // Sends the batch without blocking first.
687        match self.try_send_batch(batch)? {
688            Some(b) => {
689                // Unable to send batch to partition.
690                batch = b;
691            }
692            None => {
693                return Ok(());
694            }
695        }
696
697        loop {
698            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
699
700            let sender_idx = self.fetch_add_sender_idx();
701            let Some(sender) = &self.senders[sender_idx] else {
702                continue;
703            };
704            // Adds a timeout to avoid blocking indefinitely and sending
705            // the batch in a round-robin fashion when some partitions
706            // don't poll their inputs. This may happen if we have a
707            // node like sort merging. But it is rare when we are using SeriesScan.
708            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
709                Ok(()) => break,
710                Err(SendTimeoutError::Timeout(res)) => {
711                    self.num_timeout += 1;
712                    // Safety: we send Ok.
713                    batch = res.unwrap();
714                }
715                Err(SendTimeoutError::Closed(res)) => {
716                    self.senders[sender_idx] = None;
717                    self.num_nones += 1;
718                    // Safety: we send Ok.
719                    batch = res.unwrap();
720                }
721            }
722        }
723
724        Ok(())
725    }
726
727    async fn send_error(&self, error: Error) {
728        let error = Arc::new(error);
729        for sender in self.senders.iter().flatten() {
730            let result = Err(error.clone()).context(ScanSeriesSnafu);
731            let _ = sender.send(result).await;
732        }
733    }
734
735    fn fetch_add_sender_idx(&mut self) -> usize {
736        let sender_idx = self.sender_idx;
737        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
738        sender_idx
739    }
740}
741
742fn new_partition_metrics(
743    stream_ctx: &StreamContext,
744    explain_verbose: bool,
745    metrics_set: &ExecutionPlanMetricsSet,
746    partition: usize,
747    metrics_list: &PartitionMetricsList,
748) -> PartitionMetrics {
749    let metrics = PartitionMetrics::new(
750        stream_ctx.input.mapper.metadata().region_id,
751        partition,
752        "SeriesScan",
753        stream_ctx.query_start,
754        explain_verbose,
755        metrics_set,
756    );
757
758    metrics_list.set(partition, metrics.clone());
759    metrics
760}
761
/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Pending batches; the series of their last row may continue in the next
    /// pushed batch.
    buffer: FlatSeriesBatch,
}
770
771impl FlatSeriesBatchDivider {
772    /// Pushes a record batch into the divider.
773    ///
774    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
775    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
776        // If buffer is empty
777        if self.buffer.batches.is_empty() {
778            self.buffer.batches.push(batch);
779            return None;
780        }
781
782        // Gets the primary key column from the incoming batch.
783        let pk_column_idx = primary_key_column_index(batch.num_columns());
784        let batch_pk_column = batch.column(pk_column_idx);
785        let batch_pk_array = batch_pk_column
786            .as_any()
787            .downcast_ref::<PrimaryKeyArray>()
788            .unwrap();
789        let batch_pk_values = batch_pk_array
790            .values()
791            .as_any()
792            .downcast_ref::<BinaryArray>()
793            .unwrap();
794        // Gets the last primary key of the incoming batch.
795        let batch_last_pk =
796            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
797        // Gets the last primary key of the buffer.
798        // Safety: the buffer is not empty.
799        let buffer_last_batch = self.buffer.batches.last().unwrap();
800        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
801        let buffer_pk_array = buffer_pk_column
802            .as_any()
803            .downcast_ref::<PrimaryKeyArray>()
804            .unwrap();
805        let buffer_pk_values = buffer_pk_array
806            .values()
807            .as_any()
808            .downcast_ref::<BinaryArray>()
809            .unwrap();
810        let buffer_last_pk =
811            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
812
813        // If last primary key in the batch is the same as last primary key in the buffer.
814        if batch_last_pk == buffer_last_pk {
815            self.buffer.batches.push(batch);
816            return None;
817        }
818        // Otherwise, the batch must have a different primary key, we find the first offset of the
819        // changed primary key.
820        let batch_pk_keys = batch_pk_array.keys();
821        let pk_indices = batch_pk_keys.values();
822        let mut change_offset = 0;
823        for (i, &key) in pk_indices.iter().enumerate() {
824            let batch_pk = batch_pk_values.value(key as usize);
825
826            if buffer_last_pk != batch_pk {
827                change_offset = i;
828                break;
829            }
830        }
831
832        // Splits the batch at the change offset
833        let (first_part, remaining_part) = if change_offset > 0 {
834            let first_part = batch.slice(0, change_offset);
835            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
836            (Some(first_part), Some(remaining_part))
837        } else {
838            (None, Some(batch))
839        };
840
841        // Creates the result from current buffer + first part of new batch
842        let mut result = std::mem::take(&mut self.buffer);
843        if let Some(first_part) = first_part {
844            result.batches.push(first_part);
845        }
846
847        // Pushes remaining part to the buffer if it exists
848        if let Some(remaining_part) = remaining_part {
849            self.buffer.batches.push(remaining_part);
850        }
851
852        Some(result)
853    }
854
855    /// Returns the final [FlatSeriesBatch].
856    fn finish(&mut self) -> Option<FlatSeriesBatch> {
857        if self.buffer.batches.is_empty() {
858            None
859        } else {
860            Some(std::mem::take(&mut self.buffer))
861        }
862    }
863}
864
865/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
866fn primary_key_at<'a>(
867    primary_key: &PrimaryKeyArray,
868    primary_key_values: &'a BinaryArray,
869    index: usize,
870) -> &'a [u8] {
871    let key = primary_key.keys().value(index);
872    primary_key_values.value(key as usize)
873}
874
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a [RecordBatch] laid out like [build_test_flat_schema].
    ///
    /// Each slice supplies one column and must contain one entry per row. `primary_keys` is
    /// written twice: as the UTF-8 `k0` column and as the binary `__primary_key` column.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let row_count = timestamps.len();
        debug_assert_eq!(sequences.len(), row_count);
        debug_assert_eq!(op_types.len(), row_count);
        debug_assert_eq!(fields.len(), row_count);
        debug_assert_eq!(primary_keys.len(), row_count);

        let tag_column = build_test_pk_string_dict_array(primary_keys);
        let field_column: ArrayRef =
            Arc::new(fields.iter().map(|&v| Some(v as i64)).collect::<Int64Array>());
        let ts_column: ArrayRef = Arc::new(TimestampMillisecondArray::from_iter_values(
            timestamps.iter().copied(),
        ));
        let pk_column = build_test_pk_array(primary_keys);
        let seq_column: ArrayRef =
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied()));
        let op_column: ArrayRef =
            Arc::new(UInt8Array::from_iter_values(op_types.iter().map(|&op| op as u8)));

        // Column order must line up with the fields declared in `build_test_flat_schema`.
        let columns = vec![tag_column, field_column, ts_column, pk_column, seq_column, op_column];
        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Shorthand for [new_test_record_batch] where every row has op type [OpType::Put].
    fn new_put_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        fields: &[u64],
    ) -> RecordBatch {
        let op_types = vec![OpType::Put; timestamps.len()];
        new_test_record_batch(primary_keys, timestamps, sequences, &op_types, fields)
    }

    /// Encodes `primary_keys` as a `UInt32`-keyed UTF-8 dictionary array (the `k0` column).
    ///
    /// Panics if any key is not valid UTF-8.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        primary_keys
            .iter()
            .map(|&pk| std::str::from_utf8(pk).unwrap())
            .for_each(|key| {
                builder.append(key).unwrap();
            });
        Arc::new(builder.finish())
    }

    /// Encodes `primary_keys` as a `UInt32`-keyed binary dictionary array (the
    /// `__primary_key` column).
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        primary_keys.iter().for_each(|&pk| {
            builder.append(pk).unwrap();
        });
        Arc::new(builder.finish())
    }

    /// Returns the flat schema used by these tests.
    ///
    /// Column order: `k0`, `field0`, `ts`, then the internal `__primary_key`, `__sequence` and
    /// `__op_type` columns.
    fn build_test_flat_schema() -> SchemaRef {
        let dictionary_of = |value_type: DataType| {
            DataType::Dictionary(Box::new(DataType::UInt32), Box::new(value_type))
        };
        Arc::new(Schema::new(vec![
            Field::new("k0", dictionary_of(DataType::Utf8), false),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("__primary_key", dictionary_of(DataType::Binary), false),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        // A divider that never received input has nothing to flush.
        assert!(FlatSeriesBatchDivider::default().finish().is_none());

        // The very first push only fills the buffer.
        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_put_batch(&[b"series1", b"series1"], &[1000, 2000], &[1, 2], &[10, 20]);
        assert!(divider.push(batch).is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1", b"series1"], &[1000, 2000], &[1, 2], &[10, 20]);
        let second = new_put_batch(&[b"series1", b"series1"], &[3000, 4000], &[3, 4], &[30, 40]);

        divider.push(first);
        // Same series as the buffered rows: nothing is emitted yet.
        assert!(divider.push(second).is_none());

        let flushed = divider.finish().unwrap();
        assert_eq!(flushed.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1", b"series1"], &[1000, 2000], &[1, 2], &[10, 20]);
        let second = new_put_batch(&[b"series2", b"series2"], &[3000, 4000], &[3, 4], &[30, 40]);

        divider.push(first);
        // `second` starts with a new series, so the buffered rows are emitted unchanged and
        // `second` becomes the new buffer.
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 1);
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1", b"series1"], &[1000, 2000], &[1, 2], &[10, 20]);
        let second = new_put_batch(&[b"series1", b"series2"], &[3000, 4000], &[3, 4], &[30, 40]);

        divider.push(first);
        // `second` is split: its leading `series1` row joins the emitted batch and its
        // `series2` row stays buffered.
        let emitted = divider.push(second).unwrap();
        let emitted_rows: Vec<usize> = emitted.batches.iter().map(|b| b.num_rows()).collect();
        assert_eq!(emitted_rows, vec![2, 1]);

        let buffered_rows: Vec<usize> =
            divider.buffer.batches.iter().map(|b| b.num_rows()).collect();
        assert_eq!(buffered_rows, vec![1]);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();
        let first = new_put_batch(&[b"series1"], &[1000], &[1], &[10]);
        let second = new_put_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[20, 30, 40, 50],
        );

        divider.push(first);
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 2);
        assert_eq!(emitted.batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);

        // Only the first series change inside `second` is used as the split point, so the
        // `series2` and `series3` rows remain buffered together in a single batch.
        let remaining = divider.finish().unwrap();
        assert_eq!(remaining.batches.len(), 1);
        assert_eq!(remaining.batches[0].num_rows(), 3);
    }
}