//! Scanner that reads a region and yields batches grouped by time series.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::{self, Instrument};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use tokio::sync::Semaphore;
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::error::{
    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
    ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;

/// Timeout for sending a series to a partition channel before trying the next partition.
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// List of receivers for partition channels; each receiver yields complete series batches.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

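/// Scans a region and yields record batches grouped by time series.
///
/// The scanner spawns a background `SeriesDistributor` on the first partition
/// poll. The distributor reads rows in primary key order, groups them into
/// complete series, and sends each series to one of the per-partition channels
/// in round-robin fashion.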
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Pruner for file ranges, shared with the background distributor.
    pruner: Arc<Pruner>,
    /// Receivers of all partitions. A receiver is taken when its partition is scanned.
    receivers: Mutex<ReceiverList>,
    /// Metrics for each partition.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesScan {
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver of the given partition. Returns an error if the
    /// partition has already been scanned.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

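    /// Starts the background `SeriesDistributor` if it is not running yet.
    ///
    /// The distributor is spawned only once: the receiver list is empty before
    /// the first call and populated afterwards, so later calls return early.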
    #[tracing::instrument(
        skip(self, metrics_set, metrics_list),
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            pruner: self.pruner.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
        common_runtime::spawn_global(
            async move {
                distributor.execute().await;
            }
            .instrument(span),
        );

        *rx_list = receivers;
    }

    /// Scans all partitions and chains their record batch streams into one stream.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scans all partitions and flattens their batch streams into a single stream.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Returns an error if the number of files to scan exceeds the
    /// `max_concurrent_scan_files` limit of the scan input.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}

fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| {
            let (sender, receiver) = mpsc::channel(1);
            (Some(sender), Some(receiver))
        })
        .unzip();
    (SenderList::new(senders), receivers)
}

impl RegionScanner for SeriesScan {
    fn name(&self) -> &str {
        "SeriesScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl SeriesScan {
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

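/// Background task that reads the region in series order and distributes
/// complete series to the partition senders.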
struct SeriesDistributor {
    /// Context for the scan streams.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore to bound read parallelism.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Pruner for file ranges.
    pruner: Arc<Pruner>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report to the query engine.
    metrics_set: ExecutionPlanMetricsSet,
    /// Metrics for each partition.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesDistributor {
    /// Executes the distributor and sends any error to all senders.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = if self.stream_ctx.input.flat_format {
            self.scan_partitions_flat().await
        } else {
            self.scan_partitions().await
        };

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all partition ranges in the flat format and distributes series batches.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Registers all partition ranges with the pruner.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        let mut fetch_start = Instant::now();

        // Builds sources for all partition ranges.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_flat_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a merged reader over all sources.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Sends the last buffered series.
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }

    /// Scans all partition ranges in the primary key format and distributes series batches.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions(&mut self) -> Result<()> {
        // Registers all partition ranges with the pruner.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        let mut fetch_start = Instant::now();

        // Builds sources for all partition ranges.
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a merged reader over all sources.
        let mut reader = SeqScan::build_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        // Groups batches by primary key and sends a series once its key changes.
        let mut current_series = PrimaryKeySeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                fetch_start = Instant::now();
                continue;
            }

            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            };

            if last_key == batch.primary_key() {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            }

            // The key changed: sends the finished series and starts a new one.
            let to_send =
                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(to_send))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
            fetch_start = Instant::now();
        }

        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(current_series))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}

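/// Batches that belong to the same series (primary key) in the primary key format.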
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    pub batches: SmallVec<[Batch; 4]>,
}

impl PrimaryKeySeriesBatch {
    /// Creates a series batch from a single batch.
    fn single(batch: Batch) -> Self {
        Self {
            batches: smallvec![batch],
        }
    }

    /// Returns the primary key of the series, or `None` if the series is empty.
    fn current_key(&self) -> Option<&[u8]> {
        self.batches.first().map(|batch| batch.primary_key())
    }

    fn push(&mut self, batch: Batch) {
        self.batches.push(batch);
    }

    /// Returns true if there are no batches.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }
}

/// Batches of a single series in either the primary key or the flat format.
#[derive(Debug)]
pub enum SeriesBatch {
    PrimaryKey(PrimaryKeySeriesBatch),
    Flat(FlatSeriesBatch),
}

impl SeriesBatch {
    /// Returns the number of batches in the series.
    pub fn num_batches(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
        }
    }

    /// Returns the total number of rows in the series.
    pub fn num_rows(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => {
                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
            }
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
        }
    }
}

/// Batches that belong to the same series in the flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    pub batches: SmallVec<[RecordBatch; 4]>,
}

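/// Senders of all partitions.
///
/// Batches are sent in round-robin order. A full or timed-out channel is
/// skipped and the batch is retried on the next partition, which balances work
/// across slow and fast consumers.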
struct SenderList {
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of senders that are `None` (closed channels).
    num_nones: usize,
    /// Index of the next sender to use (round-robin).
    sender_idx: usize,
    /// Number of timeouts while sending batches.
    num_timeout: usize,
    /// Number of times a channel was full while sending batches.
    num_full: usize,
}

impl SenderList {
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
            num_full: 0,
        }
    }

    /// Tries to send a batch to a sender without blocking.
    ///
    /// Returns the batch if no sender could accept it.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // Safety: we always send an `Ok` value, so we can unwrap it back.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we always send an `Ok` value, so we can unwrap it back.
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Sends a batch to a sender, waiting with a timeout and moving to the next
    /// sender on timeout or when a channel is closed.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Fast path: try to send without waiting.
        match self.try_send_batch(batch)? {
            Some(b) => {
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Safety: we always send an `Ok` value, so we can unwrap it back.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    // Safety: we always send an `Ok` value, so we can unwrap it back.
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    /// Sends an error to all senders.
    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    /// Returns the current sender index and advances it in round-robin order.
    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}

/// Creates a `PartitionMetrics` for the partition and registers it in the metrics list.
fn new_partition_metrics(
    stream_ctx: &StreamContext,
    explain_verbose: bool,
    metrics_set: &ExecutionPlanMetricsSet,
    partition: usize,
    metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
    let metrics = PartitionMetrics::new(
        stream_ctx.input.mapper.metadata().region_id,
        partition,
        "SeriesScan",
        stream_ctx.query_start,
        explain_verbose,
        metrics_set,
    );

    metrics_list.set(partition, metrics.clone());
    metrics
}

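/// Splits flat `RecordBatch`es into `FlatSeriesBatch`es at series boundaries.
///
/// Rows of the current series are buffered until a batch with a different
/// primary key arrives; the buffered series is then emitted as one
/// `FlatSeriesBatch`.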
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Batches of the current (unfinished) series.
    buffer: FlatSeriesBatch,
}

impl FlatSeriesBatchDivider {
    /// Pushes a record batch and returns a finished series if the batch starts
    /// a new series.
    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
        if self.buffer.batches.is_empty() {
            self.buffer.batches.push(batch);
            return None;
        }

        // Gets the last primary key of the incoming batch.
        let pk_column_idx = primary_key_column_index(batch.num_columns());
        let batch_pk_column = batch.column(pk_column_idx);
        let batch_pk_array = batch_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let batch_pk_values = batch_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let batch_last_pk =
            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
        // Gets the last primary key of the buffered series.
        let buffer_last_batch = self.buffer.batches.last().unwrap();
        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
        let buffer_pk_array = buffer_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let buffer_pk_values = buffer_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let buffer_last_pk =
            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);

        // The whole batch still belongs to the buffered series.
        if batch_last_pk == buffer_last_pk {
            self.buffer.batches.push(batch);
            return None;
        }

        // Finds the offset of the first row whose key differs from the buffered series.
        let batch_pk_keys = batch_pk_array.keys();
        let pk_indices = batch_pk_keys.values();
        let mut change_offset = 0;
        for (i, &key) in pk_indices.iter().enumerate() {
            let batch_pk = batch_pk_values.value(key as usize);

            if buffer_last_pk != batch_pk {
                change_offset = i;
                break;
            }
        }

        // Splits the batch: rows before `change_offset` complete the buffered series.
        let (first_part, remaining_part) = if change_offset > 0 {
            let first_part = batch.slice(0, change_offset);
            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
            (Some(first_part), Some(remaining_part))
        } else {
            (None, Some(batch))
        };

        let mut result = std::mem::take(&mut self.buffer);
        if let Some(first_part) = first_part {
            result.batches.push(first_part);
        }

        if let Some(remaining_part) = remaining_part {
            self.buffer.batches.push(remaining_part);
        }

        Some(result)
    }

    /// Returns the remaining buffered series, if any.
    fn finish(&mut self) -> Option<FlatSeriesBatch> {
        if self.buffer.batches.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Returns the primary key at `index` of the dictionary-encoded primary key column.
fn primary_key_at<'a>(
    primary_key: &PrimaryKeyArray,
    primary_key_values: &'a BinaryArray,
    index: usize,
) -> &'a [u8] {
    let key = primary_key.keys().value(index);
    primary_key_values.value(key as usize)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}