1use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing::{self, Instrument};
26use common_telemetry::warn;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use datatypes::schema::SchemaRef;
32use futures::{StreamExt, TryStreamExt};
33use smallvec::SmallVec;
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
38};
39use tokio::sync::Semaphore;
40use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
41use tokio::sync::mpsc::{self, Receiver, Sender};
42
43use crate::error::{
44 Error, InvalidSenderSnafu, JoinSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
45 ScanSeriesSnafu, TooManyFilesToReadSnafu,
46};
47use crate::read::ScannerMetrics;
48use crate::read::pruner::{PartitionPruner, Pruner};
49use crate::read::scan_region::{ScanInput, StreamContext};
50use crate::read::scan_util::{
51 PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
52 compute_parallel_channel_size,
53};
54use crate::read::seq_scan::SeqScan;
55use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
56use crate::sst::parquet::flat_format::primary_key_column_index;
57use crate::sst::parquet::format::PrimaryKeyArray;
58
/// Timeout of a single blocking send attempt in `SenderList::send_batch`.
/// When it elapses, the batch is offered to the next sender in round-robin order.
const SEND_TIMEOUT: Duration = Duration::from_micros(100);

/// Receiver side of the per-partition channels. An entry becomes `None` once
/// a partition stream has taken its receiver.
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;

