1use std::cmp::Ordering;
18use std::time::{Duration, Instant};
19
20use mito_codec::key_values::KeyValue;
21use mito_codec::row_converter::PrimaryKeyFilter;
22use snafu::ResultExt;
23use store_api::metadata::RegionMetadataRef;
24
25use crate::error::{DecodeSnafu, Result};
26use crate::memtable::partition_tree::data::{
27 DATA_INIT_CAP, DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder,
28};
29use crate::memtable::partition_tree::dict::KeyDictRef;
30use crate::memtable::partition_tree::merger::{Merger, Node};
31use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
32use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
33use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
34
/// A shard of the partition tree: an optional primary-key dictionary plus the
/// data parts holding the rows written to this shard.
pub struct Shard {
    /// Id of the shard. Pk ids written to this shard must carry the same id
    /// (checked by a debug assertion in `write_with_pk_id`).
    pub(crate) shard_id: ShardId,
    /// Dictionary resolving pk indices to encoded primary keys.
    /// `None` presumably means the region has no primary key — see
    /// `ShardReader::new`, which drops the key filter in that case.
    key_dict: Option<KeyDictRef>,
    /// Active and frozen data parts of the shard.
    data_parts: DataParts,
    /// Whether to dedup rows; forwarded to the fresh `DataParts` on fork.
    dedup: bool,
    /// Number of active rows that triggers freezing the active data part.
    data_freeze_threshold: usize,
}
46
47impl Shard {
48 pub fn new(
50 shard_id: ShardId,
51 key_dict: Option<KeyDictRef>,
52 data_parts: DataParts,
53 dedup: bool,
54 data_freeze_threshold: usize,
55 ) -> Shard {
56 Shard {
57 shard_id,
58 key_dict,
59 data_parts,
60 dedup,
61 data_freeze_threshold,
62 }
63 }
64
65 pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
69 debug_assert_eq!(self.shard_id, pk_id.shard_id);
70
71 if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
72 self.data_parts.freeze()?;
73 }
74
75 self.data_parts.write_row(pk_id.pk_index, key_value);
76 Ok(())
77 }
78
79 pub fn read(&self) -> Result<ShardReaderBuilder> {
82 let parts_reader = self.data_parts.read()?;
83
84 Ok(ShardReaderBuilder {
85 shard_id: self.shard_id,
86 key_dict: self.key_dict.clone(),
87 inner: parts_reader,
88 })
89 }
90
91 pub fn fork(&self, metadata: RegionMetadataRef) -> Shard {
93 Shard {
94 shard_id: self.shard_id,
95 key_dict: self.key_dict.clone(),
96 data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
97 dedup: self.dedup,
98 data_freeze_threshold: self.data_freeze_threshold,
99 }
100 }
101
102 pub fn is_empty(&self) -> bool {
104 self.data_parts.is_empty()
105 }
106
107 pub(crate) fn shared_memory_size(&self) -> usize {
109 self.key_dict
110 .as_ref()
111 .map(|dict| dict.shared_memory_size())
112 .unwrap_or(0)
113 }
114}
115
/// A source that yields [DataBatch]es one at a time.
pub trait DataBatchSource {
    /// Returns whether the source is currently positioned on a batch.
    fn is_valid(&self) -> bool;

    /// Advances the source to the next batch.
    fn next(&mut self) -> Result<()>;

    /// Returns the pk id of the current batch.
    fn current_pk_id(&self) -> PkId;

    /// Returns the encoded primary key of the current batch, or `None` when
    /// the source has no key dictionary (see `ShardReader::current_key`).
    fn current_key(&self) -> Option<&[u8]>;

    /// Returns the current batch of data.
    fn current_data_batch(&self) -> DataBatch<'_>;
}

