1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use common_telemetry::warn;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::schema::SchemaRef;
32use futures::{StreamExt, TryStreamExt};
33use smallvec::SmallVec;
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
38};
39use tokio::sync::Semaphore;
40use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
41use tokio::sync::mpsc::{self, Receiver, Sender};
42
43use crate::error::{
44 Error, InvalidSenderSnafu, JoinSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
45 ScanSeriesSnafu, TooManyFilesToReadSnafu,
46};
47use crate::read::ScannerMetrics;
48use crate::read::pruner::{PartitionPruner, Pruner};
49use crate::read::scan_region::{ScanInput, StreamContext};
50use crate::read::scan_util::{
51 PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
52 compute_parallel_channel_size,
53};
54use crate::read::seq_scan::SeqScan;
55use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
56use crate::sst::parquet::flat_format::primary_key_column_index;
57use crate::sst::parquet::format::PrimaryKeyArray;
58
/// Per-sender wait when distributing a series batch; after this timeout the
/// distributor moves on to the next partition's channel.
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// One receiver slot per output partition; a slot becomes `None` once the
/// partition's stream has been taken (see `SeriesScan::take_receiver`).
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
64
/// Scanner that reads all ranges as one merged stream and distributes rows to
/// output partitions grouped by series (primary key).
pub struct SeriesScan {
    /// Properties of the scanner (partition ranges, append mode, total rows).
    properties: ScannerProperties,
    /// Context shared by all partition streams and the distributor.
    stream_ctx: Arc<StreamContext>,
    /// Pruner shared with the distributor for pruning partition ranges.
    pruner: Arc<Pruner>,
    /// Receivers of the distributor's output, one per partition.
    /// Empty until the distributor is started; each slot is taken at most once.
    receivers: Mutex<ReceiverList>,
    /// Per-partition metrics, reported by verbose EXPLAIN output.
    metrics_list: Arc<PartitionMetricsList>,
}
83
impl SeriesScan {
    /// Creates a new [SeriesScan] from the scan input.
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // Prune with one worker per CPU core (at least one).
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            // Receivers are created lazily by `maybe_start_distributor`.
            receivers: Mutex::new(Vec::new()),
            metrics_list: Arc::new(PartitionMetricsList::default()),
        }
    }

    /// Builds the record batch stream for `partition` by converting the
    /// internal [ScanBatch] stream through the projection mapper.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let metrics = new_partition_metrics(
            &self.stream_ctx,
            ctx.explain_verbose,
            metrics_set,
            partition,
            &self.metrics_list,
        );

        let batch_stream =
            self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;

        let input = &self.stream_ctx.input;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    /// Returns a [ScanBatch] stream that yields the series the distributor
    /// sends to `partition`.
    ///
    /// Starts the distributor on first use and takes the partition's
    /// receiver; scanning the same partition twice is an error.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        ctx: &QueryScanContext,
        partition: usize,
        part_metrics: PartitionMetrics,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Result<ScanBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeriesScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        ensure!(
            partition < self.properties.num_partitions(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.num_partitions(),
            }
        );

        self.maybe_start_distributor(metrics_set, &self.metrics_list, ctx.explain_verbose);

        let mut receiver = self.take_receiver(partition)?;
        let stream = try_stream! {
            part_metrics.on_first_poll();

            // `scan_cost` measures time spent waiting on the channel;
            // `yield_cost` measures time the consumer takes per batch.
            let mut fetch_start = Instant::now();
            while let Some(series) = receiver.recv().await {
                let series = series?;

                let mut metrics = ScannerMetrics::default();
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();

                metrics.num_batches += series.num_batches();
                metrics.num_rows += series.num_rows();

                let yield_start = Instant::now();
                yield ScanBatch::Series(series);
                metrics.yield_cost += yield_start.elapsed();

                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Takes the receiver for `partition`, leaving `None` in its slot.
    ///
    /// Returns an error if the partition's stream was already taken.
    fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
        let mut rx_list = self.receivers.lock().unwrap();
        rx_list[partition]
            .take()
            .context(ScanMultiTimesSnafu { partition })
    }

    /// Starts the distributor task if it hasn't started.
    ///
    /// The check-and-spawn runs under the `receivers` lock, so the
    /// distributor is spawned at most once even with concurrent callers.
    #[tracing::instrument(
        skip(self, metrics_set, metrics_list),
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    fn maybe_start_distributor(
        &self,
        metrics_set: &ExecutionPlanMetricsSet,
        metrics_list: &Arc<PartitionMetricsList>,
        explain_verbose: bool,
    ) {
        let mut rx_list = self.receivers.lock().unwrap();
        // Non-empty receiver list means the distributor is already running.
        if !rx_list.is_empty() {
            return;
        }

        let (senders, receivers) = new_channel_list(self.properties.num_partitions());
        let mut distributor = SeriesDistributor {
            stream_ctx: self.stream_ctx.clone(),
            range_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
            partitions: self.properties.partitions.clone(),
            pruner: self.pruner.clone(),
            senders,
            metrics_set: metrics_set.clone(),
            metrics_list: metrics_list.clone(),
            explain_verbose,
        };
        let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
        let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
        common_runtime::spawn_global(
            async move {
                distributor.execute().await;
            }
            .instrument(span),
        );

        *rx_list = receivers;
    }

    /// Builds a single record batch stream that chains the streams of all
    /// partitions one after another.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let part_num = self.properties.num_partitions();
        let metrics_set = ExecutionPlanMetricsSet::default();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(chained_stream))
    }

    /// Scans all partitions and flattens their [ScanBatch] streams into one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = new_partition_metrics(
                    &self.stream_ctx,
                    false,
                    &metrics_set,
                    partition,
                    &self.metrics_list,
                );

                self.scan_batch_in_partition(
                    &QueryScanContext::default(),
                    partition,
                    metrics,
                    &metrics_set,
                )
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Returns an error if the total number of file indices referenced by the
    /// assigned partition ranges exceeds `max_concurrent_scan_files`.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Sum the file indices of every range across all partitions.
        let total_files: usize = self
            .properties
            .partitions
            .iter()
            .flat_map(|partition| partition.iter())
            .map(|part_range| {
                let range_meta = &self.stream_ctx.ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}
318
319fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
320 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
321 .map(|_| {
322 let (sender, receiver) = mpsc::channel(1);
323 (Some(sender), Some(receiver))
324 })
325 .unzip();
326 (SenderList::new(senders), receivers)
327}
328
impl RegionScanner for SeriesScan {
    fn name(&self) -> &str {
        "SeriesScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        // The request may change partition assignment, so re-check the
        // file limit after applying it.
        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.is_some()
    }

    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
384
impl DisplayAs for SeriesScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeriesScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                // Verbose EXPLAIN additionally prints per-partition metrics.
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}
403
impl fmt::Debug for SeriesScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only print the range count; the full stream context is too verbose.
        f.debug_struct("SeriesScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}
411
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input, for tests only.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
419
/// Background task that scans all partition ranges, merges them into one
/// sorted stream, splits it by series and distributes the series batches
/// to the partition channels.
struct SeriesDistributor {
    /// Context shared with the scanner.
    stream_ctx: Arc<StreamContext>,
    /// Limits the number of partition ranges scanned concurrently.
    range_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore passed to the final merge reader.
    final_merge_semaphore: Option<Arc<Semaphore>>,
    /// Partition ranges to scan, grouped per output partition.
    partitions: Vec<Vec<PartitionRange>>,
    /// Shared pruner for partition ranges.
    pruner: Arc<Pruner>,
    /// Senders of all output partitions.
    senders: SenderList,
    /// Metrics set the distributor reports into.
    metrics_set: ExecutionPlanMetricsSet,
    /// Per-partition metrics list, used by verbose EXPLAIN.
    metrics_list: Arc<PartitionMetricsList>,
    /// Whether verbose explain logging is enabled.
    explain_verbose: bool,
}
447
impl SeriesDistributor {
    /// Runs the distribution loop, broadcasting any error to all partitions
    /// so that readers don't hang on a closed channel.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = self.scan_partitions_flat().await;

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Scans all partition ranges concurrently, merges the range streams into
    /// a single sorted reader, divides the merged output by series and sends
    /// the resulting [SeriesBatch]es to the partition channels.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Register every range with the shared pruner before building streams.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor reports into an extra metrics slot at index
        // `partitions.len()`, after the per-partition slots.
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            self.explain_verbose,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        let mut fetch_start = Instant::now();

        // Spawn one task per partition range to build its stream in parallel.
        let build_start = Instant::now();
        let mut tasks = Vec::new();
        for partition in &self.partitions {
            for part_range in partition {
                let stream_ctx = self.stream_ctx.clone();
                let part_range = *part_range;
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let file_scan_semaphore = self.range_semaphore.clone();
                // NOTE(review): this clones `range_semaphore` again although a
                // separate `final_merge_semaphore` field exists — confirm
                // whether sharing the range semaphore here is intentional.
                let merge_semaphore = self.range_semaphore.clone();
                tasks.push(common_runtime::spawn_global(async move {
                    SeqScan::build_flat_partition_range_read(
                        &stream_ctx,
                        &part_range,
                        false,
                        &part_metrics,
                        partition_pruner,
                        file_scan_semaphore,
                        merge_semaphore,
                    )
                    .await
                }));
            }
        }
        // Collect every range stream; `??` propagates both join and scan errors.
        let mut range_streams = Vec::with_capacity(tasks.len());
        let mut estimated_batch_sizes = Vec::with_capacity(tasks.len());
        for task in tasks {
            let (stream, estimated_batch_size) = task.await.context(JoinSnafu)??;
            range_streams.push(stream);
            estimated_batch_sizes.push(estimated_batch_size);
        }
        // Size the merge channel from the average estimated batch size.
        let channel_size =
            compute_parallel_channel_size(compute_average_batch_size(estimated_batch_sizes));
        common_telemetry::debug!(
            "SeriesDistributor built {} range_streams, region: {}, build cost: {:?}, channel_size: {}",
            range_streams.len(),
            self.stream_ctx.input.region_metadata().region_id,
            build_start.elapsed(),
            channel_size,
        );

        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            range_streams,
            self.final_merge_semaphore.clone(),
            Some(&part_metrics),
            true,
            channel_size,
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            // The divider expects non-empty batches; skip empty ones
            // defensively in release builds.
            debug_assert!(record_batch.num_rows() > 0);
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Flush the last buffered series, if any.
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}
596
/// A batch of rows belonging to one group of series.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Series batch in the flat record batch format.
    Flat(FlatSeriesBatch),
}
602
603impl SeriesBatch {
604 pub fn num_batches(&self) -> usize {
606 match self {
607 SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
608 }
609 }
610
611 pub fn num_rows(&self) -> usize {
613 match self {
614 SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
615 }
616 }
617}
618
/// Record batches of one series group in the flat format, kept in arrival
/// order. Uses a `SmallVec` since a series usually spans few batches.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Batches that make up the series.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
624
/// Round-robin list of senders to the partition channels.
struct SenderList {
    /// Senders, one per partition; a slot becomes `None` once its channel closes.
    senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
    /// Number of `None` senders (closed partitions).
    num_nones: usize,
    /// Index of the next sender to try.
    sender_idx: usize,
    /// Number of sends that hit [SEND_TIMEOUT].
    num_timeout: usize,
    /// Number of non-blocking sends that found a channel full.
    num_full: usize,
}
637
impl SenderList {
    /// Wraps the senders, counting how many slots are already `None`.
    fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
        let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
        Self {
            senders,
            num_nones,
            sender_idx: 0,
            num_timeout: 0,
            num_full: 0,
        }
    }

    /// Tries each live sender once, in round-robin order, without waiting.
    ///
    /// Returns `Ok(None)` if the batch was sent, `Ok(Some(batch))` if all
    /// live channels were full, or an error if every channel is closed.
    fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
        for _ in 0..self.senders.len() {
            // All receivers dropped: nobody will consume the batch.
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };

            match sender.try_send(Ok(batch)) {
                Ok(()) => return Ok(None),
                Err(TrySendError::Full(res)) => {
                    self.num_full += 1;
                    // try_send returns the rejected message, which is always
                    // `Ok(batch)` here, so the unwrap cannot fail.
                    batch = res.unwrap();
                }
                Err(TrySendError::Closed(res)) => {
                    // Mark the closed partition so we skip it next time.
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    batch = res.unwrap();
                }
            }
        }

        Ok(Some(batch))
    }

    /// Sends a batch, trying each sender in turn until one accepts it.
    ///
    /// Waits at most [SEND_TIMEOUT] per sender so one slow partition
    /// doesn't block the distributor indefinitely.
    async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
        // Fast path: probe every sender once without waiting.
        match self.try_send_batch(batch)? {
            Some(b) => {
                batch = b;
            }
            None => {
                return Ok(());
            }
        }

        loop {
            ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);

            let sender_idx = self.fetch_add_sender_idx();
            let Some(sender) = &self.senders[sender_idx] else {
                continue;
            };
            match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
                Ok(()) => break,
                Err(SendTimeoutError::Timeout(res)) => {
                    self.num_timeout += 1;
                    // Timed out: reclaim the batch and try the next sender.
                    batch = res.unwrap();
                }
                Err(SendTimeoutError::Closed(res)) => {
                    self.senders[sender_idx] = None;
                    self.num_nones += 1;
                    batch = res.unwrap();
                }
            }
        }

        Ok(())
    }

    /// Broadcasts an error to every live partition, ignoring send failures.
    async fn send_error(&self, error: Error) {
        let error = Arc::new(error);
        for sender in self.senders.iter().flatten() {
            let result = Err(error.clone()).context(ScanSeriesSnafu);
            let _ = sender.send(result).await;
        }
    }

    /// Returns the current sender index and advances it, wrapping around.
    fn fetch_add_sender_idx(&mut self) -> usize {
        let sender_idx = self.sender_idx;
        self.sender_idx = (self.sender_idx + 1) % self.senders.len();
        sender_idx
    }
}
737
738fn new_partition_metrics(
739 stream_ctx: &StreamContext,
740 explain_verbose: bool,
741 metrics_set: &ExecutionPlanMetricsSet,
742 partition: usize,
743 metrics_list: &PartitionMetricsList,
744) -> PartitionMetrics {
745 let metrics = PartitionMetrics::new(
746 stream_ctx.input.mapper.metadata().region_id,
747 partition,
748 "SeriesScan",
749 stream_ctx.query_start,
750 explain_verbose,
751 metrics_set,
752 );
753
754 metrics_list.set(partition, metrics.clone());
755 metrics
756}
757
/// Splits a stream of sorted record batches into [FlatSeriesBatch]es at
/// series (primary key) boundaries, buffering the current series group.
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Batches of the series group currently being accumulated.
    buffer: FlatSeriesBatch,
}
766
impl FlatSeriesBatchDivider {
    /// Pushes a record batch; returns a completed [FlatSeriesBatch] when the
    /// incoming batch moves past the buffered series.
    ///
    /// Splits at the FIRST series change only: the emitted batch ends at a
    /// series boundary, while the remainder (which may itself span several
    /// series) stays buffered for the next call or `finish()`.
    fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
        // First batch: nothing to compare against yet.
        if self.buffer.batches.is_empty() {
            self.buffer.batches.push(batch);
            return None;
        }

