// mito2/read/series_scan.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-series scan implementation.
17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use common_telemetry::warn;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::schema::SchemaRef;
32use futures::{StreamExt, TryStreamExt};
33use smallvec::SmallVec;
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
38};
39use tokio::sync::Semaphore;
40use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
41use tokio::sync::mpsc::{self, Receiver, Sender};
42
43use crate::error::{
44    Error, InvalidSenderSnafu, JoinSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
45    ScanSeriesSnafu, TooManyFilesToReadSnafu,
46};
47use crate::read::ScannerMetrics;
48use crate::read::pruner::{PartitionPruner, Pruner};
49use crate::read::scan_region::{ScanInput, StreamContext};
50use crate::read::scan_util::{
51    PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
52    compute_parallel_channel_size,
53};
54use crate::read::seq_scan::SeqScan;
55use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
56use crate::sst::parquet::flat_format::primary_key_column_index;
57use crate::sst::parquet::format::PrimaryKeyArray;
58
/// Timeout to send a batch to a sender.
///
/// Kept short so the distributor rotates to the next partition instead of
/// blocking on one slow consumer (see [SenderList::send_batch]).
const SEND_TIMEOUT: Duration = Duration::from_micros(100);
61
/// List of receivers.
///
/// Each slot is `Some` until the corresponding partition takes its receiver
/// (each partition may take its receiver only once).
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
64
/// Scans a region and returns sorted rows of a series in the same partition.
///
/// The output order is always order by `(primary key, time index)` inside every
/// partition.
/// Always returns the same series (primary key) to the same partition.
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Receivers of each partition.
    ///
    /// Populated lazily when the distributor starts; each slot can be taken
    /// at most once.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    /// The scanner only sets in query and keeps it empty during compaction.
    metrics_list: Arc<PartitionMetricsList>,
}
83
84impl SeriesScan {
85    /// Creates a new [SeriesScan].
86    pub(crate) fn new(input: ScanInput) -> Self {
87        let mut properties = ScannerProperties::default()
88            .with_append_mode(input.append_mode)
89            .with_total_rows(input.total_rows());
90        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
91        properties.partitions = vec![stream_ctx.partition_ranges()];
92
93        // Create the shared pruner with number of workers equal to CPU cores.
94        let num_workers = common_stat::get_total_cpu_cores().max(1);
95        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
96
97        Self {
98            properties,
99            stream_ctx,
100            pruner,
101            receivers: Mutex::new(Vec::new()),
102            metrics_list: Arc::new(PartitionMetricsList::default()),
103        }
104    }
105
    /// Builds the record batch stream for one partition.
    ///
    /// Wraps the partition's [ScanBatch] stream into a record batch stream
    /// using the projection mapper's output schema.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        // Inner stream of ScanBatch items for this partition.
        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        // Converts ScanBatch items into record batches of the output schema.
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }
143
    /// Returns the [ScanBatch] stream for `partition`.
    ///
    /// Starts the shared series distributor on the first call, then takes this
    /// partition's receiver and wraps it into a stream that records metrics.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        // Idempotent: only the first caller actually spawns the distributor.
        self.maybe_start_distributor(metrics_set, &self.metrics_list, ctx.explain_verbose);

        // Each partition's receiver can be taken only once.
        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            // Time waiting on `recv` is accounted as scan cost.
            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
202
203    /// Takes the receiver for the partition.
204    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
205        let mut rx_list = self.receivers.lock().unwrap();
206        rx_list[partition]
207            .take()
208            .context(ScanMultiTimesSnafu { partition })
209    }
210
    /// Starts the distributor if the receiver list is empty.
    ///
    /// Holds the `receivers` lock across the check-and-spawn so concurrent
    /// callers cannot start two distributors.
    #[tracing::instrument(
        skip(self, metrics_set, metrics_list),
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
        explain_verbose: bool,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            // A distributor is already running.
            return;
        }

        // One bounded channel per partition.
        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            // Separate semaphores: range-level work and the final merge must not
            // share permits (see the field docs on SeriesDistributor).
            range_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            pruner: self.pruner.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
            explain_verbose,
        };
        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
        // The distributor runs detached; failures are forwarded through the senders.
        common_runtime::spawn_global(
            async move {
                distributor.execute().await;
            }
            .instrument(span),
        );

        *rx_list = receivers;
    }