65pub struct SeriesScan {
71 properties: ScannerProperties,
73 stream_ctx: Arc<StreamContext>,
75 pruner: Arc<Pruner>,
77 receivers: Mutex<ReceiverList>,
79 metrics_list: Arc<PartitionMetricsList>,
82}
83
84impl SeriesScan {
85 pub(crate) fn new(input: ScanInput) -> Self {
87 let mut properties = ScannerProperties::default()
88 .with_append_mode(input.append_mode)
89 .with_total_rows(input.total_rows());
90 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
91 properties.partitions = vec![stream_ctx.partition_ranges()];
92
93 let num_workers = common_stat::get_total_cpu_cores().max(1);
95 let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
96
97 Self {
98 properties,
99 stream_ctx,
100 pruner,
101 receivers: Mutex::new(Vec::new()),
102 metrics_list: Arc::new(PartitionMetricsList::default()),
103 }
104 }
105
106 #[tracing::instrument(
107 skip_all,
108 fields(
109 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
110 partition = partition
111 )
112 )]
113 fn scan_partition_impl(
114 &self,
115 ctx: &QueryScanContext,
116 metrics_set: &ExecutionPlanMetricsSet,
117 partition: usize,
118 ) -> Result<SendableRecordBatchStream> {
119 let metrics = new_partition_metrics(
120 &self.stream_ctx,
121 ctx.explain_verbose,
122 metrics_set,
123 partition,
124 &self.metrics_list,
125 );
126
127 let batch_stream =
128 self.scan_batch_in_partition(ctx, partition, metrics.clone(), metrics_set)?;
129
130 let input = &self.stream_ctx.input;
131 let record_batch_stream = ConvertBatchStream::new(
132 batch_stream,
133 input.mapper.clone(),
134 input.cache_strategy.clone(),
135 metrics,
136 );
137
138 Ok(Box::pin(RecordBatchStreamWrapper::new(
139 input.mapper.output_schema(),
140 Box::pin(record_batch_stream),
141 )))
142 }
143
144 #[tracing::instrument(
145 skip_all,
146 fields(
147 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
148 partition = partition
149 )
150 )]
151 fn scan_batch_in_partition(
152 &self,
153 ctx: &QueryScanContext,
154 partition: usize,
155 part_metrics: PartitionMetrics,
156 metrics_set: &ExecutionPlanMetricsSet,
157 ) -> Result<ScanBatchStream> {
158 if ctx.explain_verbose {
159 common_telemetry::info!(
160 "SeriesScan partition {}, region_id: {}",
161 partition,
162 self.stream_ctx.input.region_metadata().region_id
163 );
164 }
165
166 ensure!(
167 partition < self.properties.num_partitions(),
168 PartitionOutOfRangeSnafu {
169 given: partition,
170 all: self.properties.num_partitions(),
171 }
172 );
173
174 self.maybe_start_distributor(metrics_set, &self.metrics_list, ctx.explain_verbose);
175
176 let mut receiver = self.take_receiver(partition)?;
177 let stream = try_stream! {
178 part_metrics.on_first_poll();
179
180 let mut fetch_start = Instant::now();
181 while let Some(series) = receiver.recv().await {
182 let series = series?;
183
184 let mut metrics = ScannerMetrics::default();
185 metrics.scan_cost += fetch_start.elapsed();
186 fetch_start = Instant::now();
187
188 metrics.num_batches += series.num_batches();
189 metrics.num_rows += series.num_rows();
190
191 let yield_start = Instant::now();
192 yield ScanBatch::Series(series);
193 metrics.yield_cost += yield_start.elapsed();
194
195 part_metrics.merge_metrics(&metrics);
196 }
197
198 part_metrics.on_finish();
199 };
200 Ok(Box::pin(stream))
201 }
202
203 fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
205 let mut rx_list = self.receivers.lock().unwrap();
206 rx_list[partition]
207 .take()
208 .context(ScanMultiTimesSnafu { partition })
209 }
210
211 #[tracing::instrument(
213 skip(self, metrics_set, metrics_list),
214 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
215 )]
216 fn maybe_start_distributor(
217 &self,
218 metrics_set: &ExecutionPlanMetricsSet,
219 metrics_list: &Arc<PartitionMetricsList>,
220 explain_verbose: bool,
221 ) {
222 let mut rx_list = self.receivers.lock().unwrap();
223 if !rx_list.is_empty() {
224 return;
225 }
226
227 let (senders, receivers) = new_channel_list(self.properties.num_partitions());
228 let mut distributor = SeriesDistributor {
229 stream_ctx: self.stream_ctx.clone(),
230 range_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
231 final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
232 partitions: self.properties.partitions.clone(),
233 pruner: self.pruner.clone(),
234 senders,
235 metrics_set: metrics_set.clone(),
236 metrics_list: metrics_list.clone(),
237 explain_verbose,
238 };
239 let region_id = distributor.stream_ctx.input.mapper.metadata().region_id;
240 let span = tracing::info_span!("SeriesScan::distributor", region_id = %region_id);
241 common_runtime::spawn_global(
242 async move {
243 distributor.execute().await;
244 }
245 .instrument(span),
246 );
247
248 *rx_list = receivers;
249 }
250
251 #[tracing::instrument(
253 skip_all,
254 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
255 )]
256 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
257 let part_num = self.properties.num_partitions();
258 let metrics_set = ExecutionPlanMetricsSet::default();
259 let streams = (0..part_num)
260 .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
261 .collect::<Result<Vec<_>, BoxedError>>()?;
262 let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
263 Ok(Box::pin(chained_stream))
264 }
265
266 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
268 let metrics_set = ExecutionPlanMetricsSet::new();
269
270 let streams = (0..self.properties.partitions.len())
271 .map(|partition| {
272 let metrics = new_partition_metrics(
273 &self.stream_ctx,
274 false,
275 &metrics_set,
276 partition,
277 &self.metrics_list,
278 );
279
280 self.scan_batch_in_partition(
281 &QueryScanContext::default(),
282 partition,
283 metrics,
284 &metrics_set,
285 )
286 })
287 .collect::<Result<Vec<_>>>()?;
288
289 Ok(Box::pin(futures::stream::iter(streams).flatten()))
290 }
291
292 pub(crate) fn check_scan_limit(&self) -> Result<()> {
294 let total_files: usize = self
296 .properties
297 .partitions
298 .iter()
299 .flat_map(|partition| partition.iter())
300 .map(|part_range| {
301 let range_meta = &self.stream_ctx.ranges[part_range.identifier];
302 range_meta.indices.len()
303 })
304 .sum();
305
306 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
307 if total_files > max_concurrent_files {
308 return TooManyFilesToReadSnafu {
309 actual: total_files,
310 max: max_concurrent_files,
311 }
312 .fail();
313 }
314
315 Ok(())
316 }
317}
318
319fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
320 let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
321 .map(|_| {
322 let (sender, receiver) = mpsc::channel(1);
323 (Some(sender), Some(receiver))
324 })
325 .unzip();
326 (SenderList::new(senders), receivers)
327}
328
329impl RegionScanner for SeriesScan {
330 fn name(&self) -> &str {
331 "SeriesScan"
332 }
333
334 fn properties(&self) -> &ScannerProperties {
335 &self.properties
336 }
337
338 fn schema(&self) -> SchemaRef {
339 self.stream_ctx.input.mapper.output_schema()
340 }
341
342 fn metadata(&self) -> RegionMetadataRef {
343 self.stream_ctx.input.mapper.metadata().clone()
344 }
345
346 fn scan_partition(
347 &self,
348 ctx: &QueryScanContext,
349 metrics_set: &ExecutionPlanMetricsSet,
350 partition: usize,
351 ) -> Result<SendableRecordBatchStream, BoxedError> {
352 self.scan_partition_impl(ctx, metrics_set, partition)
353 .map_err(BoxedError::new)
354 }
355
356 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
357 self.properties.prepare(request);
358
359 self.check_scan_limit().map_err(BoxedError::new)?;
360
361 Ok(())
362 }
363
364 fn has_predicate_without_region(&self) -> bool {
365 let predicate = self
366 .stream_ctx
367 .input
368 .predicate_group()
369 .predicate_without_region();
370 predicate.is_some()
371 }
372
373 fn add_dyn_filter_to_predicate(
374 &mut self,
375 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
376 ) -> Vec<bool> {
377 self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
378 }
379
380 fn set_logical_region(&mut self, logical_region: bool) {
381 self.properties.set_logical_region(logical_region);
382 }
383
384 fn snapshot_sequence(&self) -> Option<u64> {
385 self.stream_ctx.input.snapshot_sequence
386 }
387}
388
389impl DisplayAs for SeriesScan {
390 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
391 write!(
392 f,
393 "SeriesScan: region={}, ",
394 self.stream_ctx.input.mapper.metadata().region_id
395 )?;
396 match t {
397 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
398 self.stream_ctx.format_for_explain(false, f)
399 }
400 DisplayFormatType::Verbose => {
401 self.stream_ctx.format_for_explain(true, f)?;
402 self.metrics_list.format_verbose_metrics(f)
403 }
404 }
405 }
406}
407
408impl fmt::Debug for SeriesScan {
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 f.debug_struct("SeriesScan")
411 .field("num_ranges", &self.stream_ctx.ranges.len())
412 .finish()
413 }
414}
415
#[cfg(test)]
impl SeriesScan {
    /// Returns the scan input, for assertions in tests.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = self.stream_ctx.as_ref();
        &ctx.input
    }
}
423
424struct SeriesDistributor {
426 stream_ctx: Arc<StreamContext>,
428 range_semaphore: Option<Arc<Semaphore>>,
430 final_merge_semaphore: Option<Arc<Semaphore>>,
435 partitions: Vec<Vec<PartitionRange>>,
437 pruner: Arc<Pruner>,
439 senders: SenderList,
441 metrics_set: ExecutionPlanMetricsSet,
447 metrics_list: Arc<PartitionMetricsList>,
448 explain_verbose: bool,
450}
451
impl SeriesDistributor {
    /// Runs the distributor to completion.
    ///
    /// If `scan_partitions_flat` fails, the error is forwarded to every
    /// partition channel that is still open via `SenderList::send_error`.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn execute(&mut self) {
        let result = self.scan_partitions_flat().await;

