use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing::{self, Instrument};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};
use tokio::sync::Semaphore;
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::error::{
    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
    ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::range::{RangeBuilderList, file_range_counts};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;

/// Timeout of sending a batch to a partition sender before trying the next partition.
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// List of receivers, one per partition.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

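/// Scanner that assigns whole time series to partitions.
///
/// A background [SeriesDistributor] reads sorted rows from all ranges and sends
/// each series to one partition channel in a round-robin fashion, so the rows of
/// a series never span two partitions.
///
/// A usage sketch (hypothetical caller, assuming a prepared [ScanInput] and a
/// `metrics_set` in scope):
///
/// ```ignore
/// let mut scan = SeriesScan::new(input);
/// scan.prepare(request)?;
/// for partition in 0..scan.properties().num_partitions() {
///     let stream =
///         scan.scan_partition(&QueryScanContext::default(), &metrics_set, partition)?;
///     // Consume the record batch stream of this partition.
/// }
/// ```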
pub struct SeriesScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition, filled when the distributor starts.
    receivers: Mutex<ReceiverList>,
    /// Metrics of each partition, collected for verbose explain output.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesScan {
    /// Creates a new [SeriesScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver of the partition.
    ///
    /// Returns an error if the partition has already been scanned.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor in the background if the receivers are not initialized.
    #[tracing::instrument(
        skip(self, metrics_set, metrics_list),
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
        };
        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
        // Spawns the distributor on the global runtime so it runs in the background.
        common_runtime::spawn_global(
            async move {
                distributor.execute().await;
            }
            .instrument(span),
        );

        *rx_list = receivers;
    }

    /// Scans all partitions and chains their streams into one record batch stream.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scans all partitions and returns a single [ScanBatchStream] that yields the
    /// partitions one after another.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Returns an error if the number of file ranges this scan reads exceeds the limit.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Counts the source indices referenced by every partition range.
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}

/// Creates a bounded channel (capacity 1) for every partition and returns the
/// paired sender and receiver lists.
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| {
            let (sender, receiver) = mpsc::channel(1);
            (Some(sender), Some(receiver))
        })
        .unzip();
    (SenderList::new(senders), receivers)
}

impl RegionScanner for SeriesScan {
    fn name(&self) -> &str {
        "SeriesScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl SeriesScan {
    /// Returns the input of the scan for tests.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}

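/// Background task that reads all ranges of a region, groups rows into series,
/// and sends each series to one of the partition senders.
///
/// The distributor runs once per scan; partitions only consume from their
/// receivers, so series arrive in each partition in scan order.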
struct SeriesDistributor {
    /// Context for the scan.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore to limit the number of concurrent scans.
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set to report to.
    metrics_set: ExecutionPlanMetricsSet,
    /// Metrics list to collect the distributor's metrics.
    metrics_list: Arc<PartitionMetricsList>,
}

impl SeriesDistributor {
    /// Executes the distribution and sends any error to all partitions.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = if self.stream_ctx.input.flat_format {
            self.scan_partitions_flat().await
        } else {
            self.scan_partitions().await
        };

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all ranges in the flat format and distributes each series.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        let mut fetch_start = Instant::now();

        // Builds a source for every partition range.
        let counts = file_range_counts(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
            &self.stream_ctx.ranges,
            self.partitions.iter().flatten(),
        );
        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            counts,
        ));
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_flat_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a reader over all the sources.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // Splits the record batch at a series boundary and sends the completed series.
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Sends the remaining buffered series.
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }

    /// Scans all ranges in the primary key format and distributes each series.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions(&mut self) -> Result<()> {
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            false,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        let mut fetch_start = Instant::now();

        // Builds a source for every partition range.
        let counts = file_range_counts(
            self.stream_ctx.input.num_memtables(),
            self.stream_ctx.input.num_files(),
            &self.stream_ctx.ranges,
            self.partitions.iter().flatten(),
        );
        let range_builder_list = Arc::new(RangeBuilderList::new(
            self.stream_ctx.input.num_memtables(),
            counts,
        ));
        let mut sources = Vec::with_capacity(self.partitions.len());
        for partition in &self.partitions {
            sources.reserve(partition.len());
            for part_range in partition {
                build_sources(
                    &self.stream_ctx,
                    part_range,
                    false,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    self.semaphore.clone(),
                )
                .await?;
            }
        }

        // Builds a reader over all the sources.
        let mut reader = SeqScan::build_reader_from_sources(
            &self.stream_ctx,
            sources,
            self.semaphore.clone(),
            Some(&part_metrics),
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut current_series = PrimaryKeySeriesBatch::default();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.num_rows();

            debug_assert!(!batch.is_empty());
            if batch.is_empty() {
                fetch_start = Instant::now();
                continue;
            }

            let Some(last_key) = current_series.current_key() else {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            };

            if last_key == batch.primary_key() {
                current_series.push(batch);
                fetch_start = Instant::now();
                continue;
            }

            // The primary key changed; send the current series and start a new one.
            let to_send =
                std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(to_send))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
            fetch_start = Instant::now();
        }

        // Sends the last series.
        if !current_series.is_empty() {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::PrimaryKey(current_series))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}

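/// [Batch]es of a single series in the primary key format.
///
/// All batches in the collection share the same primary key.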
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    pub batches: SmallVec<[Batch; 4]>,
}

impl PrimaryKeySeriesBatch {
    /// Creates a new series batch from a single [Batch].
    fn single(batch: Batch) -> Self {
        Self {
            batches: smallvec![batch],
        }
    }

    /// Returns the primary key of the series, or `None` if the batch is empty.
    fn current_key(&self) -> Option<&[u8]> {
        self.batches.first().map(|batch| batch.primary_key())
    }

    fn push(&mut self, batch: Batch) {
        self.batches.push(batch);
    }

    /// Returns true if there are no batches.
    fn is_empty(&self) -> bool {
        self.batches.is_empty()
    }
}

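/// Rows of one or more complete series, in either the primary key format or the
/// flat format.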
#[derive(Debug)]
pub enum SeriesBatch {
    PrimaryKey(PrimaryKeySeriesBatch),
    Flat(FlatSeriesBatch),
}

impl SeriesBatch {
    /// Returns the number of batches.
    pub fn num_batches(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
        }
    }

    /// Returns the total number of rows.
    pub fn num_rows(&self) -> usize {
        match self {
            SeriesBatch::PrimaryKey(primary_key_batch) => {
                primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
            }
            SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
        }
    }
}

/// [RecordBatch]es of series in the flat format.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    pub batches: SmallVec<[RecordBatch; 4]>,
}
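
/// Round-robin list of senders, one per partition.
///
/// A batch is first offered to each partition without blocking; if every channel
/// is full, the list falls back to sending with a short timeout so that a single
/// slow partition cannot stall the distributor. Closed channels are removed from
/// the rotation.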
709
710struct SenderList {
712 senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
713 num_nones: usize,
715 sender_idx: usize,
717 num_timeout: usize,
719 num_full: usize,
721}
722
723impl SenderList {
724 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
725 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
726 Self {
727 senders,
728 num_nones,
729 sender_idx: 0,
730 num_timeout: 0,
731 num_full: 0,
732 }
733 }

    /// Tries to send the batch to a partition without blocking.
    ///
    /// Returns the batch back if all channels are full, or an error if all
    /// receivers have been dropped.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        // Tries each sender at most once, starting from the current index.
        for _ in 0..self.senders.len() {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // `res` is the `Ok(batch)` we tried to send, so unwrap returns the batch.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Sends the batch to a partition, blocking with a short timeout if needed.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Fast path: try to send without blocking first.
        match self.try_send_batch(batch)? {
            Some(b) => {
                // All channels are full, fall back to the blocking path.
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            // Sends with a timeout so one slow partition doesn't block the
            // distributor forever; on timeout, move on to the next sender.
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    /// Sends the error to all partitions.
    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    /// Returns the current sender index and advances to the next one.
    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}

/// Creates a [PartitionMetrics] for the partition and registers it to the list.
fn new_partition_metrics(
    stream_ctx: &StreamContext,
    explain_verbose: bool,
    metrics_set: &ExecutionPlanMetricsSet,
    partition: usize,
    metrics_list: &PartitionMetricsList,
) -> PartitionMetrics {
    let metrics = PartitionMetrics::new(
        stream_ctx.input.mapper.metadata().region_id,
        partition,
        "SeriesScan",
        stream_ctx.query_start,
        explain_verbose,
        metrics_set,
    );

    metrics_list.set(partition, metrics.clone());
    metrics
}

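/// Divides a stream of sorted flat [RecordBatch]es into [FlatSeriesBatch]es at
/// primary key boundaries, so a series is never split across two outputs.
///
/// A worked example with illustrative keys: if rows for key `"a"` are buffered
/// and the next batch holds rows `["a", "b", "b"]`, `push()` returns a
/// [FlatSeriesBatch] with the buffered rows plus the leading `"a"` row, and keeps
/// the two `"b"` rows buffered for a later `push()` or `finish()`.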
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Buffered batches that end with the current series.
    buffer: FlatSeriesBatch,
}

impl FlatSeriesBatchDivider {
    /// Pushes a record batch and returns a [FlatSeriesBatch] if it completes one
    /// or more series.
    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
        // Buffers the first batch directly.
        if self.buffer.batches.is_empty() {
            self.buffer.batches.push(batch);
            return None;
        }

        // Gets the last primary key of the incoming batch.
        let pk_column_idx = primary_key_column_index(batch.num_columns());
        let batch_pk_column = batch.column(pk_column_idx);
        let batch_pk_array = batch_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let batch_pk_values = batch_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let batch_last_pk =
            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);

        // Gets the last primary key of the buffered batches.
        let buffer_last_batch = self.buffer.batches.last().unwrap();
        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
        let buffer_pk_array = buffer_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let buffer_pk_values = buffer_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let buffer_last_pk =
            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);

        // The incoming batch ends with the same series; keep buffering.
        if batch_last_pk == buffer_last_pk {
            self.buffer.batches.push(batch);
            return None;
        }

        // Finds the first row whose primary key differs from the buffered series.
        let batch_pk_keys = batch_pk_array.keys();
        let pk_indices = batch_pk_keys.values();
        let mut change_offset = 0;
        for (i, &key) in pk_indices.iter().enumerate() {
            let batch_pk = batch_pk_values.value(key as usize);

            if buffer_last_pk != batch_pk {
                change_offset = i;
                break;
            }
        }

        // Splits the batch at the boundary: the prefix belongs to the buffered
        // series, the remainder starts the next series.
        let (first_part, remaining_part) = if change_offset > 0 {
            let first_part = batch.slice(0, change_offset);
            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
            (Some(first_part), Some(remaining_part))
        } else {
            (None, Some(batch))
        };

        // Takes the buffer as the output and buffers the remainder.
        let mut result = std::mem::take(&mut self.buffer);
        if let Some(first_part) = first_part {
            result.batches.push(first_part);
        }

        if let Some(remaining_part) = remaining_part {
            self.buffer.batches.push(remaining_part);
        }

        Some(result)
    }

    /// Returns the remaining buffered batches.
    fn finish(&mut self) -> Option<FlatSeriesBatch> {
        if self.buffer.batches.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

/// Returns the primary key at `index` of the dictionary array.
fn primary_key_at<'a>(
    primary_key: &PrimaryKeyArray,
    primary_key_values: &'a BinaryArray,
    index: usize,
) -> &'a [u8] {
    let key = primary_key.keys().value(index);
    primary_key_values.value(key as usize)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
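
    // Additional sanity check (not part of the original test set): `primary_key_at`
    // should resolve a dictionary key to the underlying binary primary key. It
    // reuses the `build_test_pk_array` helper above.
    #[test]
    fn test_primary_key_at_resolves_dictionary_keys() {
        let array = build_test_pk_array(&[b"series1", b"series1", b"series2"]);
        let pk_array = array.as_any().downcast_ref::<PrimaryKeyArray>().unwrap();
        let pk_values = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        assert_eq!(primary_key_at(pk_array, pk_values, 0), b"series1".as_slice());
        assert_eq!(primary_key_at(pk_array, pk_values, 2), b"series2".as_slice());
    }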
}