250
251    /// Scans the region and returns a stream.
252    #[tracing::instrument(
253        skip_all,
254        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
255    )]
256    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
257        let part_num = self.properties.num_partitions();
258        let metrics_set = ExecutionPlanMetricsSet::default();
259        let streams = (0..part_num)
260            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
261            .collect::<Result<Vec<_>, BoxedError>>()?;
262        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
263        Ok(Box::pin(chained_stream))
264    }
265
    /// Scans batches in all partitions one by one.
    ///
    /// Flattens the per-partition [ScanBatch] streams into a single sequential
    /// stream (partition 0 first, then 1, ...).
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }
291
292    /// Checks resource limit for the scanner.
293    pub(crate) fn check_scan_limit(&self) -> Result<()> {
294        // Sum the total number of files across all partitions
295        let total_files: usize = self
296            .properties
297            .partitions
298            .iter()
299            .flat_map(|partition| partition.iter())
300            .map(|part_range| {
301                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
302                range_meta.indices.len()
303            })
304            .sum();
305
306        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
307        if total_files > max_concurrent_files {
308            return TooManyFilesToReadSnafu {
309                actual: total_files,
310                max: max_concurrent_files,
311            }
312            .fail();
313        }
314
315        Ok(())
316    }
317}
318
319fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
320    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
321        .map(|_| {
322            let (sender, receiver) = mpsc::channel(1);
323            (Some(sender), Some(receiver))
324        })
325        .unzip();
326    (SenderList::new(senders), receivers)
327}
328
impl RegionScanner for SeriesScan {
    fn name(&self) -> &str {
        "SeriesScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    // Output schema after projection by the mapper.
    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        // Validate the file limit against the (possibly updated) partitions.
        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        // True when the predicate group holds a predicate without the region.
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.is_some()
    }

    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
384
impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                // Verbose mode additionally prints per-partition metrics.
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}
403
impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Keep debug output compact: only the number of ranges.
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}
411
#[cfg(test)]
impl SeriesScan {
    /// Returns the input (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
419
/// The distributor scans series and distributes them to different partitions.
struct SeriesDistributor {
    /// Context for the scan stream.
    stream_ctx: Arc<StreamContext>,
    /// Semaphore for file scanning and range-level merging.
    range_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore for the final merge across all range streams.
    /// Must be separate from `range_semaphore` to avoid deadlock: final merge tasks
    /// hold a permit while waiting for data from range-level merge tasks, which also
    /// need permits to produce data.
    final_merge_semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report.
    /// The distributor report the metrics as an additional partition.
    /// This may double the scan cost of the [SeriesScan] metrics. We can
    /// get per-partition metrics in verbose mode to see the metrics of the
    /// distributor.
    metrics_set: ExecutionPlanMetricsSet,
    /// Per-partition metrics list shared with the scanner for verbose explain.
    metrics_list: Arc<PartitionMetricsList>,
    /// Whether to use verbose logging and collect detailed metrics.
    explain_verbose: bool,
}
447
448impl SeriesDistributor {
449    /// Executes the distributor.
450    #[tracing::instrument(
451        skip_all,
452        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
453    )]
454    async fn execute(&mut self) {
455        let result = self.scan_partitions_flat().await;
456
457        if let Err(e) = result {
458            self.senders.send_error(e).await;
459        }
460    }
461
    /// Scans all parts in flat format using FlatSeriesBatchDivider.
    ///
    /// Builds one deduped stream per partition range, merges them into a single
    /// sorted stream, splits the merged output by series and distributes the
    /// series batches to the partition senders.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Initialize reference counts for all partition ranges.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        // Create PartitionPruner covering all partitions
        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor reports its metrics as an extra partition
        // (index == number of real partitions).
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            self.explain_verbose,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Start fetch time before building sources so scan cost contains
        // build part cost.
        let mut fetch_start = Instant::now();

        // Builds one deduped stream per partition range, then merges across ranges.
        let build_start = Instant::now();
        let mut tasks = Vec::new();
        for partition in &self.partitions {
            for part_range in partition {
                let stream_ctx = self.stream_ctx.clone();
                let part_range = *part_range;
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                // File scanning and range-level merging share `range_semaphore`;
                // only the final merge below uses `final_merge_semaphore`.
                let file_scan_semaphore = self.range_semaphore.clone();
                let merge_semaphore = self.range_semaphore.clone();
                tasks.push(common_runtime::spawn_global(async move {
                    SeqScan::build_flat_partition_range_read(
                        &stream_ctx,
                        &part_range,
                        false,
                        &part_metrics,
                        partition_pruner,
                        file_scan_semaphore,
                        merge_semaphore,
                    )
                    .await
                }));
            }
        }
        let mut range_streams = Vec::with_capacity(tasks.len());
        let mut estimated_batch_sizes = Vec::with_capacity(tasks.len());
        for task in tasks {
            let (stream, estimated_batch_size) = task.await.context(JoinSnafu)??;
            range_streams.push(stream);
            estimated_batch_sizes.push(estimated_batch_size);
        }
        // Size the merge channels from the average estimated batch size.
        let channel_size =
            compute_parallel_channel_size(compute_average_batch_size(estimated_batch_sizes));
        common_telemetry::debug!(
            "SeriesDistributor built {} range_streams, region: {}, build cost: {:?}, channel_size: {}",
            range_streams.len(),
            self.stream_ctx.input.region_metadata().region_id,
            build_start.elapsed(),
            channel_size,
        );

        // Each partition range stream is already deduped, so skip dedup here.
        // Use a separate semaphore for the final merge to avoid deadlock with
        // range-level merge tasks that share the range_semaphore.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            range_streams,
            self.final_merge_semaphore.clone(),
            Some(&part_metrics),
            true,
            channel_size,
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            // Empty batches are unexpected; skip them defensively in release builds.
            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Use divider to split series
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Send any remaining batch in the divider
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
595}
596
/// Batches of the same series.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Series data in flat record batch format.
    Flat(FlatSeriesBatch),
}
602
603impl SeriesBatch {
604    /// Returns the number of batches.
605    pub fn num_batches(&self) -> usize {
606        match self {
607            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
608        }
609    }
610
611    /// Returns the total number of rows across all batches.
612    pub fn num_rows(&self) -> usize {
613        match self {
614            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
615        }
616    }
617}
618
/// Batches of the same series in flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Record batches of the series; inline storage avoids the heap for
    /// the common small case.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
