1use std::cmp::Ordering;
18use std::time::{Duration, Instant};
19
20use store_api::metadata::RegionMetadataRef;
21
22use crate::error::Result;
23use crate::memtable::key_values::KeyValue;
24use crate::memtable::partition_tree::data::{
25 DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
26};
27use crate::memtable::partition_tree::dict::KeyDictRef;
28use crate::memtable::partition_tree::merger::{Merger, Node};
29use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
30use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
31use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
32use crate::row_converter::PrimaryKeyFilter;
33
/// A shard of a partition: a (shared) primary key dictionary plus the rows
/// that reference keys in that dictionary.
pub struct Shard {
    /// Id of this shard.
    pub(crate) shard_id: ShardId,
    /// Dictionary mapping primary keys to pk indices. `None` presumably means
    /// the region has no primary key — readers drop the key filter in that
    /// case (see `ShardReader::new`).
    key_dict: Option<KeyDictRef>,
    /// Row storage: an active buffer plus frozen parts.
    data_parts: DataParts,
    /// Whether rows should be deduplicated; forwarded to new [DataParts].
    dedup: bool,
    /// Freeze the active data buffer once it holds at least this many rows.
    data_freeze_threshold: usize,
}
45
46impl Shard {
47 pub fn new(
49 shard_id: ShardId,
50 key_dict: Option<KeyDictRef>,
51 data_parts: DataParts,
52 dedup: bool,
53 data_freeze_threshold: usize,
54 ) -> Shard {
55 Shard {
56 shard_id,
57 key_dict,
58 data_parts,
59 dedup,
60 data_freeze_threshold,
61 }
62 }
63
64 pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
68 debug_assert_eq!(self.shard_id, pk_id.shard_id);
69
70 if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
71 self.data_parts.freeze()?;
72 }
73
74 self.data_parts.write_row(pk_id.pk_index, key_value);
75 Ok(())
76 }
77
78 pub fn read(&self) -> Result<ShardReaderBuilder> {
81 let parts_reader = self.data_parts.read()?;
82
83 Ok(ShardReaderBuilder {
84 shard_id: self.shard_id,
85 key_dict: self.key_dict.clone(),
86 inner: parts_reader,
87 })
88 }
89
90 pub fn fork(&self, metadata: RegionMetadataRef) -> Shard {
92 Shard {
93 shard_id: self.shard_id,
94 key_dict: self.key_dict.clone(),
95 data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
96 dedup: self.dedup,
97 data_freeze_threshold: self.data_freeze_threshold,
98 }
99 }
100
101 pub fn is_empty(&self) -> bool {
103 self.data_parts.is_empty()
104 }
105
106 pub(crate) fn shared_memory_size(&self) -> usize {
108 self.key_dict
109 .as_ref()
110 .map(|dict| dict.shared_memory_size())
111 .unwrap_or(0)
112 }
113}
114
/// Source that yields [DataBatch]es together with the primary key information
/// of each batch.
pub trait DataBatchSource {
    /// Returns whether the source currently points at a batch.
    fn is_valid(&self) -> bool;

    /// Advances the source to the next batch.
    fn next(&mut self) -> Result<()>;

    /// Returns the pk id of the current batch.
    // NOTE(review): callers are expected to check `is_valid()` first — confirm
    // against implementors.
    fn current_pk_id(&self) -> PkId;

    /// Returns the encoded primary key of the current batch, or `None` when
    /// the source has no key dictionary.
    fn current_key(&self) -> Option<&[u8]>;

    /// Returns the current batch.
    fn current_data_batch(&self) -> DataBatch;
}