        // Decode the primary-key dictionary column of the incoming batch.
        let pk_column_idx = primary_key_column_index(batch.num_columns());
        let batch_pk_column = batch.column(pk_column_idx);
        let batch_pk_array = batch_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let batch_pk_values = batch_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let batch_last_pk =
            primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
        // Decode the last primary key of the buffered batches.
        let buffer_last_batch = self.buffer.batches.last().unwrap();
        let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
        let buffer_pk_array = buffer_pk_column
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let buffer_pk_values = buffer_pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let buffer_last_pk =
            primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);

        // The whole batch still belongs to the buffered series: keep buffering.
        if batch_last_pk == buffer_last_pk {
            self.buffer.batches.push(batch);
            return None;
        }
        // Find the first row whose key differs from the buffered series.
        // NOTE(review): reads the raw keys buffer directly — assumes the
        // dictionary array has no nulls and zero offset; confirm upstream.
        let batch_pk_keys = batch_pk_array.keys();
        let pk_indices = batch_pk_keys.values();
        let mut change_offset = 0;
        for (i, &key) in pk_indices.iter().enumerate() {
            let batch_pk = batch_pk_values.value(key as usize);

            if buffer_last_pk != batch_pk {
                change_offset = i;
                break;
            }
        }

        // Split at the boundary; `slice` is zero-copy.
        let (first_part, remaining_part) = if change_offset > 0 {
            let first_part = batch.slice(0, change_offset);
            let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
            (Some(first_part), Some(remaining_part))
        } else {
            (None, Some(batch))
        };

        // Emit the buffered series (plus the matching head of this batch)
        // and start buffering from the remainder.
        let mut result = std::mem::take(&mut self.buffer);
        if let Some(first_part) = first_part {
            result.batches.push(first_part);
        }

        if let Some(remaining_part) = remaining_part {
            self.buffer.batches.push(remaining_part);
        }

        Some(result)
    }

    /// Returns whatever is still buffered, if anything.
    fn finish(&mut self) -> Option<FlatSeriesBatch> {
        if self.buffer.batches.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}