624
/// List of senders.
///
/// Distributes batches to partitions in a round-robin fashion.
struct SenderList {
    /// One sender per partition; a slot becomes `None` once its receiver closes.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of None senders.
    num_nones: usize,
    /// Index of the current partition to send.
    sender_idx: usize,
    /// Number of timeout.
    num_timeout: usize,
    /// Number of full senders.
    num_full: usize,
}
637
638impl SenderList {
639    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
640        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
641        Self {
642            senders,
643            num_nones,
644            sender_idx: 0,
645            num_timeout: 0,
646            num_full: 0,
647        }
648    }
649
    /// Finds a partition and tries to send the batch to the partition.
    /// Returns None if it sends successfully.
    ///
    /// Rotates through partitions round-robin; a full channel moves on to the
    /// next partition, a closed channel is removed permanently. Returns the
    /// batch back to the caller when every live channel is full.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            // Fail if every sender is gone: nobody can ever receive the batch.
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    // Drop the closed sender so later rounds skip it.
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        // Every live channel was full; hand the batch back.
        Ok(Some(batch))
    }
679
    /// Finds a partition and sends the batch to the partition.
    ///
    /// First attempts a non-blocking send to any partition; if all channels are
    /// busy, falls back to short-timeout sends in round-robin order until one
    /// succeeds or every receiver is gone.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Sends the batch without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // Unable to send batch to partition.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            // All partitions disconnected: no receiver will ever take the batch.
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Adds a timeout to avoid blocking indefinitely and sending
            // the batch in a round-robin fashion when some partitions
            // don't poll their inputs. This may happen if we have a
            // node like sort merging. But it is rare when we are using SeriesScan.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we send Ok.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }
722
723    async fn send_error(&self, error: Error) {
724        let error = Arc::new(error);
725        for sender in self.senders.iter().flatten() {
726            let result = Err(error.clone()).context(ScanSeriesSnafu);
727            let _ = sender.send(result).await;
728        }
729    }
730
731    fn fetch_add_sender_idx(&mut self) -> usize {
732        let sender_idx = self.sender_idx;
733        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
734        sender_idx
735    }
736}
737
738fn new_partition_metrics(
739    stream_ctx: &StreamContext,
740    explain_verbose: bool,
741    metrics_set: &ExecutionPlanMetricsSet,
742    partition: usize,
743    metrics_list: &PartitionMetricsList,
744) -> PartitionMetrics {
745    let metrics = PartitionMetrics::new(
746        stream_ctx.input.mapper.metadata().region_id,
747        partition,
748        "SeriesScan",
749        stream_ctx.query_start,
750        explain_verbose,
751        metrics_set,
752    );
753
754    metrics_list.set(partition, metrics.clone());
755    metrics
756}
757
/// A divider to split flat record batches by time series.
///
/// It only ensures rows of the same series are returned in the same [FlatSeriesBatch].
/// However, a [FlatSeriesBatch] may contain rows from multiple series.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    // Batches whose trailing series may continue in the next pushed batch.
    buffer: FlatSeriesBatch,
}
766
767impl FlatSeriesBatchDivider {
    /// Pushes a record batch into the divider.
    ///
    /// Returns a [FlatSeriesBatch] if we ensure the batch contains all rows of the series in it.
    ///
    /// NOTE(review): assumes input batches arrive sorted by primary key so a
    /// series never resumes after a different key appeared — confirm with the
    /// merged reader's ordering guarantee.
    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
        // If buffer is empty, we cannot decide a series boundary yet.
        if self.buffer.batches.is_empty() {
            self.buffer.batches.push(batch);
            return None;
        }

        // Gets the primary key column from the incoming batch.
        let pk_column_idx = primary_key_column_index(batch.num_columns());
        let batch_pk_column = batch.column(pk_column_idx);
        let batch_pk_array = batch_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let batch_pk_values = batch_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Gets the last primary key of the incoming batch.
        let batch_last_pk =
            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
        // Gets the last primary key of the buffer.
        // Safety: the buffer is not empty.
        let buffer_last_batch = self.buffer.batches.last().unwrap();
        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
        let buffer_pk_array = buffer_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let buffer_pk_values = buffer_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let buffer_last_pk =
            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);

        // If last primary key in the batch is the same as last primary key in the buffer,
        // the whole batch still belongs to the buffered series.
        if batch_last_pk == buffer_last_pk {
            self.buffer.batches.push(batch);
            return None;
        }
        // Otherwise, the batch must have a different primary key, we find the first offset of the
        // changed primary key.
        let batch_pk_keys = batch_pk_array.keys();
        let pk_indices = batch_pk_keys.values();
        let mut change_offset = 0;
        for (i, &key) in pk_indices.iter().enumerate() {
            let batch_pk = batch_pk_values.value(key as usize);

            // Always hits: the last row's key differs (checked above).
            if buffer_last_pk != batch_pk {
                change_offset = i;
                break;
            }
        }

        // Splits the batch at the change offset.
        // `change_offset == 0` means the whole batch starts a new series.
        let (first_part, remaining_part) = if change_offset > 0 {
            let first_part = batch.slice(0, change_offset);
            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
            (Some(first_part), Some(remaining_part))
        } else {
            (None, Some(batch))
        };

        // Creates the result from current buffer + first part of new batch
        let mut result = std::mem::take(&mut self.buffer);
        if let Some(first_part) = first_part {
            result.batches.push(first_part);
        }

        // Pushes remaining part to the buffer if it exists
        if let Some(remaining_part) = remaining_part {
            self.buffer.batches.push(remaining_part);
        }

        Some(result)
    }