        if let Err(e) = result {
            self.senders.send_error(e).await;
        }
    }

    /// Reads all partition ranges in flat format and merges them into one
    /// reader.
    ///
    /// The merged output is cut into series batches with
    /// `FlatSeriesBatchDivider`, and each series batch is sent to a partition
    /// channel.
    ///
    /// # Errors
    /// Returns an error if a range build task fails or panics (`JoinSnafu`),
    /// if reading fails, or if no open sender remains.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    async fn scan_partitions_flat(&mut self) -> Result<()> {
        // Register every partition's ranges with the shared pruner before reading.
        for partition_ranges in &self.partitions {
            self.pruner.add_partition_ranges(partition_ranges);
        }

        let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect();
        let partition_pruner = Arc::new(PartitionPruner::new(
            self.pruner.clone(),
            &all_partition_ranges,
        ));

        // The distributor records its metrics under index `self.partitions.len()`,
        // i.e. right after the last real partition.
        let part_metrics = new_partition_metrics(
            &self.stream_ctx,
            self.explain_verbose,
            &self.metrics_set,
            self.partitions.len(),
            &self.metrics_list,
        );
        part_metrics.on_first_poll();
        // Started before the range readers are built, so the first `scan_cost`
        // sample also includes the build time.
        let mut fetch_start = Instant::now();

        let build_start = Instant::now();
        // Build one reader per partition range concurrently on the global runtime.
        let mut tasks = Vec::new();
        for partition in &self.partitions {
            for part_range in partition {
                let stream_ctx = self.stream_ctx.clone();
                let part_range = *part_range;
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                // NOTE(review): both semaphores passed to the range read come from
                // `range_semaphore`; `final_merge_semaphore` is used only for the
                // final merge below. Confirm sharing one semaphore here is intended.
                let file_scan_semaphore = self.range_semaphore.clone();
                let merge_semaphore = self.range_semaphore.clone();
                tasks.push(common_runtime::spawn_global(async move {
                    SeqScan::build_flat_partition_range_read(
                        &stream_ctx,
                        &part_range,
                        false,
                        &part_metrics,
                        partition_pruner,
                        file_scan_semaphore,
                        merge_semaphore,
                    )
                    .await
                }));
            }
        }
        // Await the tasks in spawn order. The first failure returns early;
        // presumably the remaining handles are then dropped rather than
        // aborted — TODO confirm.
        let mut range_streams = Vec::with_capacity(tasks.len());
        let mut estimated_batch_sizes = Vec::with_capacity(tasks.len());
        for task in tasks {
            let (stream, estimated_batch_size) = task.await.context(JoinSnafu)??;
            range_streams.push(stream);
            estimated_batch_sizes.push(estimated_batch_size);
        }
        // Size the merge channel from the average estimated batch size of all ranges.
        let channel_size =
            compute_parallel_channel_size(compute_average_batch_size(estimated_batch_sizes));
        common_telemetry::debug!(
            "SeriesDistributor built {} range_streams, region: {}, build cost: {:?}, channel_size: {}",
            range_streams.len(),
            self.stream_ctx.input.region_metadata().region_id,
            build_start.elapsed(),
            channel_size,
        );

        // Merge all range streams into a single reader.
        let mut reader = SeqScan::build_flat_reader_from_sources(
            &self.stream_ctx,
            range_streams,
            self.final_merge_semaphore.clone(),
            Some(&part_metrics),
            true,
            channel_size,
        )
        .await?;
        let mut metrics = SeriesDistributorMetrics::default();

        let mut divider = FlatSeriesBatchDivider::default();
        while let Some(record_batch) = reader.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += record_batch.num_rows();

            debug_assert!(record_batch.num_rows() > 0);
            // Skip empty batches in release builds: the divider reads the last
            // row of every batch it compares.
            if record_batch.num_rows() == 0 {
                fetch_start = Instant::now();
                continue;
            }

            // The divider returns a series batch once the primary key changes.
            let divider_start = Instant::now();
            let series_batch = divider.push(record_batch);
            metrics.divider_cost += divider_start.elapsed();
            if let Some(series_batch) = series_batch {
                let yield_start = Instant::now();
                self.senders
                    .send_batch(SeriesBatch::Flat(series_batch))
                    .await?;
                metrics.yield_cost += yield_start.elapsed();
            }
            fetch_start = Instant::now();
        }

        // Flush whatever is still buffered in the divider.
        let divider_start = Instant::now();
        let series_batch = divider.finish();
        metrics.divider_cost += divider_start.elapsed();
        if let Some(series_batch) = series_batch {
            let yield_start = Instant::now();
            self.senders
                .send_batch(SeriesBatch::Flat(series_batch))
                .await?;
            metrics.yield_cost += yield_start.elapsed();
        }

        metrics.scan_cost += fetch_start.elapsed();
        metrics.num_series_send_timeout = self.senders.num_timeout;
        metrics.num_series_send_full = self.senders.num_full;
        part_metrics.set_distributor_metrics(&metrics);

        part_metrics.on_finish();

        Ok(())
    }
}
600
/// A unit of work sent from the distributor to a partition stream.
#[derive(Debug)]
pub enum SeriesBatch {
    /// Record batches in flat format.
    Flat(FlatSeriesBatch),
}
606
607impl SeriesBatch {
608 pub fn num_batches(&self) -> usize {
610 match self {
611 SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(),
612 }
613 }
614
615 pub fn num_rows(&self) -> usize {
617 match self {
618 SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(),
619 }
620 }
621}
622
/// Record batches emitted together by `FlatSeriesBatchDivider`.
///
/// For input sorted by primary key, all rows of one series end up in the same
/// `FlatSeriesBatch`. One batch may still contain rows of several series, as
/// the divider's tests show.
#[derive(Default, Debug)]
pub struct FlatSeriesBatch {
    /// Record batches (or slices of them) in arrival order.
    pub batches: SmallVec<[RecordBatch; 4]>,
}
628
629struct SenderList {
631 senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
632 num_nones: usize,
634 sender_idx: usize,
636 num_timeout: usize,
638 num_full: usize,
640}
641
642impl SenderList {
643 fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
644 let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
645 Self {
646 senders,
647 num_nones,
648 sender_idx: 0,
649 num_timeout: 0,
650 num_full: 0,
651 }
652 }
653
654 fn try_send_batch(&mut self, mut batch: SeriesBatch) -> Result<Option<SeriesBatch>> {
657 for _ in 0..self.senders.len() {
658 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
659
660 let sender_idx = self.fetch_add_sender_idx();
661 let Some(sender) = &self.senders[sender_idx] else {
662 continue;
663 };
664
665 match sender.try_send(Ok(batch)) {
666 Ok(()) => return Ok(None),
667 Err(TrySendError::Full(res)) => {
668 self.num_full += 1;
669 batch = res.unwrap();
671 }
672 Err(TrySendError::Closed(res)) => {
673 self.senders[sender_idx] = None;
674 self.num_nones += 1;
675 batch = res.unwrap();
677 }
678 }
679 }
680
681 Ok(Some(batch))
682 }
683
684 async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
686 match self.try_send_batch(batch)? {
688 Some(b) => {
689 batch = b;
691 }
692 None => {
693 return Ok(());
694 }
695 }
696
697 loop {
698 ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
699
700 let sender_idx = self.fetch_add_sender_idx();
701 let Some(sender) = &self.senders[sender_idx] else {
702 continue;
703 };
704 match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
709 Ok(()) => break,
710 Err(SendTimeoutError::Timeout(res)) => {
711 self.num_timeout += 1;
712 batch = res.unwrap();
714 }
715 Err(SendTimeoutError::Closed(res)) => {
716 self.senders[sender_idx] = None;
717 self.num_nones += 1;
718 batch = res.unwrap();
720 }
721 }
722 }
723
724 Ok(())
725 }
726
727 async fn send_error(&self, error: Error) {
728 let error = Arc::new(error);
729 for sender in self.senders.iter().flatten() {
730 let result = Err(error.clone()).context(ScanSeriesSnafu);
731 let _ = sender.send(result).await;
732 }
733 }
734
735 fn fetch_add_sender_idx(&mut self) -> usize {
736 let sender_idx = self.sender_idx;
737 self.sender_idx = (self.sender_idx + 1) % self.senders.len();
738 sender_idx
739 }
740}
741
742fn new_partition_metrics(
743 stream_ctx: &StreamContext,
744 explain_verbose: bool,
745 metrics_set: &ExecutionPlanMetricsSet,
746 partition: usize,
747 metrics_list: &PartitionMetricsList,
748) -> PartitionMetrics {
749 let metrics = PartitionMetrics::new(
750 stream_ctx.input.mapper.metadata().region_id,
751 partition,
752 "SeriesScan",
753 stream_ctx.query_start,
754 explain_verbose,
755 metrics_set,
756 );
757
758 metrics_list.set(partition, metrics.clone());
759 metrics
760}
761
/// Splits a stream of flat record batches into `FlatSeriesBatch`es at primary
/// key boundaries.
///
/// Assumes the input is sorted by primary key (presumably guaranteed by the
/// merge reader — confirm).
#[derive(Default)]
struct FlatSeriesBatchDivider {
    /// Rows received but not yet emitted; they end with the latest series seen.
    buffer: FlatSeriesBatch,
}
770
771impl FlatSeriesBatchDivider {
772 fn push(&mut self, batch: RecordBatch) -> Option<FlatSeriesBatch> {
776 if self.buffer.batches.is_empty() {
778 self.buffer.batches.push(batch);
779 return None;
780 }
781
782 let pk_column_idx = primary_key_column_index(batch.num_columns());
784 let batch_pk_column = batch.column(pk_column_idx);
785 let batch_pk_array = batch_pk_column
786 .as_any()
787 .downcast_ref::<PrimaryKeyArray>()
788 .unwrap();
789 let batch_pk_values = batch_pk_array
790 .values()
791 .as_any()
792 .downcast_ref::<BinaryArray>()
793 .unwrap();
794 let batch_last_pk =
796 primary_key_at(batch_pk_array, batch_pk_values, batch_pk_array.len() - 1);
797 let buffer_last_batch = self.buffer.batches.last().unwrap();
800 let buffer_pk_column = buffer_last_batch.column(pk_column_idx);
801 let buffer_pk_array = buffer_pk_column
802 .as_any()
803 .downcast_ref::<PrimaryKeyArray>()
804 .unwrap();
805 let buffer_pk_values = buffer_pk_array
806 .values()
807 .as_any()
808 .downcast_ref::<BinaryArray>()
809 .unwrap();
810 let buffer_last_pk =
811 primary_key_at(buffer_pk_array, buffer_pk_values, buffer_pk_array.len() - 1);
812
813 if batch_last_pk == buffer_last_pk {
815 self.buffer.batches.push(batch);
816 return None;
817 }
818 let batch_pk_keys = batch_pk_array.keys();
821 let pk_indices = batch_pk_keys.values();
822 let mut change_offset = 0;
823 for (i, &key) in pk_indices.iter().enumerate() {
824 let batch_pk = batch_pk_values.value(key as usize);
825
826 if buffer_last_pk != batch_pk {
827 change_offset = i;
828 break;
829 }
830 }
831
832 let (first_part, remaining_part) = if change_offset > 0 {
834 let first_part = batch.slice(0, change_offset);
835 let remaining_part = batch.slice(change_offset, batch.num_rows() - change_offset);
836 (Some(first_part), Some(remaining_part))
837 } else {
838 (None, Some(batch))
839 };
840
841 let mut result = std::mem::take(&mut self.buffer);
843 if let Some(first_part) = first_part {
844 result.batches.push(first_part);
845 }
846
847 if let Some(remaining_part) = remaining_part {
849 self.buffer.batches.push(remaining_part);
850 }
851
852 Some(result)
853 }
854
855 fn finish(&mut self) -> Option<FlatSeriesBatch> {
857 if self.buffer.batches.is_empty() {
858 None
859 } else {
860 Some(std::mem::take(&mut self.buffer))
861 }
862 }
863}
864
865fn primary_key_at<'a>(
867 primary_key: &PrimaryKeyArray,
868 primary_key_values: &'a BinaryArray,
869 index: usize,
870) -> &'a [u8] {
871 let key = primary_key.keys().value(index);
872 primary_key_values.value(key as usize)
873}
874
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit, UInt32Type};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format record batch with the columns
    /// `k0`, `field0`, `ts`, `__primary_key`, `__sequence`, `__op_type`.
    ///
    /// All slices must have the same length (checked in debug builds).
    fn new_test_record_batch(
        primary_keys: &[&[u8]],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
        fields: &[u64],
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        debug_assert_eq!(sequences.len(), num_rows);
        debug_assert_eq!(op_types.len(), num_rows);
        debug_assert_eq!(fields.len(), num_rows);
        debug_assert_eq!(primary_keys.len(), num_rows);

        let tag = build_test_pk_string_dict_array(primary_keys);
        let field: ArrayRef = Arc::new(Int64Array::from_iter(
            fields.iter().map(|&value| Some(value as i64)),
        ));
        let time_index: ArrayRef = Arc::new(TimestampMillisecondArray::from_iter_values(
            timestamps.iter().copied(),
        ));
        let primary_key = build_test_pk_array(primary_keys);
        let sequence: ArrayRef = Arc::new(UInt64Array::from_iter_values(
            sequences.iter().copied(),
        ));
        let op_type: ArrayRef = Arc::new(UInt8Array::from_iter_values(
            op_types.iter().map(|&op| op as u8),
        ));

        let columns = vec![tag, field, time_index, primary_key, sequence, op_type];
        RecordBatch::try_new(build_test_flat_schema(), columns).unwrap()
    }

    /// Builds the `k0` tag column as a UTF-8 dictionary array.
    fn build_test_pk_string_dict_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        for tag in primary_keys.iter().map(|&pk| std::str::from_utf8(pk).unwrap()) {
            builder.append(tag).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Builds the `__primary_key` column as a binary dictionary array.
    fn build_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &encoded in primary_keys {
            builder.append(encoded).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Schema matching the columns built by `new_test_record_batch`.
    fn build_test_flat_schema() -> SchemaRef {
        let dictionary_of = |value_type: DataType| {
            DataType::Dictionary(Box::new(DataType::UInt32), Box::new(value_type))
        };
        Arc::new(Schema::new(vec![
            Field::new("k0", dictionary_of(DataType::Utf8), false),
            Field::new("field0", DataType::Int64, true),
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("__primary_key", dictionary_of(DataType::Binary), false),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]))
    }

    #[test]
    fn test_empty_buffer_first_push() {
        let mut fresh = FlatSeriesBatchDivider::default();
        assert!(fresh.finish().is_none());

        let mut divider = FlatSeriesBatchDivider::default();
        let only = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        assert!(divider.push(only).is_none());
        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_same_series_accumulation() {
        let mut divider = FlatSeriesBatchDivider::default();

        let first = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let second = new_test_record_batch(
            &[b"series1", b"series1"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(first);
        assert!(divider.push(second).is_none());

        let flushed = divider.finish().unwrap();
        assert_eq!(flushed.batches.len(), 2);
    }

    #[test]
    fn test_series_boundary_detection() {
        let mut divider = FlatSeriesBatchDivider::default();

        let first = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let second = new_test_record_batch(
            &[b"series2", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(first);
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 1);

        assert_eq!(divider.buffer.batches.len(), 1);
    }

    #[test]
    fn test_series_boundary_within_batch() {
        let mut divider = FlatSeriesBatchDivider::default();

        let first = new_test_record_batch(
            &[b"series1", b"series1"],
            &[1000, 2000],
            &[1, 2],
            &[OpType::Put, OpType::Put],
            &[10, 20],
        );
        let second = new_test_record_batch(
            &[b"series1", b"series2"],
            &[3000, 4000],
            &[3, 4],
            &[OpType::Put, OpType::Put],
            &[30, 40],
        );

        divider.push(first);
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 2);
        assert_eq!(emitted.batches[0].num_rows(), 2);
        assert_eq!(emitted.batches[1].num_rows(), 1);

        let buffered = &divider.buffer.batches;
        assert_eq!(buffered.len(), 1);
        assert_eq!(buffered[0].num_rows(), 1);
    }

    #[test]
    fn test_series_splitting() {
        let mut divider = FlatSeriesBatchDivider::default();

        let first = new_test_record_batch(&[b"series1"], &[1000], &[1], &[OpType::Put], &[10]);
        let second = new_test_record_batch(
            &[b"series1", b"series2", b"series2", b"series3"],
            &[2000, 3000, 4000, 5000],
            &[2, 3, 4, 5],
            &[OpType::Put, OpType::Put, OpType::Put, OpType::Put],
            &[20, 30, 40, 50],
        );

        divider.push(first);
        let emitted = divider.push(second).unwrap();
        assert_eq!(emitted.batches.len(), 2);

        let emitted_rows = emitted
            .batches
            .iter()
            .fold(0usize, |total, b| total + b.num_rows());
        assert_eq!(emitted_rows, 2);

        let remaining = divider.finish().unwrap();
        assert_eq!(remaining.batches.len(), 1);
        assert_eq!(remaining.batches[0].num_rows(), 3);
    }
}