/// Boxed, sendable [DataBatchSource].
pub type BoxedDataBatchSource = Box<dyn DataBatchSource + Send>;
141
/// Builder that creates a [ShardReader] from a snapshot of a [Shard].
pub struct ShardReaderBuilder {
    /// Id of the shard to read.
    shard_id: ShardId,
    /// Key dictionary of the shard, if any.
    key_dict: Option<KeyDictRef>,
    /// Builder for the underlying data parts reader.
    inner: DataPartsReaderBuilder,
}
147
148impl ShardReaderBuilder {
149 pub(crate) fn build(
150 self,
151 key_filter: Option<Box<dyn PrimaryKeyFilter>>,
152 ) -> Result<ShardReader> {
153 let ShardReaderBuilder {
154 shard_id,
155 key_dict,
156 inner,
157 } = self;
158 let now = Instant::now();
159 let parts_reader = inner.build()?;
160 ShardReader::new(shard_id, key_dict, parts_reader, key_filter, now.elapsed())
161 }
162}
163
/// Reader over a [Shard]'s data with optional primary key pruning.
pub struct ShardReader {
    /// Id of the shard being read.
    shard_id: ShardId,
    /// Dictionary used to decode pk indices back into primary keys.
    key_dict: Option<KeyDictRef>,
    /// Reader over the shard's data parts.
    parts_reader: DataPartsReader,
    /// Filter evaluated on decoded primary keys; `None` disables pruning.
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last batch that passed the filter, used to skip
    /// re-evaluating the filter for consecutive batches of the same key.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys the filter was evaluated against.
    keys_before_pruning: usize,
    /// Number of keys that passed the filter.
    keys_after_pruning: usize,
    /// Accumulated time spent evaluating the key filter.
    prune_pk_cost: Duration,
    /// Time the builder spent constructing the data parts reader.
    data_build_cost: Duration,
}
176
177impl ShardReader {
178 fn new(
179 shard_id: ShardId,
180 key_dict: Option<KeyDictRef>,
181 parts_reader: DataPartsReader,
182 key_filter: Option<Box<dyn PrimaryKeyFilter>>,
183 data_build_cost: Duration,
184 ) -> Result<Self> {
185 let has_pk = key_dict.is_some();
186 let mut reader = Self {
187 shard_id,
188 key_dict,
189 parts_reader,
190 key_filter: if has_pk { key_filter } else { None },
191 last_yield_pk_index: None,
192 keys_before_pruning: 0,
193 keys_after_pruning: 0,
194 prune_pk_cost: Duration::default(),
195 data_build_cost,
196 };
197 reader.prune_batch_by_key()?;
198
199 Ok(reader)
200 }
201
202 fn is_valid(&self) -> bool {
203 self.parts_reader.is_valid()
204 }
205
206 fn next(&mut self) -> Result<()> {
207 self.parts_reader.next()?;
208 self.prune_batch_by_key()
209 }
210
211 fn current_key(&self) -> Option<&[u8]> {
212 let pk_index = self.parts_reader.current_data_batch().pk_index();
213 self.key_dict
214 .as_ref()
215 .map(|dict| dict.key_by_pk_index(pk_index))
216 }
217
218 fn current_pk_id(&self) -> PkId {
219 let pk_index = self.parts_reader.current_data_batch().pk_index();
220 PkId {
221 shard_id: self.shard_id,
222 pk_index,
223 }
224 }
225
226 fn current_data_batch(&self) -> DataBatch {
227 self.parts_reader.current_data_batch()
228 }
229
230 fn prune_batch_by_key(&mut self) -> Result<()> {
231 let Some(key_filter) = &mut self.key_filter else {
232 return Ok(());
233 };
234
235 while self.parts_reader.is_valid() {
236 let pk_index = self.parts_reader.current_data_batch().pk_index();
237 if let Some(yield_pk_index) = self.last_yield_pk_index {
238 if pk_index == yield_pk_index {
239 break;
240 }
241 }
242 self.keys_before_pruning += 1;
243 let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index);
245 let now = Instant::now();
246 if key_filter.matches(key) {
247 self.prune_pk_cost += now.elapsed();
248 self.last_yield_pk_index = Some(pk_index);
249 self.keys_after_pruning += 1;
250 break;
251 }
252 self.prune_pk_cost += now.elapsed();
253 self.parts_reader.next()?;
254 }
255
256 Ok(())
257 }
258}
259
260impl Drop for ShardReader {
261 fn drop(&mut self) {
262 let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
263 PARTITION_TREE_READ_STAGE_ELAPSED
264 .with_label_values(&["shard_prune_pk"])
265 .observe(shard_prune_pk);
266 if self.keys_before_pruning > 0 {
267 common_telemetry::debug!(
268 "ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
269 self.parts_reader.num_parts(),
270 self.keys_before_pruning,
271 self.keys_after_pruning,
272 shard_prune_pk,
273 self.data_build_cost.as_secs_f64(),
274 );
275 }
276 }
277}
278
/// Merger over multiple shard sources, yielding batches in the order defined
/// by [ShardNode]'s `Ord` impl (ascending key).
pub(crate) struct ShardMerger {
    /// Underlying generic merger over shard nodes.
    merger: Merger<ShardNode>,
}
283
284impl ShardMerger {
285 pub(crate) fn try_new(nodes: Vec<ShardNode>) -> Result<Self> {
286 let merger = Merger::try_new(nodes)?;
287 Ok(ShardMerger { merger })
288 }
289}
290
291impl DataBatchSource for ShardMerger {
292 fn is_valid(&self) -> bool {
293 self.merger.is_valid()
294 }
295
296 fn next(&mut self) -> Result<()> {
297 self.merger.next()
298 }
299
300 fn current_pk_id(&self) -> PkId {
301 self.merger.current_node().current_pk_id()
302 }
303
304 fn current_key(&self) -> Option<&[u8]> {
305 self.merger.current_node().current_key()
306 }
307
308 fn current_data_batch(&self) -> DataBatch {
309 let batch = self.merger.current_node().current_data_batch();
310 batch.slice(0, self.merger.current_rows())
311 }
312}
313
/// A source of batches inside a partition: either a shard that is still being
/// built, or a finished shard.
pub(crate) enum ShardSource {
    /// Reader over rows still buffered in a shard builder.
    Builder(ShardBuilderReader),
    /// Reader over a finished shard.
    Shard(ShardReader),
}
318
impl ShardSource {
    /// Returns whether the underlying reader still points at a batch.
    fn is_valid(&self) -> bool {
        match self {
            ShardSource::Builder(r) => r.is_valid(),
            ShardSource::Shard(r) => r.is_valid(),
        }
    }

