1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::array::BinaryArray;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::schema::SchemaRef;
30use futures::{StreamExt, TryStreamExt};
31use smallvec::{SmallVec, smallvec};
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::metadata::RegionMetadataRef;
34use store_api::region_engine::{
35 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
36};
37use tokio::sync::Semaphore;
38use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
39use tokio::sync::mpsc::{self, Receiver, Sender};
40
41use crate::error::{
42 Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
43 ScanSeriesSnafu, TooManyFilesToReadSnafu,
44};
45use crate::read::range::RangeBuilderList;
46use crate::read::scan_region::{ScanInput, StreamContext};
47use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
48use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
49use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
50use crate::read::{Batch, ScannerMetrics};
51use crate::sst::parquet::flat_format::primary_key_column_index;
52use crate::sst::parquet::format::PrimaryKeyArray;
53
/// Time to wait on a single partition channel in `SenderList::send_batch` before the
/// batch is offered to the next partition.
const SEND_TIMEOUT: Duration = Duration::from_nanos(100_000);
56
/// Receivers of the per-partition channels, indexed by partition.
///
/// A slot is set to `None` once its receiver has been taken by `SeriesScan::take_receiver`.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
59
60pub struct SeriesScan {
66 properties: ScannerProperties,
68 stream_ctx: Arc<StreamContext>,
70 receivers: Mutex<ReceiverList>,
72 metrics_list: Arc<PartitionMetricsList>,
75}
76
77impl SeriesScan {
78 pub(crate) fn new(input: ScanInput) -> Self {
80 let mut properties = ScannerProperties::default()
81 .with_append_mode(input.append_mode)
82 .with_total_rows(input.total_rows());
83 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
84 properties.partitions = vec![stream_ctx.partition_ranges()];
85
86 Self {
87 properties,
88 stream_ctx,
89 receivers: Mutex::new(Vec::new()),
90 metrics_list: Arc::new(PartitionMetricsList::default()),
91 }
92 }
93
94 fn scan_partition_impl(
95 &self,
96 ctx: &QueryScanContext,
97 metrics_set: &ExecutionPlanMetricsSet,
98 partition: usize,
99 ) -> Result<SendableRecordBatchStream> {
100 let metrics = new_partition_metrics(
101 &self.stream_ctx,
102 ctx.explain_verbose,
103 metrics_set,
104 partition,
105 &self.metrics_list,
106 );
107
108 let batch_stream =
109 self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
110
111 let input = &self.stream_ctx.input;
112 let record_batch_stream = ConvertBatchStream::new(
113 batch_stream,
114 input.mapper.clone(),
115 input.cache_strategy.clone(),
116 metrics,
117 );
118
119 Ok(Box::pin(RecordBatchStreamWrapper::new(
120 input.mapper.output_schema(),
121 Box::pin(record_batch_stream),
122 )))
123 }
124
125 fn scan_batch_in_partition(
126 &self,
127 ctx: &QueryScanContext,
128 partition: usize,
129 part_metrics: PartitionMetrics,
130 metrics_set: &ExecutionPlanMetricsSet,
131 ) -> Result<ScanBatchStream> {
132 if ctx.explain_verbose {
133 common_telemetry::info!(
134 "SeriesScan partition {}, region_id: {}",
135 partition,
136 self.stream_ctx.input.region_metadata().region_id
137 );
138 }
139
140 ensure!(
141 partition < self.properties.num_partitions(),
142 PartitionOutOfRangeSnafu {
143 given: partition,
144 all: self.properties.num_partitions(),
145 }
146 );
147
148 self.maybe_start_distributor(metrics_set, &self.metrics_list);
149
150 let mut receiver = self.take_receiver(partition)?;
151 let stream = try_stream! {
152 part_metrics.on_first_poll();
153
154 let mut fetch_start = Instant::now();
155 while let Some(series) = receiver.recv().await {
156 let series = series?;
157
158 let mut metrics = ScannerMetrics::default();
159 metrics.scan_cost += fetch_start.elapsed();
160 fetch_start = Instant::now();
161
162 metrics.num_batches += series.num_batches();
163 metrics.num_rows += series.num_rows();
164
165 let yield_start = Instant::now();
166 yield ScanBatch::Series(series);
167 metrics.yield_cost += yield_start.elapsed();
168
169 part_metrics.merge_metrics(&metrics);
170 }
171
172 part_metrics.on_finish();
173 };
174 Ok(Box::pin(stream))
175 }
176
177 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
179 let mut rx_list = self.receivers.lock().unwrap();
180 rx_list[partition]
181 .take()
182 .context(ScanMultiTimesSnafu { partition })
183 }
184
185 fn maybe_start_distributor(
187 &self,
188 metrics_set: &ExecutionPlanMetricsSet,
189 metrics_list: &Arc<PartitionMetricsList>,
190 ) {
191 let mut rx_list = self.receivers.lock().unwrap();
192 if !rx_list.is_empty() {
193 return;
194 }
195
196 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
197 let mut distributor = SeriesDistributor {
198 stream_ctx: self.stream_ctx.clone(),
199 semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
200 partitions: self.properties.partitions.clone(),
201 senders,
202 metrics_set: metrics_set.clone(),
203 metrics_list: metrics_list.clone(),
204 };
205 common_runtime::spawn_global(async move {
206 distributor.execute().await;
207 });
208
209 *rx_list = receivers;
210 }
211
212 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
214 let part_num = self.properties.num_partitions();
215 let metrics_set = ExecutionPlanMetricsSet::default();
216 let streams = (0..part_num)
217 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
218 .collect::<Result<Vec<_>, BoxedError>>()?;
219 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
220 Ok(Box::pin(chained_stream))
221 }
222
223 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
225 let metrics_set = ExecutionPlanMetricsSet::new();
226
227 let streams = (0..self.properties.partitions.len())
228 .map(|partition| {
229 let metrics = new_partition_metrics(
230 &self.stream_ctx,
231 false,
232 &metrics_set,
233 partition,
234 &self.metrics_list,
235 );
236
237 self.scan_batch_in_partition(
238 &QueryScanContext::default(),
239 partition,
240 metrics,
241 &metrics_set,
242 )
243 })
244 .collect::<Result<Vec<_>>>()?;
245
246 Ok(Box::pin(futures::stream::iter(streams).flatten()))
247 }
248
249 pub(crate) fn check_scan_limit(&self) -> Result<()> {
251 let total_files: usize = self
253 .properties
254 .partitions
255 .iter()
256 .flat_map(|partition| partition.iter())
257 .map(|part_range| {
258 let range_meta = &self.stream_ctx.ranges[part_range.identifier];
259 range_meta.indices.len()
260 })
261 .sum();
262
263 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
264 if total_files > max_concurrent_files {
265 return TooManyFilesToReadSnafu {
266 actual: total_files,
267 max: max_concurrent_files,
268 }
269 .fail();
270 }
271
272 Ok(())
273 }
274}
275
276fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
277 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
278 .map(|_| {
279 let (sender, receiver) = mpsc::channel(1);
280 (Some(sender), Some(receiver))
281 })
282 .unzip();
283 (SenderList::new(senders), receivers)
284}
285
286impl RegionScanner for SeriesScan {
287 fn name(&self) -> &str {
288 "SeriesScan"
289 }
290
291 fn properties(&self) -> &ScannerProperties {
292 &self.properties
293 }
294
295 fn schema(&self) -> SchemaRef {
296 self.stream_ctx.input.mapper.output_schema()
297 }
298
299 fn metadata(&self) -> RegionMetadataRef {
300 self.stream_ctx.input.mapper.metadata().clone()
301 }
302
303 fn scan_partition(
304 &self,
305 ctx: &QueryScanContext,
306 metrics_set: &ExecutionPlanMetricsSet,
307 partition: usize,
308 ) -> Result<SendableRecordBatchStream, BoxedError> {
309 self.scan_partition_impl(ctx, metrics_set, partition)
310 .map_err(BoxedError::new)
311 }
312
313 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
314 self.properties.prepare(request);
315
316 self.check_scan_limit().map_err(BoxedError::new)?;
317
318 Ok(())
319 }
320
321 fn has_predicate_without_region(&self) -> bool {
322 let predicate = self
323 .stream_ctx
324 .input
325 .predicate_group()
326 .predicate_without_region();
327 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
328 }
329
330 fn set_logical_region(&mut self, logical_region: bool) {
331 self.properties.set_logical_region(logical_region);
332 }
333}
334
335impl DisplayAs for SeriesScan {
336 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
337 write!(
338 f,
339 "SeriesScan: region={}, ",
340 self.stream_ctx.input.mapper.metadata().region_id
341 )?;
342 match t {
343 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
344 self.stream_ctx.format_for_explain(false, f)
345 }
346 DisplayFormatType::Verbose => {
347 self.stream_ctx.format_for_explain(true, f)?;
348 self.metrics_list.format_verbose_metrics(f)
349 }
350 }
351 }
352}
353
354impl fmt::Debug for SeriesScan {
355 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356 f.debug_struct("SeriesScan")
357 .field("num_ranges", &self.stream_ctx.ranges.len())
358 .finish()
359 }
360}
361
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input. Test-only accessor.
    pub(crate) fn input(&self) -> &ScanInput {
        let stream_ctx = &self.stream_ctx;
        &stream_ctx.input
    }
}
369
370struct SeriesDistributor {
372 stream_ctx: Arc<StreamContext>,
374 semaphore: Option<Arc<Semaphore>>,
376 partitions: Vec<Vec<PartitionRange>>,
378 senders: SenderList,
380 metrics_set: ExecutionPlanMetricsSet,
386 metrics_list: Arc<PartitionMetricsList>,
387}
388
389impl SeriesDistributor {
390 async fn execute(&mut self) {
392 let result = if self.stream_ctx.input.flat_format {
393 self.scan_partitions_flat().await
394 } else {
395 self.scan_partitions().await
396 };
397
398 if let Err(e) = result {
399 self.senders.send_error(e).await;
400 }
401 }
402
403 async fn scan_partitions_flat(&mut self) -> Result<()> {
405 let part_metrics = new_partition_metrics(
406 &self.stream_ctx,
407 false,
408 &self.metrics_set,
409 self.partitions.len(),
410 &self.metrics_list,
411 );
412 part_metrics.on_first_poll();
413
414 let range_builder_list = Arc::new(RangeBuilderList::new(
415 self.stream_ctx.input.num_memtables(),
416 self.stream_ctx.input.num_files(),
417 ));
418 let mut sources = Vec::with_capacity(self.partitions.len());
420 for partition in &self.partitions {
421 sources.reserve(partition.len());
422 for part_range in partition {
423 build_flat_sources(
424 &self.stream_ctx,
425 part_range,
426 false,
427 &part_metrics,
428 range_builder_list.clone(),
429 &mut sources,
430 self.semaphore.clone(),
431 )
432 .await?;
433 }
434 }
435
436 let mut reader = SeqScan::build_flat_reader_from_sources(
438 &self.stream_ctx,
439 sources,
440 self.semaphore.clone(),
441 Some(&part_metrics),
442 )
443 .await?;
444 let mut metrics = SeriesDistributorMetrics::default();
445 let mut fetch_start = Instant::now();
446
447 let mut divider = FlatSeriesBatchDivider::default();
448 while let Some(record_batch) = reader.try_next().await? {
449 metrics.scan_cost += fetch_start.elapsed();
450 metrics.num_batches += 1;
451 metrics.num_rows += record_batch.num_rows();
452
453 debug_assert!(record_batch.num_rows() > 0);
454 if record_batch.num_rows() == 0 {
455 fetch_start = Instant::now();
456 continue;
457 }
458
459 if let Some(series_batch) = divider.push(record_batch) {
461 let yield_start = Instant::now();
462 self.senders
463 .send_batch(SeriesBatch::Flat(series_batch))
464 .await?;
465 metrics.yield_cost += yield_start.elapsed();
466 }
467 fetch_start = Instant::now();
468 }
469
470 if let Some(series_batch) = divider.finish() {
472 let yield_start = Instant::now();
473 self.senders
474 .send_batch(SeriesBatch::Flat(series_batch))
475 .await?;
476 metrics.yield_cost += yield_start.elapsed();
477 }
478
479 metrics.scan_cost += fetch_start.elapsed();
480 metrics.num_series_send_timeout = self.senders.num_timeout;
481 metrics.num_series_send_full = self.senders.num_full;
482 part_metrics.set_distributor_metrics(&metrics);
483
484 part_metrics.on_finish();
485
486 Ok(())
487 }
488
489 async fn scan_partitions(&mut self) -> Result<()> {
491 let part_metrics = new_partition_metrics(
492 &self.stream_ctx,
493 false,
494 &self.metrics_set,
495 self.partitions.len(),
496 &self.metrics_list,
497 );
498 part_metrics.on_first_poll();
499
500 let range_builder_list = Arc::new(RangeBuilderList::new(
501 self.stream_ctx.input.num_memtables(),
502 self.stream_ctx.input.num_files(),
503 ));
504 let mut sources = Vec::with_capacity(self.partitions.len());
506 for partition in &self.partitions {
507 sources.reserve(partition.len());
508 for part_range in partition {
509 build_sources(
510 &self.stream_ctx,
511 part_range,
512 false,
513 &part_metrics,
514 range_builder_list.clone(),
515 &mut sources,
516 self.semaphore.clone(),
517 )
518 .await?;
519 }
520 }
521
522 let mut reader = SeqScan::build_reader_from_sources(
524 &self.stream_ctx,
525 sources,
526 self.semaphore.clone(),
527 Some(&part_metrics),
528 )
529 .await?;
530 let mut metrics = SeriesDistributorMetrics::default();
531 let mut fetch_start = Instant::now();
532
533 let mut current_series = PrimaryKeySeriesBatch::default();
534 while let Some(batch) = reader.next_batch().await? {
535 metrics.scan_cost += fetch_start.elapsed();
536 metrics.num_batches += 1;
537 metrics.num_rows += batch.num_rows();
538
539 debug_assert!(!batch.is_empty());
540 if batch.is_empty() {
541 fetch_start = Instant::now();
542 continue;
543 }
544
545 let Some(last_key) = current_series.current_key() else {
546 current_series.push(batch);
547 fetch_start = Instant::now();
548 continue;
549 };
550
551 if last_key == batch.primary_key() {
552 current_series.push(batch);
553 fetch_start = Instant::now();
554 continue;
555 }
556
557 let to_send =
559 std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch));
560 let yield_start = Instant::now();
561 self.senders
562 .send_batch(SeriesBatch::PrimaryKey(to_send))
563 .await?;
564 metrics.yield_cost += yield_start.elapsed();
565 fetch_start = Instant::now();
566 }
567
568 if !current_series.is_empty() {
569 let yield_start = Instant::now();
570 self.senders
571 .send_batch(SeriesBatch::PrimaryKey(current_series))
572 .await?;
573 metrics.yield_cost += yield_start.elapsed();
574 }
575
576 metrics.scan_cost += fetch_start.elapsed();
577 metrics.num_series_send_timeout = self.senders.num_timeout;
578 metrics.num_series_send_full = self.senders.num_full;
579 part_metrics.set_distributor_metrics(&metrics);
580
581 part_metrics.on_finish();
582
583 Ok(())
584 }
585}
586
587#[derive(Default, Debug)]
589pub struct PrimaryKeySeriesBatch {
590 pub batches: SmallVec<[Batch; 4]>,
591}
592
593impl PrimaryKeySeriesBatch {
594 fn single(batch: Batch) -> Self {
596 Self {
597 batches: smallvec![batch],
598 }
599 }
600
601 fn current_key(&self) -> Option<&[u8]> {
602 self.batches.first().map(|batch| batch.primary_key())
603 }
604
605 fn push(&mut self, batch: Batch) {
606 self.batches.push(batch);
607 }
608
609 fn is_empty(&self) -> bool {
611 self.batches.is_empty()
612 }
613}
614
615#[derive(Debug)]
617pub enum SeriesBatch {
618 PrimaryKey(PrimaryKeySeriesBatch),
619 Flat(FlatSeriesBatch),
620}
621
622impl SeriesBatch {
623 pub fn num_batches(&self) -> usize {
625 match self {
626 SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(),
627 SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
628 }
629 }
630
631 pub fn num_rows(&self) -> usize {
633 match self {
634 SeriesBatch::PrimaryKey(primary_key_batch) => {
635 primary_key_batch.batches.iter().map(|x| x.num_rows()).sum()
636 }
637 SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
638 }
639 }
640}
641
642#[derive(Default, Debug)]
644pub struct FlatSeriesBatch {
645 pub batches: SmallVec<[RecordBatch; 4]>,
646}
647
648struct SenderList {
650 senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
651 num_nones: usize,
653 sender_idx: usize,
655 num_timeout: usize,
657 num_full: usize,
659}
660
661impl SenderList {
662 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
663 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
664 Self {
665 senders,
666 num_nones,
667 sender_idx: 0,
668 num_timeout: 0,
669 num_full: 0,
670 }
671 }
672
673 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
676 for _ in 0..self.senders.len() {
677 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
678
679 let sender_idx = self.fetch_add_sender_idx();
680 let Some(sender) = &self.senders[sender_idx] else {
681 continue;
682 };
683
684 match sender.try_send(Ok(batch)) {
685 Ok(()) => return Ok(None),
686 Err(TrySendError::Full(res)) => {
687 self.num_full += 1;
688 batch = res.unwrap();
690 }
691 Err(TrySendError::Closed(res)) => {
692 self.senders[sender_idx] = None;
693 self.num_nones += 1;
694 batch = res.unwrap();
696 }
697 }
698 }
699
700 Ok(Some(batch))
701 }
702
703 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
705 match self.try_send_batch(batch)? {
707 Some(b) => {
708 batch = b;
710 }
711 None => {
712 return Ok(());
713 }
714 }
715
716 loop {
717 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
718
719 let sender_idx = self.fetch_add_sender_idx();
720 let Some(sender) = &self.senders[sender_idx] else {
721 continue;
722 };
723 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
728 Ok(()) => break,
729 Err(SendTimeoutError::Timeout(res)) => {
730 self.num_timeout += 1;
731 batch = res.unwrap();
733 }
734 Err(SendTimeoutError::Closed(res)) => {
735 self.senders[sender_idx] = None;
736 self.num_nones += 1;
737 batch = res.unwrap();
739 }
740 }
741 }
742
743 Ok(())
744 }
745
746 async fn send_error(&self, error: Error) {
747 let error = Arc::new(error);
748 for sender in self.senders.iter().flatten() {
749 let result = Err(error.clone()).context(ScanSeriesSnafu);
750 let _ = sender.send(result).await;
751 }
752 }
753
754 fn fetch_add_sender_idx(&mut self) -> usize {
755 let sender_idx = self.sender_idx;
756 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
757 sender_idx
758 }
759}
760
761fn new_partition_metrics(
762 stream_ctx: &StreamContext,
763 explain_verbose: bool,
764 metrics_set: &ExecutionPlanMetricsSet,
765 partition: usize,
766 metrics_list: &PartitionMetricsList,
767) -> PartitionMetrics {
768 let metrics = PartitionMetrics::new(
769 stream_ctx.input.mapper.metadata().region_id,
770 partition,
771 "SeriesScan",
772 stream_ctx.query_start,
773 explain_verbose,
774 metrics_set,
775 );
776
777 metrics_list.set(partition, metrics.clone());
778 metrics
779}
780
781#[derive(Default)]
786struct FlatSeriesBatchDivider {
787 buffer: FlatSeriesBatch,
788}
789
790impl FlatSeriesBatchDivider {
791 fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
795 if self.buffer.batches.is_empty() {
797 self.buffer.batches.push(batch);
798 return None;
799 }
800
801 let pk_column_idx = primary_key_column_index(batch.num_columns());
803 let batch_pk_column = batch.column(pk_column_idx);
804 let batch_pk_array = batch_pk_column
805 .as_any()
806 .downcast_ref::<PrimaryKeyArray>()
807 .unwrap();
808 let batch_pk_values = batch_pk_array
809 .values()
810 .as_any()
811 .downcast_ref::<BinaryArray>()
812 .unwrap();
813 let batch_last_pk =
815 primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
816 let buffer_last_batch = self.buffer.batches.last().unwrap();
819 let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
820 let buffer_pk_array = buffer_pk_column
821 .as_any()
822 .downcast_ref::<PrimaryKeyArray>()
823 .unwrap();
824 let buffer_pk_values = buffer_pk_array
825 .values()
826 .as_any()
827 .downcast_ref::<BinaryArray>()
828 .unwrap();
829 let buffer_last_pk =
830 primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
831
832 if batch_last_pk == buffer_last_pk {
834 self.buffer.batches.push(batch);
835 return None;
836 }
837 let batch_pk_keys = batch_pk_array.keys();
840 let pk_indices = batch_pk_keys.values();
841 let mut change_offset = 0;
842 for (i, &key) in pk_indices.iter().enumerate() {
843 let batch_pk = batch_pk_values.value(key as usize);
844
845 if buffer_last_pk != batch_pk {
846 change_offset = i;
847 break;
848 }
849 }
850
851 let (first_part, remaining_part) = if change_offset > 0 {
853 let first_part = batch.slice(0, change_offset);
854 let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
855 (Some(first_part), Some(remaining_part))
856 } else {
857 (None, Some(batch))
858 };
859
860 let mut result = std::mem::take(&mut self.buffer);
862 if let Some(first_part) = first_part {
863 result.batches.push(first_part);
864 }
865
866 if let Some(remaining_part) = remaining_part {
868 self.buffer.batches.push(remaining_part);
869 }
870
871 Some(result)
872 }
873
874 fn finish(&mut self) -> Option<FlatSeriesBatch> {
876 if self.buffer.batches.is_empty() {
877 None
878 } else {
879 Some(std::mem::take(&mut self.buffer))
880 }
881 }
882}
883
884fn primary_key_at<'a>(
886 primary_key: &PrimaryKeyArray,
887 primary_key_values: &'a BinaryArray,
888 index: usize,
889) -> &'a [u8] {
890 let key = primary_key.keys().value(index);
891 primary_key_values.value(key as usize)
892}
893
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format record batch with one row per element of the input slices.
    ///
    /// The column order follows `build_test_flat_schema`: tag `k0`, `field0`, `ts`,
    /// `__primary_key`, `__sequence`, `__op_type`. All slices must have the same length.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column: a UTF-8 dictionary of the keys (keys must be valid UTF-8).
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column: a binary dictionary with `u32` keys.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema of the test batches. The divider locates `__primary_key` by its position
    /// (`primary_key_column_index`), so the column order here matters.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// `finish` on an empty divider yields nothing; the first push is only buffered.
    #[test]
    fn test_empty_buffer_first_push() {
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    /// Batches of the same series accumulate until `finish`.
    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    /// A batch that starts a new series releases the buffer and becomes the new buffer.
    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    /// A boundary inside a batch splits it: the head joins the released series, the
    /// tail stays buffered.
    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    /// The buffered tail may hold several series (series2 and series3 here); the
    /// divider only guarantees series are not split across outputs.
    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}