850
851    /// Returns the final [FlatSeriesBatch].
852    fn finish(&mut self) -> Option<FlatSeriesBatch> {
853        if self.buffer.batches.is_empty() {
854            None
855        } else {
856            Some(std::mem::take(&mut self.buffer))
857        }
858    }
859}
860
861/// Helper function to extract primary key bytes at a specific index from [PrimaryKeyArray].
862fn primary_key_at<'a>(
863    primary_key: &PrimaryKeyArray,
864    primary_key_values: &'a BinaryArray,
865    index: usize,
866) -> &'a [u8] {
867    let key = primary_key.keys().value(index);
868    primary_key_values.value(key as usize)
869}
870
871#[cfg(test)]
872mod tests {
873    use std::sync::Arc;
874
875    use api::v1::OpType;
876    use datatypes::arrow::array::{
877        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
878        TimestampMillisecondArray, UInt8Array, UInt64Array,
879    };
880    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
881    use datatypes::arrow::record_batch::RecordBatch;
882
883    use super::*;
884
885    fn new_test_record_batch(
886        primary_keys: &[&[u8]],
887        timestamps: &[i64],
888        sequences: &[u64],
889        op_types: &[OpType],
890        fields: &[u64],
891    ) -> RecordBatch {
892        let num_rows = timestamps.len();
893        debug_assert_eq!(sequences.len(), num_rows);
894        debug_assert_eq!(op_types.len(), num_rows);
895        debug_assert_eq!(fields.len(), num_rows);
896        debug_assert_eq!(primary_keys.len(), num_rows);
897
898        let columns: Vec<ArrayRef> = vec![
899            build_test_pk_string_dict_array(primary_keys),
900            Arc::new(Int64Array::from_iter(
901                fields.iter().map(|v| Some(*v as i64)),
902            )),
903            Arc::new(TimestampMillisecondArray::from_iter_values(
904                timestamps.iter().copied(),
905            )),
906            build_test_pk_array(primary_keys),
907            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
908            Arc::new(UInt8Array::from_iter_values(
909                op_types.iter().map(|v| *v as u8),
910            )),
911        ];
912
913        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
914    }
915
916    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
917        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
918        for &pk in primary_keys {
919            let pk_str = std::str::from_utf8(pk).unwrap();
920            builder.append(pk_str).unwrap();
921        }
922        Arc::new(builder.finish())
923    }
924
925    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
926        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
927        for &pk in primary_keys {
928            builder.append(pk).unwrap();
929        }
930        Arc::new(builder.finish())
931    }
932
933    fn build_test_flat_schema() -> SchemaRef {
934        let fields = vec![
935            Field::new(
936                "k0",
937                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
938                false,
939            ),
940            Field::new("field0", DataType::Int64, true),
941            Field::new(
942                "ts",
943                DataType::Timestamp(TimeUnit::Millisecond, None),
944                false,
945            ),
946            Field::new(
947                "__primary_key",
948                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
949                false,
950            ),
951            Field::new("__sequence", DataType::UInt64, false),
952            Field::new("__op_type", DataType::UInt8, false),
953        ];
954        Arc::new(Schema::new(fields))
955    }
956
957    #[test]
958    fn test_empty_buffer_first_push() {
959        let mut divider = FlatSeriesBatchDivider::default();
960        let result = divider.finish();
961        assert!(result.is_none());
962
963        let mut divider = FlatSeriesBatchDivider::default();
964        let batch = new_test_record_batch(
965            &[b"series1", b"series1"],
966            &[1000, 2000],
967            &[1, 2],
968            &[OpType::Put, OpType::Put],
969            &[10, 20],
970        );
971        let result = divider.push(batch);
972        assert!(result.is_none());
973        assert_eq!(divider.buffer.batches.len(), 1);
974    }
975
976    #[test]
977    fn test_same_series_accumulation() {
978        let mut divider = FlatSeriesBatchDivider::default();
979
980        let batch1 = new_test_record_batch(
981            &[b"series1", b"series1"],
982            &[1000, 2000],
983            &[1, 2],
984            &[OpType::Put, OpType::Put],
985            &[10, 20],
986        );
987
988        let batch2 = new_test_record_batch(
989            &[b"series1", b"series1"],
990            &[3000, 4000],
991            &[3, 4],
992            &[OpType::Put, OpType::Put],
993            &[30, 40],
994        );
995
996        divider.push(batch1);
997        let result = divider.push(batch2);
998        assert!(result.is_none());
999        let series_batch = divider.finish().unwrap();
1000        assert_eq!(series_batch.batches.len(), 2);
1001    }
1002
1003    #[test]
1004    fn test_series_boundary_detection() {
1005        let mut divider = FlatSeriesBatchDivider::default();
1006
1007        let batch1 = new_test_record_batch(
1008            &[b"series1", b"series1"],
1009            &[1000, 2000],
1010            &[1, 2],
1011            &[OpType::Put, OpType::Put],
1012            &[10, 20],
1013        );
1014
1015        let batch2 = new_test_record_batch(
1016            &[b"series2", b"series2"],
1017            &[3000, 4000],
1018            &[3, 4],
1019            &[OpType::Put, OpType::Put],
1020            &[30, 40],
1021        );
1022
1023        divider.push(batch1);
1024        let series_batch = divider.push(batch2).unwrap();
1025        assert_eq!(series_batch.batches.len(), 1);
1026
1027        assert_eq!(divider.buffer.batches.len(), 1);
1028    }
1029
1030    #[test]
1031    fn test_series_boundary_within_batch() {
1032        let mut divider = FlatSeriesBatchDivider::default();
1033
1034        let batch1 = new_test_record_batch(
1035            &[b"series1", b"series1"],
1036            &[1000, 2000],
1037            &[1, 2],
1038            &[OpType::Put, OpType::Put],
1039            &[10, 20],
1040        );
1041
1042        let batch2 = new_test_record_batch(
1043            &[b"series1", b"series2"],
1044            &[3000, 4000],
1045            &[3, 4],
1046            &[OpType::Put, OpType::Put],
1047            &[30, 40],
1048        );
1049
1050        divider.push(batch1);
1051        let series_batch = divider.push(batch2).unwrap();
1052        assert_eq!(series_batch.batches.len(), 2);
1053        assert_eq!(series_batch.batches[0].num_rows(), 2);
1054        assert_eq!(series_batch.batches[1].num_rows(), 1);
1055
1056        assert_eq!(divider.buffer.batches.len(), 1);
1057        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
1058    }
1059
1060    #[test]
1061    fn test_series_splitting() {
1062        let mut divider = FlatSeriesBatchDivider::default();
1063
1064        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);
1065
1066        let batch2 = new_test_record_batch(
1067            &[b"series1", b"series2", b"series2", b"series3"],
1068            &[2000, 3000, 4000, 5000],
1069            &[2, 3, 4, 5],
1070            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1071            &[20, 30, 40, 50],
1072        );
1073
1074        divider.push(batch1);
1075        let series_batch = divider.push(batch2).unwrap();
1076        assert_eq!(series_batch.batches.len(), 2);
1077
1078        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
1079        assert_eq!(total_rows, 2);
1080
1081        let final_batch = divider.finish().unwrap();
1082        assert_eq!(final_batch.batches.len(), 1);
1083        assert_eq!(final_batch.batches[0].num_rows(), 3);
1084    }
1085}