1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::arrow::array::BinaryArray;
29use datatypes::arrow::record_batch::RecordBatch;
30use datatypes::schema::SchemaRef;
31use futures::{StreamExt, TryStreamExt};
32use smallvec::{SmallVec, smallvec};
33use snafu::{OptionExt, ResultExt, ensure};
34use store_api::metadata::RegionMetadataRef;
35use store_api::region_engine::{
36 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
37};
38use tokio::sync::Semaphore;
39use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
40use tokio::sync::mpsc::{self, Receiver, Sender};
41
42use crate::error::{
43 Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
44 ScanSeriesSnafu, TooManyFilesToReadSnafu,
45};
46use crate::read::range::RangeBuilderList;
47use crate::read::scan_region::{ScanInput, StreamContext};
48use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
49use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{Batch, ScannerMetrics};
52use crate::sst::parquet::flat_format::primary_key_column_index;
53use crate::sst::parquet::format::PrimaryKeyArray;
54
/// How long the distributor waits on one partition's channel in the slow
/// path of `SenderList::send_batch` before rotating to the next partition.
const SEND_TIMEOUT: Duration = Duration::from_nanos(100_000);
57
/// One receiver slot per partition. A slot becomes `None` once the partition's
/// stream has taken its receiver (see `SeriesScan::take_receiver`).
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
60
/// Region scanner that reads all ranges through one background distributor
/// task and fans the resulting series batches out to its partition streams.
pub struct SeriesScan {
    /// Properties of the scanner (partitions, append mode, total rows, ...).
    properties: ScannerProperties,
    /// Shared context for the scan streams.
    stream_ctx: Arc<StreamContext>,
    /// Receivers of each partition; empty until the distributor is started.
    receivers: Mutex<ReceiverList>,
    /// Metrics registered for each partition.
    metrics_list: Arc<PartitionMetricsList>,
}
77
78impl SeriesScan {
79 pub(crate) fn new(input: ScanInput) -> Self {
81 let mut properties = ScannerProperties::default()
82 .with_append_mode(input.append_mode)
83 .with_total_rows(input.total_rows());
84 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
85 properties.partitions = vec![stream_ctx.partition_ranges()];
86
87 Self {
88 properties,
89 stream_ctx,
90 receivers: Mutex::new(Vec::new()),
91 metrics_list: Arc::new(PartitionMetricsList::default()),
92 }
93 }
94
95 #[tracing::instrument(
96 skip_all,
97 fields(
98 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
99 partition = partition
100 )
101 )]
102 fn scan_partition_impl(
103 &self,
104 ctx: &QueryScanContext,
105 metrics_set: &ExecutionPlanMetricsSet,
106 partition: usize,
107 ) -> Result<SendableRecordBatchStream> {
108 let metrics = new_partition_metrics(
109 &self.stream_ctx,
110 ctx.explain_verbose,
111 metrics_set,
112 partition,
113 &self.metrics_list,
114 );
115
116 let batch_stream =
117 self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
118
119 let input = &self.stream_ctx.input;
120 let record_batch_stream = ConvertBatchStream::new(
121 batch_stream,
122 input.mapper.clone(),
123 input.cache_strategy.clone(),
124 metrics,
125 );
126
127 Ok(Box::pin(RecordBatchStreamWrapper::new(
128 input.mapper.output_schema(),
129 Box::pin(record_batch_stream),
130 )))
131 }
132
133 #[tracing::instrument(
134 skip_all,
135 fields(
136 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
137 partition = partition
138 )
139 )]
140 fn scan_batch_in_partition(
141 &self,
142 ctx: &QueryScanContext,
143 partition: usize,
144 part_metrics: PartitionMetrics,
145 metrics_set: &ExecutionPlanMetricsSet,
146 ) -> Result<ScanBatchStream> {
147 if ctx.explain_verbose {
148 common_telemetry::info!(
149 "SeriesScan partition {}, region_id: {}",
150 partition,
151 self.stream_ctx.input.region_metadata().region_id
152 );
153 }
154
155 ensure!(
156 partition < self.properties.num_partitions(),
157 PartitionOutOfRangeSnafu {
158 given: partition,
159 all: self.properties.num_partitions(),
160 }
161 );
162
163 self.maybe_start_distributor(metrics_set, &self.metrics_list);
164
165 let mut receiver = self.take_receiver(partition)?;
166 let stream = try_stream! {
167 part_metrics.on_first_poll();
168
169 let mut fetch_start = Instant::now();
170 while let Some(series) = receiver.recv().await {
171 let series = series?;
172
173 let mut metrics = ScannerMetrics::default();
174 metrics.scan_cost += fetch_start.elapsed();
175 fetch_start = Instant::now();
176
177 metrics.num_batches += series.num_batches();
178 metrics.num_rows += series.num_rows();
179
180 let yield_start = Instant::now();
181 yield ScanBatch::Series(series);
182 metrics.yield_cost += yield_start.elapsed();
183
184 part_metrics.merge_metrics(&metrics);
185 }
186
187 part_metrics.on_finish();
188 };
189 Ok(Box::pin(stream))
190 }
191
192 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
194 let mut rx_list = self.receivers.lock().unwrap();
195 rx_list[partition]
196 .take()
197 .context(ScanMultiTimesSnafu { partition })
198 }
199
200 #[tracing::instrument(
202 skip(self, metrics_set, metrics_list),
203 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
204 )]
205 fn maybe_start_distributor(
206 &self,
207 metrics_set: &ExecutionPlanMetricsSet,
208 metrics_list: &Arc<PartitionMetricsList>,
209 ) {
210 let mut rx_list = self.receivers.lock().unwrap();
211 if !rx_list.is_empty() {
212 return;
213 }
214
215 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
216 let mut distributor = SeriesDistributor {
217 stream_ctx: self.stream_ctx.clone(),
218 semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
219 partitions: self.properties.partitions.clone(),
220 senders,
221 metrics_set: metrics_set.clone(),
222 metrics_list: metrics_list.clone(),
223 };
224 let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
225 let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
226 common_runtime::spawn_global(
227 async move {
228 distributor.execute().await;
229 }
230 .instrument(span),
231 );
232
233 *rx_list = receivers;
234 }
235
236 #[tracing::instrument(
238 skip_all,
239 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
240 )]
241 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
242 let part_num = self.properties.num_partitions();
243 let metrics_set = ExecutionPlanMetricsSet::default();
244 let streams = (0..part_num)
245 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
246 .collect::<Result<Vec<_>, BoxedError>>()?;
247 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
248 Ok(Box::pin(chained_stream))
249 }
250
251 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
253 let metrics_set = ExecutionPlanMetricsSet::new();
254
255 let streams = (0..self.properties.partitions.len())
256 .map(|partition| {
257 let metrics = new_partition_metrics(
258 &self.stream_ctx,
259 false,
260 &metrics_set,
261 partition,
262 &self.metrics_list,
263 );
264
265 self.scan_batch_in_partition(
266 &QueryScanContext::default(),
267 partition,
268 metrics,
269 &metrics_set,
270 )
271 })
272 .collect::<Result<Vec<_>>>()?;
273
274 Ok(Box::pin(futures::stream::iter(streams).flatten()))
275 }
276
277 pub(crate) fn check_scan_limit(&self) -> Result<()> {
279 let total_files: usize = self
281 .properties
282 .partitions
283 .iter()
284 .flat_map(|partition| partition.iter())
285 .map(|part_range| {
286 let range_meta = &self.stream_ctx.ranges[part_range.identifier];
287 range_meta.indices.len()
288 })
289 .sum();
290
291 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
292 if total_files > max_concurrent_files {
293 return TooManyFilesToReadSnafu {
294 actual: total_files,
295 max: max_concurrent_files,
296 }
297 .fail();
298 }
299
300 Ok(())
301 }
302}
303
304fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
305 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
306 .map(|_| {
307 let (sender, receiver) = mpsc::channel(1);
308 (Some(sender), Some(receiver))
309 })
310 .unzip();
311 (SenderList::new(senders), receivers)
312}
313
314impl RegionScanner for SeriesScan {
315 fn name(&self) -> &str {
316 "SeriesScan"
317 }
318
319 fn properties(&self) -> &ScannerProperties {
320 &self.properties
321 }
322
323 fn schema(&self) -> SchemaRef {
324 self.stream_ctx.input.mapper.output_schema()
325 }
326
327 fn metadata(&self) -> RegionMetadataRef {
328 self.stream_ctx.input.mapper.metadata().clone()
329 }
330
331 fn scan_partition(
332 &self,
333 ctx: &QueryScanContext,
334 metrics_set: &ExecutionPlanMetricsSet,
335 partition: usize,
336 ) -> Result<SendableRecordBatchStream, BoxedError> {
337 self.scan_partition_impl(ctx, metrics_set, partition)
338 .map_err(BoxedError::new)
339 }
340
341 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
342 self.properties.prepare(request);
343
344 self.check_scan_limit().map_err(BoxedError::new)?;
345
346 Ok(())
347 }
348
349 fn has_predicate_without_region(&self) -> bool {
350 let predicate = self
351 .stream_ctx
352 .input
353 .predicate_group()
354 .predicate_without_region();
355 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
356 }
357
358 fn set_logical_region(&mut self, logical_region: bool) {
359 self.properties.set_logical_region(logical_region);
360 }
361}
362
363impl DisplayAs for SeriesScan {
364 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
365 write!(
366 f,
367 "SeriesScan: region={}, ",
368 self.stream_ctx.input.mapper.metadata().region_id
369 )?;
370 match t {
371 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
372 self.stream_ctx.format_for_explain(false, f)
373 }
374 DisplayFormatType::Verbose => {
375 self.stream_ctx.format_for_explain(true, f)?;
376 self.metrics_list.format_verbose_metrics(f)
377 }
378 }
379 }
380}
381
382impl fmt::Debug for SeriesScan {
383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384 f.debug_struct("SeriesScan")
385 .field("num_ranges", &self.stream_ctx.ranges.len())
386 .finish()
387 }
388}
389
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
397
/// Background task that reads all partition ranges through a single reader
/// and sends the output, grouped by primary key, to the partition channels.
struct SeriesDistributor {
    /// Context for the source stream.
    stream_ctx: Arc<StreamContext>,
    /// Optional semaphore handed to the source and reader builders
    /// (presumably to bound read parallelism — confirm in `seq_scan`).
    semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan.
    partitions: Vec<Vec<PartitionRange>>,
    /// Senders of all partitions.
    senders: SenderList,
    /// Metrics set the distributor reports into.
    metrics_set: ExecutionPlanMetricsSet,
    /// Shared per-partition metrics list. The distributor registers its own
    /// entry at index `partitions.len()`, one past the last real partition.
    metrics_list: Arc<PartitionMetricsList>,
}
416
417impl SeriesDistributor {
418 #[tracing::instrument(
420 skip_all,
421 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
422 )]
423 async fn execute(&mut self) {
424 let result = if self.stream_ctx.input.flat_format {
425 self.scan_partitions_flat().await
426 } else {
427 self.scan_partitions().await
428 };
429
430 if let Err(e) = result {
431 self.senders.send_error(e).await;
432 }
433 }
434
435 #[tracing::instrument(
437 skip_all,
438 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
439 )]
440 async fn scan_partitions_flat(&mut self) -> Result<()> {
441 let part_metrics = new_partition_metrics(
442 &self.stream_ctx,
443 false,
444 &self.metrics_set,
445 self.partitions.len(),
446 &self.metrics_list,
447 );
448 part_metrics.on_first_poll();
449 let mut fetch_start = Instant::now();
452
453 let range_builder_list = Arc::new(RangeBuilderList::new(
454 self.stream_ctx.input.num_memtables(),
455 self.stream_ctx.input.num_files(),
456 ));
457 let mut sources = Vec::with_capacity(self.partitions.len());
459 for partition in &self.partitions {
460 sources.reserve(partition.len());
461 for part_range in partition {
462 build_flat_sources(
463 &self.stream_ctx,
464 part_range,
465 false,
466 &part_metrics,
467 range_builder_list.clone(),
468 &mut sources,
469 self.semaphore.clone(),
470 )
471 .await?;
472 }
473 }
474
475 let mut reader = SeqScan::build_flat_reader_from_sources(
477 &self.stream_ctx,
478 sources,
479 self.semaphore.clone(),
480 Some(&part_metrics),
481 )
482 .await?;
483 let mut metrics = SeriesDistributorMetrics::default();
484
485 let mut divider = FlatSeriesBatchDivider::default();
486 while let Some(record_batch) = reader.try_next().await? {
487 metrics.scan_cost += fetch_start.elapsed();
488 metrics.num_batches += 1;
489 metrics.num_rows += record_batch.num_rows();
490
491 debug_assert!(record_batch.num_rows() > 0);
492 if record_batch.num_rows() == 0 {
493 fetch_start = Instant::now();
494 continue;
495 }
496
497 let divider_start = Instant::now();
499 let series_batch = divider.push(record_batch);
500 metrics.divider_cost += divider_start.elapsed();
501 if let Some(series_batch) = series_batch {
502 let yield_start = Instant::now();
503 self.senders
504 .send_batch(SeriesBatch::Flat(series_batch))
505 .await?;
506 metrics.yield_cost += yield_start.elapsed();
507 }
508 fetch_start = Instant::now();
509 }
510
511 let divider_start = Instant::now();
513 let series_batch = divider.finish();
514 metrics.divider_cost += divider_start.elapsed();
515 if let Some(series_batch) = series_batch {
516 let yield_start = Instant::now();
517 self.senders
518 .send_batch(SeriesBatch::Flat(series_batch))
519 .await?;
520 metrics.yield_cost += yield_start.elapsed();
521 }
522
523 metrics.scan_cost += fetch_start.elapsed();
524 metrics.num_series_send_timeout = self.senders.num_timeout;
525 metrics.num_series_send_full = self.senders.num_full;
526 part_metrics.set_distributor_metrics(&metrics);
527
528 part_metrics.on_finish();
529
530 Ok(())
531 }
532
533 #[tracing::instrument(
535 skip_all,
536 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
537 )]
538 async fn scan_partitions(&mut self) -> Result<()> {
539 let part_metrics = new_partition_metrics(
540 &self.stream_ctx,
541 false,
542 &self.metrics_set,
543 self.partitions.len(),
544 &self.metrics_list,
545 );
546 part_metrics.on_first_poll();
547 let mut fetch_start = Instant::now();
550
551 let range_builder_list = Arc::new(RangeBuilderList::new(
552 self.stream_ctx.input.num_memtables(),
553 self.stream_ctx.input.num_files(),
554 ));
555 let mut sources = Vec::with_capacity(self.partitions.len());
557 for partition in &self.partitions {
558 sources.reserve(partition.len());
559 for part_range in partition {
560 build_sources(
561 &self.stream_ctx,
562 part_range,
563 false,
564 &part_metrics,
565 range_builder_list.clone(),
566 &mut sources,
567 self.semaphore.clone(),
568 )
569 .await?;
570 }
571 }
572
573 let mut reader = SeqScan::build_reader_from_sources(
575 &self.stream_ctx,
576 sources,
577 self.semaphore.clone(),
578 Some(&part_metrics),
579 )
580 .await?;
581 let mut metrics = SeriesDistributorMetrics::default();
582
583 let mut current_series = PrimaryKeySeriesBatch::default();
584 while let Some(batch) = reader.next_batch().await? {
585 metrics.scan_cost += fetch_start.elapsed();
586 metrics.num_batches += 1;
587 metrics.num_rows += batch.num_rows();
588
589 debug_assert!(!batch.is_empty());
590 if batch.is_empty() {
591 fetch_start = Instant::now();
592 continue;
593 }
594
595 let Some(last_key) = current_series.current_key() else {
596 current_series.push(batch);
597 fetch_start = Instant::now();
598 continue;
599 };
600
601 if last_key == batch.primary_key() {
602 current_series.push(batch);
603 fetch_start = Instant::now();
604 continue;
605 }
606
607 let to_send =
609 std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
610 let yield_start = Instant::now();
611 self.senders
612 .send_batch(SeriesBatch::PrimaryKey(to_send))
613 .await?;
614 metrics.yield_cost += yield_start.elapsed();
615 fetch_start = Instant::now();
616 }
617
618 if !current_series.is_empty() {
619 let yield_start = Instant::now();
620 self.senders
621 .send_batch(SeriesBatch::PrimaryKey(current_series))
622 .await?;
623 metrics.yield_cost += yield_start.elapsed();
624 }
625
626 metrics.scan_cost += fetch_start.elapsed();
627 metrics.num_series_send_timeout = self.senders.num_timeout;
628 metrics.num_series_send_full = self.senders.num_full;
629 part_metrics.set_distributor_metrics(&metrics);
630
631 part_metrics.on_finish();
632
633 Ok(())
634 }
635}
636
/// Consecutive [Batch]es that share one primary key, as assembled by the
/// distributor's primary-key scan path.
#[derive(Default, Debug)]
pub struct PrimaryKeySeriesBatch {
    /// Batches of the series, in the order they were read.
    pub batches: SmallVec<[Batch; 4]>,
}
642
643impl PrimaryKeySeriesBatch {
644 fn single(batch: Batch) -> Self {
646 Self {
647 batches: smallvec![batch],
648 }
649 }
650
651 fn current_key(&self) -> Option<&[u8]> {
652 self.batches.first().map(|batch| batch.primary_key())
653 }
654
655 fn push(&mut self, batch: Batch) {
656 self.batches.push(batch);
657 }
658
659 fn is_empty(&self) -> bool {
661 self.batches.is_empty()
662 }
663}
664
/// Unit of data the distributor sends to a partition stream.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Batches in the primary-key format, all sharing one primary key.
    PrimaryKey(PrimaryKeySeriesBatch),
    /// Record batches in the flat format, cut at primary key boundaries by
    /// `FlatSeriesBatchDivider`; may hold rows of several consecutive keys.
    Flat(FlatSeriesBatch),
}
671
672impl SeriesBatch {
673 pub fn num_batches(&self) -> usize {
675 match self {
676 SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
677 SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
678 }
679 }
680
681 pub fn num_rows(&self) -> usize {
683 match self {
684 SeriesBatch::PrimaryKey(primary_key_batch) => {
685 primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
686 }
687 SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
688 }
689 }
690}
691
/// Record batches produced by the flat-format scan path.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Record batches in read order; may contain rows of several consecutive series.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
697
/// Round-robin set of per-partition senders used by the distributor.
struct SenderList {
    /// Sender of each partition; a slot is set to `None` once its channel is found closed.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` slots in `senders`.
    num_nones: usize,
    /// Index of the next sender to try.
    sender_idx: usize,
    /// Number of `send_timeout` attempts that timed out.
    num_timeout: usize,
    /// Number of `try_send` attempts rejected because the channel was full.
    num_full: usize,
}
710
711impl SenderList {
712 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
713 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
714 Self {
715 senders,
716 num_nones,
717 sender_idx: 0,
718 num_timeout: 0,
719 num_full: 0,
720 }
721 }
722
723 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
726 for _ in 0..self.senders.len() {
727 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
728
729 let sender_idx = self.fetch_add_sender_idx();
730 let Some(sender) = &self.senders[sender_idx] else {
731 continue;
732 };
733
734 match sender.try_send(Ok(batch)) {
735 Ok(()) => return Ok(None),
736 Err(TrySendError::Full(res)) => {
737 self.num_full += 1;
738 batch = res.unwrap();
740 }
741 Err(TrySendError::Closed(res)) => {
742 self.senders[sender_idx] = None;
743 self.num_nones += 1;
744 batch = res.unwrap();
746 }
747 }
748 }
749
750 Ok(Some(batch))
751 }
752
753 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
755 match self.try_send_batch(batch)? {
757 Some(b) => {
758 batch = b;
760 }
761 None => {
762 return Ok(());
763 }
764 }
765
766 loop {
767 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
768
769 let sender_idx = self.fetch_add_sender_idx();
770 let Some(sender) = &self.senders[sender_idx] else {
771 continue;
772 };
773 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
778 Ok(()) => break,
779 Err(SendTimeoutError::Timeout(res)) => {
780 self.num_timeout += 1;
781 batch = res.unwrap();
783 }
784 Err(SendTimeoutError::Closed(res)) => {
785 self.senders[sender_idx] = None;
786 self.num_nones += 1;
787 batch = res.unwrap();
789 }
790 }
791 }
792
793 Ok(())
794 }
795
796 async fn send_error(&self, error: Error) {
797 let error = Arc::new(error);
798 for sender in self.senders.iter().flatten() {
799 let result = Err(error.clone()).context(ScanSeriesSnafu);
800 let _ = sender.send(result).await;
801 }
802 }
803
804 fn fetch_add_sender_idx(&mut self) -> usize {
805 let sender_idx = self.sender_idx;
806 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
807 sender_idx
808 }
809}
810
811fn new_partition_metrics(
812 stream_ctx: &StreamContext,
813 explain_verbose: bool,
814 metrics_set: &ExecutionPlanMetricsSet,
815 partition: usize,
816 metrics_list: &PartitionMetricsList,
817) -> PartitionMetrics {
818 let metrics = PartitionMetrics::new(
819 stream_ctx.input.mapper.metadata().region_id,
820 partition,
821 "SeriesScan",
822 stream_ctx.query_start,
823 explain_verbose,
824 metrics_set,
825 );
826
827 metrics_list.set(partition, metrics.clone());
828 metrics
829}
830
/// Buffers flat record batches and releases them at primary key boundaries,
/// so rows sharing a primary key end up in the same [FlatSeriesBatch].
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Batches whose last primary key may continue in the next pushed batch.
    buffer: FlatSeriesBatch,
}
839
840impl FlatSeriesBatchDivider {
841 fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
845 if self.buffer.batches.is_empty() {
847 self.buffer.batches.push(batch);
848 return None;
849 }
850
851 let pk_column_idx = primary_key_column_index(batch.num_columns());
853 let batch_pk_column = batch.column(pk_column_idx);
854 let batch_pk_array = batch_pk_column
855 .as_any()
856 .downcast_ref::<PrimaryKeyArray>()
857 .unwrap();
858 let batch_pk_values = batch_pk_array
859 .values()
860 .as_any()
861 .downcast_ref::<BinaryArray>()
862 .unwrap();
863 let batch_last_pk =
865 primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
866 let buffer_last_batch = self.buffer.batches.last().unwrap();
869 let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
870 let buffer_pk_array = buffer_pk_column
871 .as_any()
872 .downcast_ref::<PrimaryKeyArray>()
873 .unwrap();
874 let buffer_pk_values = buffer_pk_array
875 .values()
876 .as_any()
877 .downcast_ref::<BinaryArray>()
878 .unwrap();
879 let buffer_last_pk =
880 primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
881
882 if batch_last_pk == buffer_last_pk {
884 self.buffer.batches.push(batch);
885 return None;
886 }
887 let batch_pk_keys = batch_pk_array.keys();
890 let pk_indices = batch_pk_keys.values();
891 let mut change_offset = 0;
892 for (i, &key) in pk_indices.iter().enumerate() {
893 let batch_pk = batch_pk_values.value(key as usize);
894
895 if buffer_last_pk != batch_pk {
896 change_offset = i;
897 break;
898 }
899 }
900
901 let (first_part, remaining_part) = if change_offset > 0 {
903 let first_part = batch.slice(0, change_offset);
904 let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
905 (Some(first_part), Some(remaining_part))
906 } else {
907 (None, Some(batch))
908 };
909
910 let mut result = std::mem::take(&mut self.buffer);
912 if let Some(first_part) = first_part {
913 result.batches.push(first_part);
914 }
915
916 if let Some(remaining_part) = remaining_part {
918 self.buffer.batches.push(remaining_part);
919 }
920
921 Some(result)
922 }
923
924 fn finish(&mut self) -> Option<FlatSeriesBatch> {
926 if self.buffer.batches.is_empty() {
927 None
928 } else {
929 Some(std::mem::take(&mut self.buffer))
930 }
931 }
932}
933
934fn primary_key_at<'a>(
936 primary_key: &PrimaryKeyArray,
937 primary_key_values: &'a BinaryArray,
938 index: usize,
939) -> &'a [u8] {
940 let key = primary_key.keys().value(index);
941 primary_key_values.value(key as usize)
942}
943
#[cfg(test)]
mod tests {
    //! Unit tests for `FlatSeriesBatchDivider`.

    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format test batch with columns
    /// `k0, field0, ts, __primary_key, __sequence, __op_type`.
    ///
    /// All slices must have the same length (checked with `debug_assert_eq!`).
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column: a UTF-8 dictionary built from the raw keys
    /// (the test keys must be valid UTF-8).
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column: a binary dictionary of the raw keys.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema matching the columns built by `new_test_record_batch`.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// An empty divider yields nothing on finish; the first push only buffers.
    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    /// Batches of the same series accumulate until `finish`.
    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    /// A batch starting a new series releases the buffered series unchanged.
    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    /// A boundary inside a batch splits it: the head completes the buffered
    /// series and the tail stays buffered.
    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    /// Only the first boundary is cut; rows of several later series stay
    /// together in the buffer until `finish`.
    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}