use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::ops::Range;

use crate::error::Result;
use crate::memtable::partition_tree::data::{DataBatch, DataBufferReader, DataPartReader};
use crate::memtable::partition_tree::PkIndex;

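/// A node in the merger's heap. Nodes are ordered by their current item.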
pub trait Node: Ord {
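    /// Returns whether the node still has items to read.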
    fn is_valid(&self) -> bool;

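    /// Returns whether the first row of this node's current item is behind
    /// (greater than) the last row of `other`'s current item.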
    fn is_behind(&self, other: &Self) -> bool;

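    /// Advances `len` rows in the current item, fetching the next item from the
    /// underlying source once the current one is consumed.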
    fn advance(&mut self, len: usize) -> Result<()>;

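    /// Returns the number of rows remaining in the current item.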
    fn current_item_len(&self) -> usize;

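    /// Searches the first key of `other`'s current item within this node's
    /// current item.
    ///
    /// Returns `Ok(pos)` if a row with the same key is found at `pos`, or
    /// `Err(pos)` with the insertion position otherwise.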
    fn search_key_in_current_item(&self, other: &Self) -> Result<usize, usize>;
}

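/// Merger that reads sorted items from multiple [Node]s and yields them in
/// merged order.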
pub struct Merger<T: Node> {
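    /// Heap of nodes that still have items to read.
    ///
    /// Nodes in the heap are always valid.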
    heap: BinaryHeap<T>,
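    /// Node the merger is currently reading from.
    ///
    /// `None` means the merger has no more rows to read.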
    current_node: Option<T>,
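    /// Number of rows in the current node's item that can be read before they
    /// may overlap rows from other nodes.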
    current_rows: usize,
}

impl<T: Node> Merger<T> {
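    /// Creates a merger from `nodes`, skipping invalid nodes, and positions it
    /// on the first item.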
    pub(crate) fn try_new(nodes: Vec<T>) -> Result<Self> {
        let mut heap = BinaryHeap::with_capacity(nodes.len());
        for node in nodes {
            if node.is_valid() {
                heap.push(node);
            }
        }
        let mut merger = Merger {
            heap,
            current_node: None,
            current_rows: 0,
        };
        merger.next()?;
        Ok(merger)
    }

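    /// Returns whether the merger still has rows to read.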
    pub(crate) fn is_valid(&self) -> bool {
        self.current_node.is_some()
    }

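    /// Returns the node the merger is currently reading from.
    ///
    /// # Panics
    /// Panics if the merger is invalid.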
    pub(crate) fn current_node(&self) -> &T {
        self.current_node.as_ref().unwrap()
    }

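    /// Returns the number of rows of the current node that can be read.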
    pub(crate) fn current_rows(&self) -> usize {
        self.current_rows
    }

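    /// Advances the merger to the next run of rows.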
    pub(crate) fn next(&mut self) -> Result<()> {
        self.maybe_advance_current_node()?;
        debug_assert!(self.current_node.is_none());

        let Some(top_node) = self.heap.pop() else {
            // The heap is empty: all nodes are exhausted.
            return Ok(());
        };
        if let Some(next_node) = self.heap.peek() {
            if next_node.is_behind(&top_node) {
                // The next node is behind the top node, so the whole item of the
                // top node can be read.
                self.current_rows = top_node.current_item_len();
            } else {
                match top_node.search_key_in_current_item(next_node) {
                    Ok(pos) => {
                        if pos == 0 {
                            // The first row of the top node shares its key with the
                            // next node's first row; read only that row since it has
                            // the larger sequence.
                            self.current_rows = 1;
                        } else {
                            // Rows before `pos` don't overlap with the next node.
                            self.current_rows = pos;
                        }
                    }
                    Err(pos) => {
                        // The top node was popped first, so its first row is smaller
                        // than the next node's and `pos` can't be 0.
                        debug_assert!(pos > 0);
                        self.current_rows = pos;
                    }
                }
            }
        } else {
            // Only one node remains, read all rows of its current item.
            self.current_rows = top_node.current_item_len();
        }
        self.current_node = Some(top_node);

        Ok(())
    }

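    /// Advances the current node by the rows already read and pushes it back to
    /// the heap if it is still valid.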
    fn maybe_advance_current_node(&mut self) -> Result<()> {
        let Some(mut node) = self.current_node.take() else {
            return Ok(());
        };

        // Skips the rows that have already been read.
        node.advance(self.current_rows)?;
        self.current_rows = 0;
        if !node.is_valid() {
            return Ok(());
        }

        // The node is still valid, push it back to the heap.
        self.heap.push(node);
        Ok(())
    }
}

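/// Primary key index and timestamp that identify a row in a data batch.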
#[derive(Debug)]
pub(crate) struct DataBatchKey {
    pub(crate) pk_index: PkIndex,
    pub(crate) timestamp: i64,
}

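/// Source of data batches: either an active in-memory buffer or an encoded data part.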
pub(crate) enum DataSource {
    Buffer(DataBufferReader),
    Part(DataPartReader),
}

impl DataSource {
    fn current_data_batch(&self) -> DataBatch {
        match self {
            DataSource::Buffer(buffer) => buffer.current_data_batch(),
            DataSource::Part(p) => p.current_data_batch(),
        }
    }

    fn is_valid(&self) -> bool {
        match self {
            DataSource::Buffer(b) => b.is_valid(),
            DataSource::Part(p) => p.is_valid(),
        }
    }

    fn next(&mut self) -> Result<()> {
        match self {
            DataSource::Buffer(b) => b.next(),
            DataSource::Part(p) => p.next(),
        }
    }
}

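/// A [Node] implementation that reads [DataBatch]es from a [DataSource].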
pub(crate) struct DataNode {
    source: DataSource,
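    /// Range of unread rows in the source's current batch, or `None` if the
    /// source is exhausted.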
    current_range: Option<Range<usize>>,
}

impl DataNode {
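    /// Creates a node from `source`, positioned at the start of the source's
    /// current batch if the source is valid.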
    pub(crate) fn new(source: DataSource) -> Self {
        let current_range = source
            .is_valid()
            .then(|| 0..source.current_data_batch().range().len());

        Self {
            source,
            current_range,
        }
    }

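    /// Returns the unread part of the source's current batch.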
    pub(crate) fn current_data_batch(&self) -> DataBatch {
        let range = self.current_range();
        let batch = self.source.current_data_batch();
        batch.slice(range.start, range.len())
    }

    fn current_range(&self) -> Range<usize> {
        self.current_range.clone().unwrap()
    }
}

impl Ord for DataNode {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compares by (pk_index, timestamp, Reverse(sequence)) and reverses the
        // result so that the max-heap in [Merger] pops the smallest node first.
        let weight = self.current_data_batch().pk_index();
        let (ts_start, sequence) = self.current_data_batch().first_row();
        let other_weight = other.current_data_batch().pk_index();
        let (other_ts_start, other_sequence) = other.current_data_batch().first_row();
        (weight, ts_start, Reverse(sequence))
            .cmp(&(other_weight, other_ts_start, Reverse(other_sequence)))
            .reverse()
    }
}

impl Eq for DataNode {}

impl PartialEq<Self> for DataNode {
    fn eq(&self, other: &Self) -> bool {
        self.current_data_batch()
            .first_row()
            .eq(&other.current_data_batch().first_row())
    }
}

impl PartialOrd<Self> for DataNode {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Node for DataNode {
    fn is_valid(&self) -> bool {
        self.current_range.is_some()
    }

    fn is_behind(&self, other: &Self) -> bool {
        let pk_weight = self.current_data_batch().pk_index();
        let (start, seq) = self.current_data_batch().first_row();
        let other_pk_weight = other.current_data_batch().pk_index();
        let (other_end, other_seq) = other.current_data_batch().last_row();
        (pk_weight, start, Reverse(seq)) > (other_pk_weight, other_end, Reverse(other_seq))
    }

    fn advance(&mut self, len: usize) -> Result<()> {
        let mut range = self.current_range();
        debug_assert!(range.len() >= len);

        let remaining = range.len() - len;
        if remaining == 0 {
            // The current batch is exhausted, fetches the next batch from the source.
            self.source.next()?;
            if self.source.is_valid() {
                self.current_range = Some(0..self.source.current_data_batch().range().len());
            } else {
                // The source is exhausted, marks this node as invalid.
                self.current_range = None;
            }
        } else {
            range.start += len;
            self.current_range = Some(range);
        }

        Ok(())
    }

    fn current_item_len(&self) -> usize {
        self.current_range.clone().unwrap().len()
    }

    fn search_key_in_current_item(&self, other: &Self) -> Result<usize, usize> {
        let key = other.current_data_batch().first_key();
        self.current_data_batch().search_key(&key)
    }
}

#[cfg(test)]
mod tests {
    use datatypes::arrow::array::UInt64Array;
    use store_api::metadata::RegionMetadataRef;

    use super::*;
    use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBuffer};
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

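    /// Writes rows with timestamps `ts` to `buffer` under `pk_index`, advancing
    /// `sequence` by the number of rows written.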
    fn write_rows_to_buffer(
        buffer: &mut DataBuffer,
        schema: &RegionMetadataRef,
        pk_index: u16,
        ts: Vec<i64>,
        sequence: &mut u64,
    ) {
        let rows = ts.len() as u64;
        let v0 = ts.iter().map(|v| Some(*v as f64)).collect::<Vec<_>>();
        let kvs = build_key_values_with_ts_seq_values(
            schema,
            "whatever".to_string(),
            1,
            ts.into_iter(),
            v0.into_iter(),
            *sequence,
        );

        for kv in kvs.iter() {
            buffer.write_row(pk_index, &kv);
        }

        *sequence += rows;
    }

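    /// Merges `nodes` and asserts that the merger yields `expected`, expressed as
    /// `(pk_index, vec![(timestamp, sequence)])` runs.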
    fn check_merger_read(nodes: Vec<DataNode>, expected: &[(u16, Vec<(i64, u64)>)]) {
        let mut merger = Merger::try_new(nodes).unwrap();

        let mut res = vec![];
        while merger.is_valid() {
            let data_batch = merger.current_node().current_data_batch();
            let data_batch = data_batch.slice(0, merger.current_rows());
            let batch = data_batch.slice_record_batch();
            let ts_array = batch.column(1);
            let ts_values: Vec<_> = timestamp_array_to_i64_slice(ts_array).to_vec();
            let ts_and_seq = ts_values
                .into_iter()
                .zip(
                    batch
                        .column(2)
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .unwrap()
                        .iter(),
                )
                .map(|(ts, seq)| (ts, seq.unwrap()))
                .collect::<Vec<_>>();

            res.push((data_batch.pk_index(), ts_and_seq));
            merger.next().unwrap();
        }
        assert_eq!(expected, &res);
    }

    #[test]
    fn test_merger() {
        let metadata = metadata_for_test();
        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        let weight = &[2, 1, 0];
        let mut seq = 0;
        write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq);
        let node1 = DataNode::new(DataSource::Part(
            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
        write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
        let node2 = DataNode::new(DataSource::Part(
            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        check_merger_read(
            vec![node1, node2],
            &[
                (1, vec![(2, 0)]),
                (1, vec![(3, 4)]),
                (1, vec![(3, 1)]),
                (2, vec![(1, 5)]),
                (2, vec![(1, 2), (2, 3)]),
            ],
        );
    }

    #[test]
    fn test_merger2() {
        let metadata = metadata_for_test();
        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        let weight = &[2, 1, 0];
        let mut seq = 0;
        write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2], &mut seq);
        let node1 = DataNode::new(DataSource::Part(
            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
        let node2 = DataNode::new(DataSource::Part(
            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
        let node3 = DataNode::new(DataSource::Part(
            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        check_merger_read(
            vec![node1, node3, node2],
            &[
                (1, vec![(2, 0)]),
                (1, vec![(3, 4)]),
                (1, vec![(3, 1)]),
                (2, vec![(1, 2)]),
                (2, vec![(2, 5)]),
                (2, vec![(2, 3)]),
                (2, vec![(3, 6)]),
            ],
        );
    }

    #[test]
    fn test_merger_overlapping() {
        let metadata = metadata_for_test();
        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        let weight = &[0, 1, 2];
        let mut seq = 0;
        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
        let node1 = DataNode::new(DataSource::Part(
            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
        let node2 = DataNode::new(DataSource::Part(
            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
        let node3 = DataNode::new(DataSource::Part(
            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        check_merger_read(
            vec![node1, node3, node2],
            &[
                (0, vec![(1, 0)]),
                (0, vec![(2, 5)]),
                (0, vec![(2, 1)]),
                (0, vec![(3, 6)]),
                (0, vec![(3, 2)]),
                (1, vec![(2, 3), (3, 4)]),
            ],
        );
    }

    #[test]
    fn test_merger_parts_and_buffer() {
        let metadata = metadata_for_test();
        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        let weight = &[0, 1, 2];
        let mut seq = 0;
        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
        let node1 = DataNode::new(DataSource::Buffer(
            buffer1.read().unwrap().build(Some(weight)).unwrap(),
        ));

        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
        let node2 = DataNode::new(DataSource::Part(
            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
        let node3 = DataNode::new(DataSource::Part(
            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        check_merger_read(
            vec![node1, node3, node2],
            &[
                (0, vec![(1, 0)]),
                (0, vec![(2, 5)]),
                (0, vec![(2, 1)]),
                (0, vec![(3, 6)]),
                (0, vec![(3, 2)]),
                (1, vec![(2, 3), (3, 4)]),
            ],
        );
    }

    #[test]
    fn test_merger_overlapping_2() {
        let metadata = metadata_for_test();
        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        let weight = &[0, 1, 2];
        let mut seq = 0;
        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq);
        let node1 = DataNode::new(DataSource::Part(
            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![2], &mut seq);
        let node2 = DataNode::new(DataSource::Part(
            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq);
        let node3 = DataNode::new(DataSource::Part(
            buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        check_merger_read(
            vec![node1, node2, node3],
            &[
                (0, vec![(1, 0)]),
                (0, vec![(2, 4)]),
                (0, vec![(2, 3)]),
                (0, vec![(2, 2)]),
            ],
        );
    }

    #[test]
    fn test_merger_overlapping_3() {
        let metadata = metadata_for_test();
        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        let weight = &[0, 1, 2];
        let mut seq = 0;
        write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![0, 1], &mut seq);
        let node1 = DataNode::new(DataSource::Part(
            buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
        write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
        let node2 = DataNode::new(DataSource::Part(
            buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
        ));

        check_merger_read(
            vec![node1, node2],
            &[(0, vec![(0, 0)]), (0, vec![(1, 2)]), (0, vec![(1, 1)])],
        );
    }
}