    /// Advances the underlying reader.
    fn next(&mut self) -> Result<()> {
        match self {
            ShardSource::Builder(r) => r.next(),
            ShardSource::Shard(r) => r.next(),
        }
    }

    /// Returns the pk id of the current batch.
    fn current_pk_id(&self) -> PkId {
        match self {
            ShardSource::Builder(r) => r.current_pk_id(),
            ShardSource::Shard(r) => r.current_pk_id(),
        }
    }

    /// Returns the primary key of the current batch, if there is a dictionary.
    fn current_key(&self) -> Option<&[u8]> {
        match self {
            ShardSource::Builder(r) => r.current_key(),
            ShardSource::Shard(r) => r.current_key(),
        }
    }

    /// Returns the current batch.
    fn current_data_batch(&self) -> DataBatch {
        match self {
            ShardSource::Builder(r) => r.current_data_batch(),
            ShardSource::Shard(r) => r.current_data_batch(),
        }
    }
}
355
/// A merge node wrapping a single [ShardSource]; ordered by the source's
/// current primary key (see the `Ord` impl below).
pub(crate) struct ShardNode {
    /// The shard source this node reads from.
    source: ShardSource,
}
360
impl ShardNode {
    /// Wraps a shard source as a merge node.
    pub(crate) fn new(source: ShardSource) -> Self {
        Self { source }
    }

    /// Pk id of the batch the source currently points at.
    fn current_pk_id(&self) -> PkId {
        self.source.current_pk_id()
    }

    /// Primary key of the current batch, `None` when the source has no
    /// key dictionary.
    fn current_key(&self) -> Option<&[u8]> {
        self.source.current_key()
    }

    /// Current batch of the source.
    fn current_data_batch(&self) -> DataBatch {
        self.source.current_data_batch()
    }
}
378
379impl PartialEq for ShardNode {
380 fn eq(&self, other: &Self) -> bool {
381 self.source.current_key() == other.source.current_key()
382 }
383}
384
385impl Eq for ShardNode {}
386
387impl Ord for ShardNode {
388 fn cmp(&self, other: &Self) -> Ordering {
389 self.source
390 .current_key()
391 .cmp(&other.source.current_key())
392 .reverse()
393 }
394}
395
396impl PartialOrd for ShardNode {
397 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
398 Some(self.cmp(other))
399 }
400}
401
impl Node for ShardNode {
    fn is_valid(&self) -> bool {
        self.source.is_valid()
    }

    /// Returns true when this node's current key is strictly smaller than
    /// `other`'s.
    fn is_behind(&self, other: &Self) -> bool {
        // The merger should never compare two nodes positioned at the same key.
        debug_assert_ne!(self.source.current_key(), other.source.current_key());
        self.source.current_key() < other.source.current_key()
    }

    fn advance(&mut self, len: usize) -> Result<()> {
        // The merger always consumes a shard node's batch in full.
        debug_assert_eq!(self.source.current_data_batch().num_rows(), len);
        self.source.next()
    }