860
861fn primary_key_at<'a>(
863 primary_key: &PrimaryKeyArray,
864 primary_key_values: &'a BinaryArray,
865 index: usize,
866) -> &'a [u8] {
867 let key = primary_key.keys().value(index);
868 primary_key_values.value(key as usize)
869}
870
#[cfg(test)]
mod tests {
    //! Tests for [FlatSeriesBatchDivider] using hand-built flat record batches.

    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format record batch with one row per input element.
    /// All slices must have the same length.
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        // Column order must match `build_test_flat_schema`.
        let columns: Vec<ArrayRef> = vec![
            build_test_pk_string_dict_array(primary_keys),
            Arc::new(Int64Array::from_iter(
                fields.iter().map(|v| Some(*v as i64)),
            )),
            Arc::new(TimestampMillisecondArray::from_iter_values(
                timestamps.iter().copied(),
            )),
            build_test_pk_array(primary_keys),
            Arc::new(UInt64Array::from_iter_values(sequences.iter().copied())),
            Arc::new(UInt8Array::from_iter_values(
                op_types.iter().map(|v| *v as u8),
            )),
        ];

        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column as a string dictionary array.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            let pk_str = std::str::from_utf8(pk).unwrap();
            builder.append(pk_str).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column as a binary dictionary array.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema of the flat format used by these tests:
    /// tag, field, timestamp, then the internal columns.
    fn build_test_flat_schema() -> SchemaRef {
        let fields = vec![
            Field::new(
                "k0",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        // Finishing an empty divider yields nothing.
        let mut divider = FlatSeriesBatchDivider::default();
        let result = divider.finish();
        assert!(result.is_none());

        // The first pushed batch is buffered, not emitted.
        let mut divider = FlatSeriesBatchDivider::default();
        let batch = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let result = divider.push(batch);
        assert!(result.is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        // Batches of the same series accumulate in the buffer.
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let result = divider.push(batch2);
        assert!(result.is_none());
        let series_batch = divider.finish().unwrap();
        assert_eq!(series_batch.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        // A batch that starts a new series emits the buffered series.
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        // A batch whose series changes mid-way is split at the boundary:
        // the head joins the emitted series, the tail stays buffered.
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);
        assert_eq!(series_batch.batches[0].num_rows(), 2);
        assert_eq!(series_batch.batches[1].num_rows(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
        assert_eq!(divider.buffer.batches[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        // Only the FIRST series change is split; the remainder (which here
        // still spans two series) stays buffered as a whole.
        let mut divider = FlatSeriesBatchDivider::default();

        let batch1 = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);

        let batch2 = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(batch1);
        let series_batch = divider.push(batch2).unwrap();
        assert_eq!(series_batch.batches.len(), 2);

        let total_rows: usize = series_batch.batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 2);

        let final_batch = divider.finish().unwrap();
        assert_eq!(final_batch.batches.len(), 1);
        assert_eq!(final_batch.batches[0].num_rows(), 3);
    }
}