1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use common_telemetry::warn;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::schema::SchemaRef;
32use futures::{StreamExt, TryStreamExt};
33use smallvec::{SmallVec, smallvec};
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
38};
39use tokio::sync::Semaphore;
40use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
41use tokio::sync::mpsc::{self, Receiver, Sender};
42
43use crate::error::{
44 Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
45 ScanSeriesSnafu, TooManyFilesToReadSnafu,
46};
47use crate::read::pruner::{PartitionPruner, Pruner};
48use crate::read::scan_region::{ScanInput, StreamContext};
49use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
50use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{Batch, ScannerMetrics};
53use crate::sst::parquet::flat_format::primary_key_column_index;
54use crate::sst::parquet::format::PrimaryKeyArray;
55
56const SEND_TIMEOUT: Duration = Duration::from_micros(100);
58
59type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
61
62pub struct SeriesScan {
68 properties: ScannerProperties,
70 stream_ctx: Arc<StreamContext>,
72 pruner: Arc<Pruner>,
74 receivers: Mutex<ReceiverList>,
76 metrics_list: Arc<PartitionMetricsList>,
79}
80
81impl SeriesScan {
82 pub(crate) fn new(input: ScanInput) -> Self {
84 let mut properties = ScannerProperties::default()
85 .with_append_mode(input.append_mode)
86 .with_total_rows(input.total_rows());
87 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
88 properties.partitions = vec![stream_ctx.partition_ranges()];
89
90 let num_workers = common_stat::get_total_cpu_cores().max(1);
92 let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
93
94 Self {
95 properties,
96 stream_ctx,
97 pruner,
98 receivers: Mutex::new(Vec::new()),
99 metrics_list: Arc::new(PartitionMetricsList::default()),
100 }
101 }
102
103 #[tracing::instrument(
104 skip_all,
105 fields(
106 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
107 partition = partition
108 )
109 )]
110 fn scan_partition_impl(
111 &self,
112 ctx: &QueryScanContext,
113 metrics_set: &ExecutionPlanMetricsSet,
114 partition: usize,
115 ) -> Result<SendableRecordBatchStream> {
116 let metrics = new_partition_metrics(
117 &self.stream_ctx,
118 ctx.explain_verbose,
119 metrics_set,
120 partition,
121 &self.metrics_list,
122 );
123
124 let batch_stream =
125 self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
126
127 let input = &self.stream_ctx.input;
128 let record_batch_stream = ConvertBatchStream::new(
129 batch_stream,
130 input.mapper.clone(),
131 input.cache_strategy.clone(),
132 metrics,
133 );
134
135 Ok(Box::pin(RecordBatchStreamWrapper::new(
136 input.mapper.output_schema(),
137 Box::pin(record_batch_stream),
138 )))
139 }
140
141 #[tracing::instrument(
142 skip_all,
143 fields(
144 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
145 partition = partition
146 )
147 )]
148 fn scan_batch_in_partition(
149 &self,
150 ctx: &QueryScanContext,
151 partition: usize,
152 part_metrics: PartitionMetrics,
153 metrics_set: &ExecutionPlanMetricsSet,
154 ) -> Result<ScanBatchStream> {
155 if ctx.explain_verbose {
156 common_telemetry::info!(
157 "SeriesScan partition {}, region_id: {}",
158 partition,
159 self.stream_ctx.input.region_metadata().region_id
160 );
161 }
162
163 ensure!(
164 partition < self.properties.num_partitions(),
165 PartitionOutOfRangeSnafu {
166 given: partition,
167 all: self.properties.num_partitions(),
168 }
169 );
170
171 self.maybe_start_distributor(metrics_set, &self.metrics_list);
172
173 let mut receiver = self.take_receiver(partition)?;
174 let stream = try_stream! {
175 part_metrics.on_first_poll();
176
177 let mut fetch_start = Instant::now();
178 while let Some(series) = receiver.recv().await {
179 let series = series?;
180
181 let mut metrics = ScannerMetrics::default();
182 metrics.scan_cost += fetch_start.elapsed();
183 fetch_start = Instant::now();
184
185 metrics.num_batches += series.num_batches();
186 metrics.num_rows += series.num_rows();
187
188 let yield_start = Instant::now();
189 yield ScanBatch::Series(series);
190 metrics.yield_cost += yield_start.elapsed();
191
192 part_metrics.merge_metrics(&metrics);
193 }
194
195 part_metrics.on_finish();
196 };
197 Ok(Box::pin(stream))
198 }
199
200 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
202 let mut rx_list = self.receivers.lock().unwrap();
203 rx_list[partition]
204 .take()
205 .context(ScanMultiTimesSnafu { partition })
206 }
207
208 #[tracing::instrument(
210 skip(self, metrics_set, metrics_list),
211 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
212 )]
213 fn maybe_start_distributor(
214 &self,
215 metrics_set: &ExecutionPlanMetricsSet,
216 metrics_list: &Arc<PartitionMetricsList>,
217 ) {
218 let mut rx_list = self.receivers.lock().unwrap();
219 if !rx_list.is_empty() {
220 return;
221 }
222
223 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
224 let mut distributor = SeriesDistributor {
225 stream_ctx: self.stream_ctx.clone(),
226 semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
227 partitions: self.properties.partitions.clone(),
228 pruner: self.pruner.clone(),
229 senders,
230 metrics_set: metrics_set.clone(),
231 metrics_list: metrics_list.clone(),
232 };
233 let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
234 let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
235 common_runtime::spawn_global(
236 async move {
237 distributor.execute().await;
238 }
239 .instrument(span),
240 );
241
242 *rx_list = receivers;
243 }
244
245 #[tracing::instrument(
247 skip_all,
248 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
249 )]
250 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
251 let part_num = self.properties.num_partitions();
252 let metrics_set = ExecutionPlanMetricsSet::default();
253 let streams = (0..part_num)
254 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
255 .collect::<Result<Vec<_>, BoxedError>>()?;
256 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
257 Ok(Box::pin(chained_stream))
258 }
259
260 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
262 let metrics_set = ExecutionPlanMetricsSet::new();
263
264 let streams = (0..self.properties.partitions.len())
265 .map(|partition| {
266 let metrics = new_partition_metrics(
267 &self.stream_ctx,
268 false,
269 &metrics_set,
270 partition,
271 &self.metrics_list,
272 );
273
274 self.scan_batch_in_partition(
275 &QueryScanContext::default(),
276 partition,
277 metrics,
278 &metrics_set,
279 )
280 })
281 .collect::<Result<Vec<_>>>()?;
282
283 Ok(Box::pin(futures::stream::iter(streams).flatten()))
284 }
285
286 pub(crate) fn check_scan_limit(&self) -> Result<()> {
288 let total_files: usize = self
290 .properties
291 .partitions
292 .iter()
293 .flat_map(|partition| partition.iter())
294 .map(|part_range| {
295 let range_meta = &self.stream_ctx.ranges[part_range.identifier];
296 range_meta.indices.len()
297 })
298 .sum();
299
300 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
301 if total_files > max_concurrent_files {
302 return TooManyFilesToReadSnafu {
303 actual: total_files,
304 max: max_concurrent_files,
305 }
306 .fail();
307 }
308
309 Ok(())
310 }
311}
312
313fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
314 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
315 .map(|_| {
316 let (sender, receiver) = mpsc::channel(1);
317 (Some(sender), Some(receiver))
318 })
319 .unzip();
320 (SenderList::new(senders), receivers)
321}
322
323impl RegionScanner for SeriesScan {
324 fn name(&self) -> &str {
325 "SeriesScan"
326 }
327
328 fn properties(&self) -> &ScannerProperties {
329 &self.properties
330 }
331
332 fn schema(&self) -> SchemaRef {
333 self.stream_ctx.input.mapper.output_schema()
334 }
335
336 fn metadata(&self) -> RegionMetadataRef {
337 self.stream_ctx.input.mapper.metadata().clone()
338 }
339
340 fn scan_partition(
341 &self,
342 ctx: &QueryScanContext,
343 metrics_set: &ExecutionPlanMetricsSet,
344 partition: usize,
345 ) -> Result<SendableRecordBatchStream, BoxedError> {
346 self.scan_partition_impl(ctx, metrics_set, partition)
347 .map_err(BoxedError::new)
348 }
349
350 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
351 self.properties.prepare(request);
352
353 self.check_scan_limit().map_err(BoxedError::new)?;
354
355 Ok(())
356 }
357
358 fn has_predicate_without_region(&self) -> bool {
359 let predicate = self
360 .stream_ctx
361 .input
362 .predicate_group()
363 .predicate_without_region();
364 predicate.is_some()
365 }
366
367 fn add_dyn_filter_to_predicate(
368 &mut self,
369 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
370 ) -> Vec<bool> {
371 self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
372 }
373
374 fn set_logical_region(&mut self, logical_region: bool) {
375 self.properties.set_logical_region(logical_region);
376 }
377}
378
379impl DisplayAs for SeriesScan {
380 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
381 write!(
382 f,
383 "SeriesScan: region={}, ",
384 self.stream_ctx.input.mapper.metadata().region_id
385 )?;
386 match t {
387 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
388 self.stream_ctx.format_for_explain(false, f)
389 }
390 DisplayFormatType::Verbose => {
391 self.stream_ctx.format_for_explain(true, f)?;
392 self.metrics_list.format_verbose_metrics(f)
393 }
394 }
395 }
396}
397
398impl fmt::Debug for SeriesScan {
399 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400 f.debug_struct("SeriesScan")
401 .field("num_ranges", &self.stream_ctx.ranges.len())
402 .finish()
403 }
404}
405
406#[cfg(test)]
407impl SeriesScan {
408 pub(crate) fn input(&self) -> &ScanInput {
410 &self.stream_ctx.input
411 }
412}
413
414struct SeriesDistributor {
416 stream_ctx: Arc<StreamContext>,
418 semaphore: Option<Arc<Semaphore>>,
420 partitions: Vec<Vec<PartitionRange>>,
422 pruner: Arc<Pruner>,
424 senders: SenderList,
426 metrics_set: ExecutionPlanMetricsSet,
432 metrics_list: Arc<PartitionMetricsList>,
433}
434
435impl SeriesDistributor {
436 #[tracing::instrument(
438 skip_all,
439 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
440 )]
441 async fn execute(&mut self) {
442 let result = if self.stream_ctx.input.flat_format {
443 self.scan_partitions_flat().await
444 } else {
445 self.scan_partitions().await
446 };
447
448 if let Err(e) = result {
449 self.senders.send_error(e).await;
450 }
451 }
452
453 #[tracing::instrument(
455 skip_all,
456 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
457 )]
458 async fn scan_partitions_flat(&mut self) -> Result<()> {
459 for partition_ranges in &self.partitions {
461 self.pruner.add_partition_ranges(partition_ranges);
462 }
463
464 let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
466 let partition_pruner = Arc::new(PartitionPruner::new(
467 self.pruner.clone(),
468 &all_partition_ranges,
469 ));
470
471 let part_metrics = new_partition_metrics(
472 &self.stream_ctx,
473 false,
474 &self.metrics_set,
475 self.partitions.len(),
476 &self.metrics_list,
477 );
478 part_metrics.on_first_poll();
479 let mut fetch_start = Instant::now();
482
483 let mut sources = Vec::with_capacity(self.partitions.len());
485 for partition in &self.partitions {
486 sources.reserve(partition.len());
487 for part_range in partition {
488 build_flat_sources(
489 &self.stream_ctx,
490 part_range,
491 false,
492 &part_metrics,
493 partition_pruner.clone(),
494 &mut sources,
495 self.semaphore.clone(),
496 )
497 .await?;
498 }
499 }
500
501 let mut reader = SeqScan::build_flat_reader_from_sources(
503 &self.stream_ctx,
504 sources,
505 self.semaphore.clone(),
506 Some(&part_metrics),
507 )
508 .await?;
509 let mut metrics = SeriesDistributorMetrics::default();
510
511 let mut divider = FlatSeriesBatchDivider::default();
512 while let Some(record_batch) = reader.try_next().await? {
513 metrics.scan_cost += fetch_start.elapsed();
514 metrics.num_batches += 1;
515 metrics.num_rows += record_batch.num_rows();
516
517 debug_assert!(record_batch.num_rows() > 0);
518 if record_batch.num_rows() == 0 {
519 fetch_start = Instant::now();
520 continue;
521 }
522
523 let divider_start = Instant::now();
525 let series_batch = divider.push(record_batch);
526 metrics.divider_cost += divider_start.elapsed();
527 if let Some(series_batch) = series_batch {
528 let yield_start = Instant::now();
529 self.senders
530 .send_batch(SeriesBatch::Flat(series_batch))
531 .await?;
532 metrics.yield_cost += yield_start.elapsed();
533 }
534 fetch_start = Instant::now();
535 }
536
537 let divider_start = Instant::now();
539 let series_batch = divider.finish();
540 metrics.divider_cost += divider_start.elapsed();
541 if let Some(series_batch) = series_batch {
542 let yield_start = Instant::now();
543 self.senders
544 .send_batch(SeriesBatch::Flat(series_batch))
545 .await?;
546 metrics.yield_cost += yield_start.elapsed();
547 }
548
549 metrics.scan_cost += fetch_start.elapsed();
550 metrics.num_series_send_timeout = self.senders.num_timeout;
551 metrics.num_series_send_full = self.senders.num_full;
552 part_metrics.set_distributor_metrics(&metrics);
553
554 part_metrics.on_finish();
555
556 Ok(())
557 }
558
559 #[tracing::instrument(
561 skip_all,
562 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
563 )]
564 async fn scan_partitions(&mut self) -> Result<()> {
565 for partition_ranges in &self.partitions {
567 self.pruner.add_partition_ranges(partition_ranges);
568 }
569
570 let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
572 let partition_pruner = Arc::new(PartitionPruner::new(
573 self.pruner.clone(),
574 &all_partition_ranges,
575 ));
576
577 let part_metrics = new_partition_metrics(
578 &self.stream_ctx,
579 false,
580 &self.metrics_set,
581 self.partitions.len(),
582 &self.metrics_list,
583 );
584 part_metrics.on_first_poll();
585 let mut fetch_start = Instant::now();
588
589 let mut sources = Vec::with_capacity(self.partitions.len());
591 for partition in &self.partitions {
592 sources.reserve(partition.len());
593 for part_range in partition {
594 build_sources(
595 &self.stream_ctx,
596 part_range,
597 false,
598 &part_metrics,
599 partition_pruner.clone(),
600 &mut sources,
601 self.semaphore.clone(),
602 )
603 .await?;
604 }
605 }
606
607 let mut reader = SeqScan::build_reader_from_sources(
609 &self.stream_ctx,
610 sources,
611 self.semaphore.clone(),
612 Some(&part_metrics),
613 )
614 .await?;
615 let mut metrics = SeriesDistributorMetrics::default();
616
617 let mut current_series = PrimaryKeySeriesBatch::default();
618 while let Some(batch) = reader.next_batch().await? {
619 metrics.scan_cost += fetch_start.elapsed();
620 metrics.num_batches += 1;
621 metrics.num_rows += batch.num_rows();
622
623 debug_assert!(!batch.is_empty());
624 if batch.is_empty() {
625 fetch_start = Instant::now();
626 continue;
627 }
628
629 let Some(last_key) = current_series.current_key() else {
630 current_series.push(batch);
631 fetch_start = Instant::now();
632 continue;
633 };
634
635 if last_key == batch.primary_key() {
636 current_series.push(batch);
637 fetch_start = Instant::now();
638 continue;
639 }
640
641 let to_send =
643 std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
644 let yield_start = Instant::now();
645 self.senders
646 .send_batch(SeriesBatch::PrimaryKey(to_send))
647 .await?;
648 metrics.yield_cost += yield_start.elapsed();
649 fetch_start = Instant::now();
650 }
651
652 if !current_series.is_empty() {
653 let yield_start = Instant::now();
654 self.senders
655 .send_batch(SeriesBatch::PrimaryKey(current_series))
656 .await?;
657 metrics.yield_cost += yield_start.elapsed();
658 }
659
660 metrics.scan_cost += fetch_start.elapsed();
661 metrics.num_series_send_timeout = self.senders.num_timeout;
662 metrics.num_series_send_full = self.senders.num_full;
663 part_metrics.set_distributor_metrics(&metrics);
664
665 part_metrics.on_finish();
666
667 Ok(())
668 }
669}
670
671#[derive(Default, Debug)]
673pub struct PrimaryKeySeriesBatch {
674 pub batches: SmallVec<[Batch; 4]>,
675}
676
677impl PrimaryKeySeriesBatch {
678 fn single(batch: Batch) -> Self {
680 Self {
681 batches: smallvec![batch],
682 }
683 }
684
685 fn current_key(&self) -> Option<&[u8]> {
686 self.batches.first().map(|batch| batch.primary_key())
687 }
688
689 fn push(&mut self, batch: Batch) {
690 self.batches.push(batch);
691 }
692
693 fn is_empty(&self) -> bool {
695 self.batches.is_empty()
696 }
697}
698
699#[derive(Debug)]
701pub enum SeriesBatch {
702 PrimaryKey(PrimaryKeySeriesBatch),
703 Flat(FlatSeriesBatch),
704}
705
706impl SeriesBatch {
707 pub fn num_batches(&self) -> usize {
709 match self {
710 SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
711 SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
712 }
713 }
714
715 pub fn num_rows(&self) -> usize {
717 match self {
718 SeriesBatch::PrimaryKey(primary_key_batch) => {
719 primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
720 }
721 SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
722 }
723 }
724}
725
726#[derive(Default, Debug)]
728pub struct FlatSeriesBatch {
729 pub batches: SmallVec<[RecordBatch; 4]>,
730}
731
732struct SenderList {
734 senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
735 num_nones: usize,
737 sender_idx: usize,
739 num_timeout: usize,
741 num_full: usize,
743}
744
745impl SenderList {
746 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
747 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
748 Self {
749 senders,
750 num_nones,
751 sender_idx: 0,
752 num_timeout: 0,
753 num_full: 0,
754 }
755 }
756
757 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
760 for _ in 0..self.senders.len() {
761 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
762
763 let sender_idx = self.fetch_add_sender_idx();
764 let Some(sender) = &self.senders[sender_idx] else {
765 continue;
766 };
767
768 match sender.try_send(Ok(batch)) {
769 Ok(()) => return Ok(None),
770 Err(TrySendError::Full(res)) => {
771 self.num_full += 1;
772 batch = res.unwrap();
774 }
775 Err(TrySendError::Closed(res)) => {
776 self.senders[sender_idx] = None;
777 self.num_nones += 1;
778 batch = res.unwrap();
780 }
781 }
782 }
783
784 Ok(Some(batch))
785 }
786
787 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
789 match self.try_send_batch(batch)? {
791 Some(b) => {
792 batch = b;
794 }
795 None => {
796 return Ok(());
797 }
798 }
799
800 loop {
801 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
802
803 let sender_idx = self.fetch_add_sender_idx();
804 let Some(sender) = &self.senders[sender_idx] else {
805 continue;
806 };
807 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
812 Ok(()) => break,
813 Err(SendTimeoutError::Timeout(res)) => {
814 self.num_timeout += 1;
815 batch = res.unwrap();
817 }
818 Err(SendTimeoutError::Closed(res)) => {
819 self.senders[sender_idx] = None;
820 self.num_nones += 1;
821 batch = res.unwrap();
823 }
824 }
825 }
826
827 Ok(())
828 }
829
830 async fn send_error(&self, error: Error) {
831 let error = Arc::new(error);
832 for sender in self.senders.iter().flatten() {
833 let result = Err(error.clone()).context(ScanSeriesSnafu);
834 let _ = sender.send(result).await;
835 }
836 }
837
838 fn fetch_add_sender_idx(&mut self) -> usize {
839 let sender_idx = self.sender_idx;
840 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
841 sender_idx
842 }
843}
844
845fn new_partition_metrics(
846 stream_ctx: &StreamContext,
847 explain_verbose: bool,
848 metrics_set: &ExecutionPlanMetricsSet,
849 partition: usize,
850 metrics_list: &PartitionMetricsList,
851) -> PartitionMetrics {
852 let metrics = PartitionMetrics::new(
853 stream_ctx.input.mapper.metadata().region_id,
854 partition,
855 "SeriesScan",
856 stream_ctx.query_start,
857 explain_verbose,
858 metrics_set,
859 );
860
861 metrics_list.set(partition, metrics.clone());
862 metrics
863}
864
865#[derive(Default)]
870struct FlatSeriesBatchDivider {
871 buffer: FlatSeriesBatch,
872}
873
874impl FlatSeriesBatchDivider {
875 fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
879 if self.buffer.batches.is_empty() {
881 self.buffer.batches.push(batch);
882 return None;
883 }
884
885 let pk_column_idx = primary_key_column_index(batch.num_columns());
887 let batch_pk_column = batch.column(pk_column_idx);
888 let batch_pk_array = batch_pk_column
889 .as_any()
890 .downcast_ref::<PrimaryKeyArray>()
891 .unwrap();
892 let batch_pk_values = batch_pk_array
893 .values()
894 .as_any()
895 .downcast_ref::<BinaryArray>()
896 .unwrap();
897 let batch_last_pk =
899 primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
900 let buffer_last_batch = self.buffer.batches.last().unwrap();
903 let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
904 let buffer_pk_array = buffer_pk_column
905 .as_any()
906 .downcast_ref::<PrimaryKeyArray>()
907 .unwrap();
908 let buffer_pk_values = buffer_pk_array
909 .values()
910 .as_any()
911 .downcast_ref::<BinaryArray>()
912 .unwrap();
913 let buffer_last_pk =
914 primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
915
916 if batch_last_pk == buffer_last_pk {
918 self.buffer.batches.push(batch);
919 return None;
920 }
921 let batch_pk_keys = batch_pk_array.keys();
924 let pk_indices = batch_pk_keys.values();
925 let mut change_offset = 0;
926 for (i, &key) in pk_indices.iter().enumerate() {
927 let batch_pk = batch_pk_values.value(key as usize);
928
929 if buffer_last_pk != batch_pk {
930 change_offset = i;
931 break;
932 }
933 }
934
935 let (first_part, remaining_part) = if change_offset > 0 {
937 let first_part = batch.slice(0, change_offset);
938 let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
939 (Some(first_part), Some(remaining_part))
940 } else {
941 (None, Some(batch))
942 };
943
944 let mut result = std::mem::take(&mut self.buffer);
946 if let Some(first_part) = first_part {
947 result.batches.push(first_part);
948 }
949
950 if let Some(remaining_part) = remaining_part {
952 self.buffer.batches.push(remaining_part);
953 }
954
955 Some(result)
956 }
957
958 fn finish(&mut self) -> Option<FlatSeriesBatch> {
960 if self.buffer.batches.is_empty() {
961 None
962 } else {
963 Some(std::mem::take(&mut self.buffer))
964 }
965 }
966}
967
968fn primary_key_at<'a>(
970 primary_key: &PrimaryKeyArray,
971 primary_key_values: &'a BinaryArray,
972 index: usize,
973) -> &'a [u8] {
974 let key = primary_key.keys().value(index);
975 primary_key_values.value(key as usize)
976}
977
978#[cfg(test)]
979mod tests {
980 use std::sync::Arc;
981
982 use api::v1::OpType;
983 use datatypes::arrow::array::{
984 ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
985 TimestampMillisecondArray, UInt8Array, UInt64Array,
986 };
987 use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
988 use datatypes::arrow::record_batch::RecordBatch;
989
990 use super::*;
991
992 fn new_test_record_batch(
993 primary_keys: &[&[u8]],
994 timestamps: &[i64],
995 sequences: &[u64],
996 op_types: &[OpType],
997 fields: &[u64],
998 ) -> RecordBatch {
999 let num_rows = timestamps.len();
1000 debug_assert_eq!(sequences.len(), num_rows);
1001 debug_assert_eq!(op_types.len(), num_rows);
1002 debug_assert_eq!(fields.len(), num_rows);
1003 debug_assert_eq!(primary_keys.len(), num_rows);
1004
1005 let columns: Vec<ArrayRef> = vec![
1006 build_test_pk_string_dict_array(primary_keys),
1007 Arc::new(Int64Array::from_iter(
1008 fields.iter().map(|v| Some(*v as i64)),
1009 )),
1010 Arc::new(TimestampMillisecondArray::from_iter_values(
1011 timestamps.iter().copied(),
1012 )),
1013 build_test_pk_array(primary_keys),
1014 Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
1015 Arc::new(UInt8Array::from_iter_values(
1016 op_types.iter().map(|v| *v as u8),
1017 )),
1018 ];
1019
1020 RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
1021 }
1022
1023 fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
1024 let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
1025 for &pk in primary_keys {
1026 let pk_str = std::str::from_utf8(pk).unwrap();
1027 builder.append(pk_str).unwrap();
1028 }
1029 Arc::new(builder.finish())
1030 }
1031
1032 fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
1033 let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1034 for &pk in primary_keys {
1035 builder.append(pk).unwrap();
1036 }
1037 Arc::new(builder.finish())
1038 }
1039
1040 fn build_test_flat_schema() -> SchemaRef {
1041 let fields = vec![
1042 Field::new(
1043 "k0",
1044 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
1045 false,
1046 ),
1047 Field::new("field0", DataType::Int64, true),
1048 Field::new(
1049 "ts",
1050 DataType::Timestamp(TimeUnit::Millisecond, None),
1051 false,
1052 ),
1053 Field::new(
1054 "__primary_key",
1055 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
1056 false,
1057 ),
1058 Field::new("__sequence", DataType::UInt64, false),
1059 Field::new("__op_type", DataType::UInt8, false),
1060 ];
1061 Arc::new(Schema::new(fields))
1062 }
1063
1064 #[test]
1065 fn test_empty_buffer_first_push() {
1066 let mut divider = FlatSeriesBatchDivider::default();
1067 let result = divider.finish();
1068 assert!(result.is_none());
1069
1070 let mut divider = FlatSeriesBatchDivider::default();
1071 let batch = new_test_record_batch(
1072 &[b"series1", b"series1"],
1073 &[1000, 2000],
1074 &[1, 2],
1075 &[OpType::Put, OpType::Put],
1076 &[10, 20],
1077 );
1078 let result = divider.push(batch);
1079 assert!(result.is_none());
1080 assert_eq!(divider.buffer.batches.len(), 1);
1081 }
1082
1083 #[test]
1084 fn test_same_series_accumulation() {
1085 let mut divider = FlatSeriesBatchDivider::default();
1086
1087 let batch1 = new_test_record_batch(
1088 &[b"series1", b"series1"],
1089 &[1000, 2000],
1090 &[1, 2],
1091 &[OpType::Put, OpType::Put],
1092 &[10, 20],
1093 );
1094
1095 let batch2 = new_test_record_batch(
1096 &[b"series1", b"series1"],
1097 &[3000, 4000],
1098 &[3, 4],
1099 &[OpType::Put, OpType::Put],
1100 &[30, 40],
1101 );
1102
1103 divider.push(batch1);
1104 let result = divider.push(batch2);
1105 assert!(result.is_none());
1106 let series_batch = divider.finish().unwrap();
1107 assert_eq!(series_batch.batches.len(), 2);
1108 }
1109
1110 #[test]
1111 fn test_series_boundary_detection() {
1112 let mut divider = FlatSeriesBatchDivider::default();
1113
1114 let batch1 = new_test_record_batch(
1115 &[b"series1", b"series1"],
1116 &[1000, 2000],
1117 &[1, 2],
1118 &[OpType::Put, OpType::Put],
1119 &[10, 20],
1120 );
1121
1122 let batch2 = new_test_record_batch(
1123 &[b"series2", b"series2"],
1124 &[3000, 4000],
1125 &[3, 4],
1126 &[OpType::Put, OpType::Put],
1127 &[30, 40],
1128 );
1129
1130 divider.push(batch1);
1131 let series_batch = divider.push(batch2).unwrap();
1132 assert_eq!(series_batch.batches.len(), 1);
1133
1134 assert_eq!(divider.buffer.batches.len(), 1);
1135 }
1136
1137 #[test]
1138 fn test_series_boundary_within_batch() {
1139 let mut divider = FlatSeriesBatchDivider::default();
1140
1141 let batch1 = new_test_record_batch(
1142 &[b"series1", b"series1"],
1143 &[1000, 2000],
1144 &[1, 2],
1145 &[OpType::Put, OpType::Put],
1146 &[10, 20],
1147 );
1148
1149 let batch2 = new_test_record_batch(
1150 &[b"series1", b"series2"],
1151 &[3000, 4000],
1152 &[3, 4],
1153 &[OpType::Put, OpType::Put],
1154 &[30, 40],
1155 );
1156
1157 divider.push(batch1);
1158 let series_batch = divider.push(batch2).unwrap();
1159 assert_eq!(series_batch.batches.len(), 2);
1160 assert_eq!(series_batch.batches[0].num_rows(), 2);
1161 assert_eq!(series_batch.batches[1].num_rows(), 1);
1162
1163 assert_eq!(divider.buffer.batches.len(), 1);
1164 assert_eq!(divider.buffer.batches[0].num_rows(), 1);
1165 }
1166
1167 #[test]
1168 fn test_series_splitting() {
1169 let mut divider = FlatSeriesBatchDivider::default();
1170
1171 let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);
1172
1173 let batch2 = new_test_record_batch(
1174 &[b"series1", b"series2", b"series2", b"series3"],
1175 &[2000, 3000, 4000, 5000],
1176 &[2, 3, 4, 5],
1177 &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
1178 &[20, 30, 40, 50],
1179 );
1180
1181 divider.push(batch1);
1182 let series_batch = divider.push(batch2).unwrap();
1183 assert_eq!(series_batch.batches.len(), 2);
1184
1185 let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
1186 assert_eq!(total_rows, 2);
1187
1188 let final_batch = divider.finish().unwrap();
1189 assert_eq!(final_batch.batches.len(), 1);
1190 assert_eq!(final_batch.batches[0].num_rows(), 3);
1191 }
1192}