    fn current_item_len(&self) -> usize {
        self.current_data_batch().num_rows()
    }

    /// Always reports the key as not found, with the full batch length as the
    /// `Err` position.
    // NOTE(review): semantics inferred from the Err payload only — confirm
    // against the `Node` trait contract in the merger module.
    fn search_key_in_current_item(&self, _other: &Self) -> Result<usize, usize> {
        Err(self.source.current_data_batch().num_rows())
    }
}
426
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::memtable::partition_tree::dict::KeyDictBuilder;
    use crate::memtable::partition_tree::PkIndex;
    use crate::memtable::stats::WriteMetrics;
    use crate::memtable::KeyValues;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_keys, metadata_for_test,
    };

    /// Builds three key values batches with pk indices out of order, so a read
    /// must return rows sorted by key (timestamps 0..=1, 10..=11, 20..=21).
    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<(KeyValues, PkIndex)> {
        vec![
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    2,
                    [20, 21].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    0,
                ),
                2,
            ),
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    0,
                    [0, 1].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    1,
                ),
                0,
            ),
            (
                build_key_values_with_ts_seq_values(
                    metadata,
                    "shard".to_string(),
                    1,
                    [10, 11].into_iter(),
                    [Some(0.0), Some(1.0)].into_iter(),
                    2,
                ),
                1,
            ),
        ]
    }

    /// Creates a shard whose dictionary already contains the keys of `input`.
    fn new_shard_with_dict(
        shard_id: ShardId,
        metadata: RegionMetadataRef,
        input: &[(KeyValues, PkIndex)],
        data_freeze_threshold: usize,
    ) -> Shard {
        let mut dict_builder = KeyDictBuilder::new(1024);
        let mut metrics = WriteMetrics::default();
        let mut keys = Vec::with_capacity(input.len());
        for (kvs, _) in input {
            encode_keys(&metadata, kvs, &mut keys);
        }
        for key in &keys {
            dict_builder.insert_key(key, None, &mut metrics);
        }

        let (dict, _) = dict_builder.finish().unwrap();
        let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

        Shard::new(
            shard_id,
            Some(Arc::new(dict)),
            data_parts,
            true,
            data_freeze_threshold,
        )
    }

    /// Reads the shard (without a key filter) and collects all timestamps in
    /// yield order.
    fn collect_timestamps(shard: &Shard) -> Vec<i64> {
        let mut reader = shard.read().unwrap().build(None).unwrap();
        let mut timestamps = Vec::new();
        while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
            // Column 1 is the timestamp column in this test schema.
            let ts_array = rb.column(1);
            let ts_slice = timestamp_array_to_i64_slice(ts_array);
            timestamps.extend_from_slice(ts_slice);

            reader.next().unwrap();
        }
        timestamps
    }

    /// Writing rows with out-of-order pk indices must read back in key order.
    #[test]
    fn test_write_read_shard() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let mut shard = new_shard_with_dict(8, metadata, &input, 100);
        assert!(shard.is_empty());
        for (key_values, pk_index) in &input {
            for kv in key_values.iter() {
                let pk_id = PkId {
                    shard_id: shard.shard_id,
                    pk_index: *pk_index,
                };
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());

        let timestamps = collect_timestamps(&shard);
        assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
    }

    /// Writing past the freeze threshold must freeze the active buffer
    /// (200 rows / threshold 50 => 3 frozen parts plus the active one) without
    /// losing any rows on read.
    #[test]
    fn test_shard_freeze() {
        let metadata = metadata_for_test();
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "shard".to_string(),
            0,
            [0].into_iter(),
            [Some(0.0)].into_iter(),
            0,
        );
        let mut shard = new_shard_with_dict(8, metadata.clone(), &[(kvs, 0)], 50);
        let expected: Vec<_> = (0..200).collect();
        for i in &expected {
            let kvs = build_key_values_with_ts_seq_values(
                &metadata,
                "shard".to_string(),
                0,
                [*i].into_iter(),
                [Some(0.0)].into_iter(),
                *i as u64,
            );
            let pk_id = PkId {
                shard_id: shard.shard_id,
                pk_index: *i as PkIndex,
            };
            for kv in kvs.iter() {
                shard.write_with_pk_id(pk_id, &kv).unwrap();
            }
        }
        assert!(!shard.is_empty());
        assert_eq!(3, shard.data_parts.frozen_len());

        let timestamps = collect_timestamps(&shard);
        assert_eq!(expected, timestamps);
    }
}