/// A boxed, sendable [DataBatchSource].
pub type BoxedDataBatchSource = Box<dyn DataBatchSource + Send>;
142
/// Builder of [ShardReader]s, obtained from [Shard::read].
pub struct ShardReaderBuilder {
    /// Id of the shard to read.
    shard_id: ShardId,
    /// Key dictionary cloned from the shard (if the shard has one).
    key_dict: Option<KeyDictRef>,
    /// Builder for the underlying data parts reader.
    inner: DataPartsReaderBuilder,
}
148
149impl ShardReaderBuilder {
150 pub(crate) fn build(
151 self,
152 key_filter: Option<Box<dyn PrimaryKeyFilter>>,
153 ) -> Result<ShardReader> {
154 let ShardReaderBuilder {
155 shard_id,
156 key_dict,
157 inner,
158 } = self;
159 let now = Instant::now();
160 let parts_reader = inner.build()?;
161 ShardReader::new(shard_id, key_dict, parts_reader, key_filter, now.elapsed())
162 }
163}
164
/// Reader over the data of a single [Shard], optionally skipping batches
/// whose primary key does not match a filter.
pub struct ShardReader {
    /// Id of the shard being read; used to build the pk id of each batch.
    shard_id: ShardId,
    /// Dictionary resolving pk indices to encoded keys; required for pruning.
    key_dict: Option<KeyDictRef>,
    /// Reader over the shard's data parts.
    parts_reader: DataPartsReader,
    /// Optional primary key filter; `None` disables pruning.
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last yielded batch, used to skip re-filtering
    /// consecutive batches with the same pk index.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys evaluated by the filter (metrics only).
    keys_before_pruning: usize,
    /// Number of keys that passed the filter (metrics only).
    keys_after_pruning: usize,
    /// Accumulated time spent evaluating the key filter.
    prune_pk_cost: Duration,
    /// Time the caller spent building the parts reader (metrics only).
    data_build_cost: Duration,
}
177
178impl ShardReader {
179 fn new(
180 shard_id: ShardId,
181 key_dict: Option<KeyDictRef>,
182 parts_reader: DataPartsReader,
183 key_filter: Option<Box<dyn PrimaryKeyFilter>>,
184 data_build_cost: Duration,
185 ) -> Result<Self> {
186 let has_pk = key_dict.is_some();
187 let mut reader = Self {
188 shard_id,
189 key_dict,
190 parts_reader,
191 key_filter: if has_pk { key_filter } else { None },
192 last_yield_pk_index: None,
193 keys_before_pruning: 0,
194 keys_after_pruning: 0,
195 prune_pk_cost: Duration::default(),
196 data_build_cost,
197 };
198 reader.prune_batch_by_key()?;
199
200 Ok(reader)
201 }
202
203 fn is_valid(&self) -> bool {
204 self.parts_reader.is_valid()
205 }
206
207 fn next(&mut self) -> Result<()> {
208 self.parts_reader.next()?;
209 self.prune_batch_by_key()
210 }
211
212 fn current_key(&self) -> Option<&[u8]> {
213 let pk_index = self.parts_reader.current_data_batch().pk_index();
214 self.key_dict
215 .as_ref()
216 .map(|dict| dict.key_by_pk_index(pk_index))
217 }
218
219 fn current_pk_id(&self) -> PkId {
220 let pk_index = self.parts_reader.current_data_batch().pk_index();
221 PkId {
222 shard_id: self.shard_id,
223 pk_index,
224 }
225 }
226
227 fn current_data_batch(&self) -> DataBatch<'_> {
228 self.parts_reader.current_data_batch()
229 }
230
231 fn prune_batch_by_key(&mut self) -> Result<()> {
232 let Some(key_filter) = &mut self.key_filter else {
233 return Ok(());
234 };
235
236 while self.parts_reader.is_valid() {
237 let pk_index = self.parts_reader.current_data_batch().pk_index();
238 if let Some(yield_pk_index) = self.last_yield_pk_index
239 && pk_index == yield_pk_index
240 {
241 break;
242 }
243 self.keys_before_pruning += 1;
244 let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index);
246 let now = Instant::now();
247 if key_filter.matches(key).context(DecodeSnafu)? {
248 self.prune_pk_cost += now.elapsed();
249 self.last_yield_pk_index = Some(pk_index);
250 self.keys_after_pruning += 1;
251 break;
252 }
253 self.prune_pk_cost += now.elapsed();
254 self.parts_reader.next()?;
255 }
256
257 Ok(())
258 }
259}
260
261impl Drop for ShardReader {
262 fn drop(&mut self) {
263 let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
264 PARTITION_TREE_READ_STAGE_ELAPSED
265 .with_label_values(&["shard_prune_pk"])
266 .observe(shard_prune_pk);
267 if self.keys_before_pruning > 0 {
268 common_telemetry::debug!(
269 "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
270 self.parts_reader.num_parts(),
271 self.keys_before_pruning,
272 self.keys_after_pruning,
273 shard_prune_pk,
274 self.data_build_cost.as_secs_f64(),
275 );
276 }
277 }
278}
279
/// Merger that merges the output of multiple [ShardNode]s, ordered by their
/// current key (see the `Ord` impl of [ShardNode]).
pub(crate) struct ShardMerger {
    /// The generic merger driving the shard nodes.
    merger: Merger<ShardNode>,
}
284
285impl ShardMerger {
286 pub(crate) fn try_new(nodes: Vec<ShardNode>) -> Result<Self> {
287 let merger = Merger::try_new(nodes)?;
288 Ok(ShardMerger { merger })
289 }
290}
291
292impl DataBatchSource for ShardMerger {
293 fn is_valid(&self) -> bool {
294 self.merger.is_valid()
295 }
296
297 fn next(&mut self) -> Result<()> {
298 self.merger.next()
299 }
300
301 fn current_pk_id(&self) -> PkId {
302 self.merger.current_node().current_pk_id()
303 }
304
305 fn current_key(&self) -> Option<&[u8]> {
306 self.merger.current_node().current_key()
307 }
308
309 fn current_data_batch(&self) -> DataBatch<'_> {
310 let batch = self.merger.current_node().current_data_batch();
311 batch.slice(0, self.merger.current_rows())
312 }
313}
314
/// Source of data for a [ShardNode]: a shard that is still being built or a
/// finished shard.
pub(crate) enum ShardSource {
    /// Reader over a shard under construction.
    Builder(ShardBuilderReader),
    /// Reader over a finished [Shard].
    Shard(ShardReader),
}
319
320impl ShardSource {
321 fn is_valid(&self) -> bool {
322 match self {
323 ShardSource::Builder(r) => r.is_valid(),
324 ShardSource::Shard(r) => r.is_valid(),
325 }
326 }
327
328 fn next(&mut self) -> Result<()> {
329 match self {
330 ShardSource::Builder(r) => r.next(),
331 ShardSource::Shard(r) => r.next(),
332 }
333 }
334
335 fn current_pk_id(&self) -> PkId {
336 match self {
337 ShardSource::Builder(r) => r.current_pk_id(),
338 ShardSource::Shard(r) => r.current_pk_id(),
339 }
340 }
341
342 fn current_key(&self) -> Option<&[u8]> {
343 match self {
344 ShardSource::Builder(r) => r.current_key(),
345 ShardSource::Shard(r) => r.current_key(),
346 }
347 }
348
349 fn current_data_batch(&self) -> DataBatch<'_> {
350 match self {
351 ShardSource::Builder(r) => r.current_data_batch(),
352 ShardSource::Shard(r) => r.current_data_batch(),
353 }
354 }
355}
356
/// A merge node wrapping a [ShardSource] so shards can be merged by key.
pub(crate) struct ShardNode {
    /// The underlying data source of this node.
    source: ShardSource,
}
361
362impl ShardNode {
363 pub(crate) fn new(source: ShardSource) -> Self {
364 Self { source }
365 }
366
367 fn current_pk_id(&self) -> PkId {
368 self.source.current_pk_id()
369 }
370
371 fn current_key(&self) -> Option<&[u8]> {
372 self.source.current_key()
373 }
374
375 fn current_data_batch(&self) -> DataBatch<'_> {
376 self.source.current_data_batch()
377 }
378}
379
380impl PartialEq for ShardNode {
381 fn eq(&self, other: &Self) -> bool {
382 self.source.current_key() == other.source.current_key()
383 }
384}
385
386impl Eq for ShardNode {}
387
388impl Ord for ShardNode {
389 fn cmp(&self, other: &Self) -> Ordering {
390 self.source
391 .current_key()
392 .cmp(&other.source.current_key())
393 .reverse()
394 }
395}
396
397impl PartialOrd for ShardNode {
398 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
399 Some(self.cmp(other))
400 }
401}
402
403impl Node for ShardNode {
404 fn is_valid(&self) -> bool {
405 self.source.is_valid()
406 }
407
408 fn is_behind(&self, other: &Self) -> bool {
409 debug_assert_ne!(self.source.current_key(), other.source.current_key());
411 self.source.current_key() < other.source.current_key()
412 }
413
414 fn advance(&mut self, len: usize) -> Result<()> {
415 debug_assert_eq!(self.source.current_data_batch().num_rows(), len);
416 self.source.next()
417 }
418
419 fn current_item_len(&self) -> usize {
420 self.current_data_batch().num_rows()
421 }
422
423 fn search_key_in_current_item(&self, _other: &Self) -> Result<usize, usize> {
424 Err(self.source.current_data_batch().num_rows())
425 }
426}
427
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::memtable::KeyValues;
    use crate::memtable::partition_tree::PkIndex;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::memtable::partition_tree::dict::KeyDictBuilder;
    use crate::memtable::stats::WriteMetrics;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_keys, metadata_for_test,
    };

    /// Builds three key-value inputs with pk indices (2, 0, 1) deliberately
    /// out of order, so a read must reorder rows by key.
    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<(KeyValues, PkIndex)> {
        vec![
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    2,
                    [20, 21].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    0,
                ),
                2,
            ),
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    0,
                    [0, 1].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    1,
                ),
                0,
            ),
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    1,
                    [10, 11].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    2,
                ),
                1,
            ),
        ]
    }

    /// Creates a shard whose key dictionary is pre-populated with the keys
    /// of `input`, with dedup enabled.
    fn new_shard_with_dict(
        shard_id: ShardId,
        metadata: RegionMetadataRef,
        input: &[(KeyValues, PkIndex)],
        data_freeze_threshold: usize,
    ) -> Shard {
        let mut dict_builder = KeyDictBuilder::new(1024);
        let mut metrics = WriteMetrics::default();
        let mut keys = Vec::with_capacity(input.len());
        for (kvs, _) in input {
            encode_keys(&metadata, kvs, &mut keys);
        }
        for key in &keys {
            dict_builder.insert_key(key, None, &mut metrics);
        }

        let (dict, _) = dict_builder.finish().unwrap();
        let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

        Shard::new(
            shard_id,
            Some(Arc::new(dict)),
            data_parts,
            true,
            data_freeze_threshold,
        )
    }

    /// Reads all batches of `shard` (without a key filter) and collects the
    /// timestamps in yield order.
    fn collect_timestamps(shard: &Shard) -> Vec<i64> {
        let mut reader = shard.read().unwrap().build(None).unwrap();
        let mut timestamps = Vec::new();
        while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
            // NOTE(review): assumes column 1 of the record batch is the
            // timestamp column — confirm against the data part schema.
            let ts_array = rb.column(1);
            let ts_slice = timestamp_array_to_i64_slice(ts_array);
            timestamps.extend_from_slice(ts_slice);

            reader.next().unwrap();
        }
        timestamps
    }

    /// Writes rows with shuffled pk indices and verifies the read returns
    /// timestamps ordered by key, not by insertion order.
    #[test]
    fn test_write_read_shard() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let mut shard = new_shard_with_dict(8, metadata, &input, 100);
        assert!(shard.is_empty());
        for (key_values, pk_index) in &input {
            for kv in key_values.iter() {
                let pk_id = PkId {
                    shard_id: shard.shard_id,
                    pk_index: *pk_index,
                };
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());

        let timestamps = collect_timestamps(&shard);
        assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
    }

    /// Writes 200 rows with a freeze threshold of 50 and verifies the active
    /// part was frozen three times (at 50, 100 and 150 rows) and that all
    /// rows remain readable.
    #[test]
    fn test_shard_freeze() {
        let metadata = metadata_for_test();
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "shard".to_string(),
            0,
            [0].into_iter(),
            [Some(0.0)].into_iter(),
            0,
        );
        let mut shard = new_shard_with_dict(8, metadata.clone(), &[(kvs, 0)], 50);
        let expected: Vec<_> = (0..200).collect();
        for i in &expected {
            let kvs = build_key_values_with_ts_seq_values(
                &metadata,
                "shard".to_string(),
                0,
                [*i].into_iter(),
                [Some(0.0)].into_iter(),
                *i as u64,
            );
            let pk_id = PkId {
                shard_id: shard.shard_id,
                pk_index: *i as PkIndex,
            };
            for kv in kvs.iter() {
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());
        assert_eq!(3, shard.data_parts.frozen_len());

        let timestamps = collect_timestamps(&shard);
        assert_eq!(expected, timestamps);
    }
}