mito2/memtable/partition_tree/tree.rs

use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::partition::{
    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{PrimaryKeyCodec, SortField};

/// Memtable that groups rows into partitions by partition key and indexes
/// rows inside each partition by primary key.
pub struct PartitionTree {
    /// Config of the tree.
    config: PartitionTreeConfig,
    /// Metadata of the region that owns the tree.
    pub(crate) metadata: RegionMetadataRef,
    /// Codec to encode and decode primary keys.
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Partitions of the tree, ordered by partition key.
    partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
    /// Whether the tree has multiple partitions.
    is_partitioned: bool,
    /// Manager that tracks the memory usage of the write buffer.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Encoder for sparse primary keys.
    sparse_encoder: Arc<SparseEncoder>,
}

impl PartitionTree {
    /// Creates a new partition tree from the region metadata and config.
    pub fn new(
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let sparse_encoder = SparseEncoder {
            fields: metadata
                .primary_key_columns()
                .map(|c| FieldWithId {
                    field: SortField::new(c.column_schema.data_type.clone()),
                    column_id: c.column_id,
                })
                .collect(),
        };
        let is_partitioned = Partition::has_multi_partitions(&metadata);
        let mut config = config.clone();
        if config.merge_mode == MergeMode::LastNonNull {
            // LastNonNull merge needs to see duplicate rows, so disable dedup.
            config.dedup = false;
        }

        PartitionTree {
            config,
            metadata,
            row_codec,
            partitions: Default::default(),
            is_partitioned,
            write_buffer_manager,
            sparse_encoder: Arc::new(sparse_encoder),
        }
    }

    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
        // Skip the check if the codec doesn't have a fixed number of fields.
        if let Some(expected_num_fields) = self.row_codec.num_fields() {
            ensure!(
                expected_num_fields == kv.num_primary_keys(),
                PrimaryKeyLengthMismatchSnafu {
                    expect: expected_num_fields,
                    actual: kv.num_primary_keys(),
                }
            );
        }
        Ok(())
    }

    /// Encodes the primary key of `kv` into `buffer` in the sparse layout.
    fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
        if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
            // The primary key is already sparsely encoded; copy it as-is.
            let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
                return EncodeSparsePrimaryKeySnafu {
                    reason: "sparse primary key is not binary".to_string(),
                }
                .fail();
            };
            buffer.extend_from_slice(primary_key);
        } else {
            // Encode the individual key columns into the sparse layout.
            self.sparse_encoder
                .encode_to_vec(kv.primary_keys(), buffer)?;
        }
        Ok(())
    }

    /// Writes a batch of key values into the tree.
    ///
    /// `pk_buffer` is a reusable buffer for the encoded primary key and
    /// `metrics` collects write statistics.
    pub fn write(
        &self,
        kvs: &KeyValues,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        for kv in kvs.iter() {
            self.verify_primary_key_length(&kv)?;
            // Safety: timestamps of kvs are valid.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
            metrics.min_ts = metrics.min_ts.min(ts);
            metrics.max_ts = metrics.max_ts.max(ts);
            metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

            if !has_pk {
                // No primary key; write to the default partition directly.
                self.write_no_key(kv)?;
                continue;
            }

            // Encode the primary key into the reusable buffer.
            pk_buffer.clear();
            if self.is_partitioned {
                self.encode_sparse_primary_key(&kv, pk_buffer)?;
            } else {
                self.row_codec.encode_key_value(&kv, pk_buffer)?;
            }

            // Write the row under its encoded primary key.
            self.write_with_key(pk_buffer, kv, metrics)?;
        }

        metrics.value_bytes +=
            kvs.num_rows() * (std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>());

        Ok(())
    }

    /// Writes a single key value into the tree.
    pub fn write_one(
        &self,
        kv: KeyValue,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        self.verify_primary_key_length(&kv)?;
        // Safety: timestamps of kvs are valid.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        metrics.min_ts = metrics.min_ts.min(ts);
        metrics.max_ts = metrics.max_ts.max(ts);
        metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

        if !has_pk {
            // No primary key; write to the default partition directly.
            return self.write_no_key(kv);
        }

        // Encode the primary key into the reusable buffer.
        pk_buffer.clear();
        if self.is_partitioned {
            self.encode_sparse_primary_key(&kv, pk_buffer)?;
        } else {
            self.row_codec.encode_key_value(&kv, pk_buffer)?;
        }

        self.write_with_key(pk_buffer, kv, metrics)?;

        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        Ok(())
    }

    /// Scans the tree and returns an iterator over the selected batches.
    ///
    /// `projection` limits the field columns to read, `predicate` prunes
    /// partitions, and `sequence` filters out rows not visible at the given
    /// sequence number.
    pub fn read(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let start = Instant::now();
        // Creates the projection set; defaults to all field columns.
        let projection: HashSet<_> = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.metadata.field_columns().map(|c| c.column_id).collect()
        };

        let filters = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();

        let mut tree_iter_metric = TreeIterMetrics::default();
        let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);

        let mut iter = TreeIter {
            sequence,
            partitions,
            current_reader: None,
            metrics: tree_iter_metric,
        };
        let context = ReadPartitionContext::new(
            self.metadata.clone(),
            self.row_codec.clone(),
            projection,
            Arc::new(filters),
        );
        iter.fetch_next_partition(context)?;

        iter.metrics.iter_elapsed += start.elapsed();

        if self.config.merge_mode == MergeMode::LastNonNull {
            // Wrap the iterator so rows with the same key merge their last
            // non-null field values.
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

    /// Returns true if the tree is empty.
    ///
    /// A tree is empty only if no partition has data.
    pub fn is_empty(&self) -> bool {
        let partitions = self.partitions.read().unwrap();
        partitions.values().all(|part| !part.has_data())
    }

    /// Marks the tree as immutable by freezing all of its partitions.
    ///
    /// Once the tree becomes immutable, it can only be read.
    pub fn freeze(&self) -> Result<()> {
        let partitions = self.partitions.read().unwrap();
        for partition in partitions.values() {
            partition.freeze()?;
        }
        Ok(())
    }

    /// Forks an immutable tree. Returns a new mutable tree.
    pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
        if self.metadata.schema_version != metadata.schema_version
            || self.metadata.column_metadatas != metadata.column_metadatas
        {
            // The schema has changed, so start over with an empty tree.
            return PartitionTree::new(
                self.row_codec.clone(),
                metadata,
                &self.config,
                self.write_buffer_manager.clone(),
            );
        }

        let mut total_shared_size = 0;
        let mut part_infos = {
            let partitions = self.partitions.read().unwrap();
            partitions
                .iter()
                .filter_map(|(part_key, part)| {
                    let stats = part.stats();
                    if stats.num_rows > 0 {
                        // Only carry over partitions that have data.
                        total_shared_size += stats.shared_memory_size;
                        Some((*part_key, part.clone(), stats))
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>()
        };

        // Evict partitions if the shared memory exceeds the fork budget.
        let fork_size = self.config.fork_dictionary_bytes.as_bytes() as usize;
        if total_shared_size > fork_size {
            // Sort by memory size in ascending order so the largest
            // partitions are popped (and dropped) first.
            part_infos.sort_unstable_by_key(|info| info.2.shared_memory_size);
            while total_shared_size > fork_size {
                let Some(info) = part_infos.pop() else {
                    break;
                };

                common_telemetry::debug!(
                    "Evict partition {} with memory size {}, {} shards",
                    info.0,
                    info.2.shared_memory_size,
                    info.2.shard_num,
                );

                total_shared_size -= info.2.shared_memory_size;
            }
        }

        let mut forked = BTreeMap::new();
        for (part_key, part, _) in part_infos {
            let forked_part = part.fork(&metadata, &self.config);
            forked.insert(part_key, Arc::new(forked_part));
        }

        PartitionTree {
            config: self.config.clone(),
            metadata,
            row_codec: self.row_codec.clone(),
            partitions: RwLock::new(forked),
            is_partitioned: self.is_partitioned,
            write_buffer_manager: self.write_buffer_manager.clone(),
            sparse_encoder: self.sparse_encoder.clone(),
        }
    }

    /// Returns the write buffer manager of the tree.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }

    fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_with_key(
            primary_key,
            self.row_codec.as_ref(),
            key_value,
            self.is_partitioned,
            metrics,
        )
    }

    /// Writes a key value that has no primary key.
    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_no_key(key_value)
    }

    /// Returns the partition for the key, creating it if absent.
    fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef {
        let mut partitions = self.partitions.write().unwrap();
        partitions
            .entry(partition_key)
            .or_insert_with(|| Arc::new(Partition::new(self.metadata.clone(), &self.config)))
            .clone()
    }

    /// Returns the partitions that may match the filters.
    fn prune_partitions(
        &self,
        filters: &[SimpleFilterEvaluator],
        metrics: &mut TreeIterMetrics,
    ) -> VecDeque<PartitionRef> {
        let partitions = self.partitions.read().unwrap();
        metrics.partitions_total = partitions.len();
        if !self.is_partitioned {
            // The tree only has a single partition; nothing to prune.
            return partitions.values().cloned().collect();
        }

        let mut pruned = VecDeque::new();
        // Prune partition keys.
        for (key, partition) in partitions.iter() {
            let mut is_needed = true;
            for filter in filters {
                if !Partition::is_partition_column(filter.column_name()) {
                    continue;
                }

                // Skip the partition only if a filter definitely rejects its
                // key; evaluation errors keep the partition (conservative).
                if !filter
                    .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
                    .unwrap_or(true)
                {
                    is_needed = false;
                }
            }

            if is_needed {
                pruned.push_back(partition.clone());
            }
        }
        metrics.partitions_after_pruning = pruned.len();
        pruned
    }
}
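
// A minimal, self-contained sketch (not used by this crate; the function name
// and plain `usize` sizes are illustrative assumptions) of the eviction
// heuristic in `PartitionTree::fork` above: partition sizes are sorted in
// ascending order, then the largest ones are popped and dropped until the
// remaining total fits the `fork_dictionary_bytes` budget.
#[allow(dead_code)]
fn sketch_fork_eviction(mut sizes: Vec<usize>, budget: usize) -> Vec<usize> {
    let mut total: usize = sizes.iter().sum();
    // Ascending order means `pop` removes the largest remaining partition.
    sizes.sort_unstable();
    while total > budget {
        let Some(largest) = sizes.pop() else { break };
        total -= largest;
    }
    // The sizes that survive correspond to partitions carried into the fork.
    sizes
}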

/// A sort field with the id of its column.
struct FieldWithId {
    field: SortField,
    column_id: ColumnId,
}

/// Primary key encoder that skips null columns and prefixes each value with
/// its column id.
struct SparseEncoder {
    fields: Vec<FieldWithId>,
}

impl SparseEncoder {
    /// Encodes non-null values in `row` into `buffer` as memcomparable
    /// `(column_id, value)` pairs.
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        let mut serializer = Serializer::new(buffer);
        for (value, field) in row.zip(self.fields.iter()) {
            if !value.is_null() {
                field
                    .column_id
                    .serialize(&mut serializer)
                    .context(SerializeFieldSnafu)?;
                field.field.serialize(&mut serializer, &value)?;
            }
        }
        Ok(())
    }
}
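
// A standalone sketch of the layout `SparseEncoder::encode_to_vec` produces:
// an order-preserving stream of `(column_id, value)` pairs in which null
// columns are simply absent. The function name and the fixed `i64` value type
// are illustrative assumptions, not part of this crate; only `memcomparable`
// and `serde` (both imported above) are used.
#[allow(dead_code)]
fn sketch_sparse_layout(pairs: &[(u32, Option<i64>)], buffer: &mut Vec<u8>) {
    let mut serializer = Serializer::new(buffer);
    for (column_id, value) in pairs {
        if let Some(v) = value {
            // Column id first so the stream is self-describing, then the
            // value; both in memcomparable (order-preserving) form.
            column_id.serialize(&mut serializer).unwrap();
            v.serialize(&mut serializer).unwrap();
        }
    }
}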

/// Metrics collected while scanning the tree.
#[derive(Default)]
struct TreeIterMetrics {
    iter_elapsed: Duration,
    fetch_partition_elapsed: Duration,
    rows_fetched: usize,
    batches_fetched: usize,
    partitions_total: usize,
    partitions_after_pruning: usize,
}

struct TreeIter {
    /// Optional sequence number that filters out rows not visible at it.
    sequence: Option<SequenceNumber>,
    /// Remaining partitions to read, in partition key order.
    partitions: VecDeque<PartitionRef>,
    /// Reader of the partition currently being scanned.
    current_reader: Option<PartitionReader>,
    metrics: TreeIterMetrics,
}

impl Drop for TreeIter {
    fn drop(&mut self) {
        READ_ROWS_TOTAL
            .with_label_values(&["partition_tree_memtable"])
            .inc_by(self.metrics.rows_fetched as u64);
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["fetch_next_partition"])
            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
        let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(scan_elapsed);
        common_telemetry::debug!(
            "TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}",
            self.metrics.partitions_total,
            self.metrics.partitions_after_pruning,
            self.metrics.rows_fetched,
            self.metrics.batches_fetched,
            scan_elapsed
        );
    }
}

impl Iterator for TreeIter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let res = self.next_batch().transpose();
        self.metrics.iter_elapsed += start.elapsed();
        res
    }
}

impl TreeIter {
    /// Fetches the next partition that has data and makes its reader current.
    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
        let start = Instant::now();
        while let Some(partition) = self.partitions.pop_front() {
            let part_reader = partition.read(context)?;
            if !part_reader.is_valid() {
                // Nothing to read in this partition; recycle the context and
                // try the next one.
                context = part_reader.into_context();
                continue;
            }
            self.current_reader = Some(part_reader);
            break;
        }
        self.metrics.fetch_partition_elapsed += start.elapsed();
        Ok(())
    }

    /// Fetches the next batch from the current partition reader.
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(part_reader) = &mut self.current_reader else {
            return Ok(None);
        };

        debug_assert!(part_reader.is_valid());
        let batch = part_reader.convert_current_batch()?;
        part_reader.next()?;
        if part_reader.is_valid() {
            self.metrics.rows_fetched += batch.num_rows();
            self.metrics.batches_fetched += 1;
            let mut batch = batch;
            batch.filter_by_sequence(self.sequence)?;
            return Ok(Some(batch));
        }

        // The current reader is exhausted; advance to the next partition.
        let part_reader = self.current_reader.take().unwrap();
        let context = part_reader.into_context();
        self.fetch_next_partition(context)?;

        self.metrics.rows_fetched += batch.num_rows();
        self.metrics.batches_fetched += 1;
        let mut batch = batch;
        batch.filter_by_sequence(self.sequence)?;
        Ok(Some(batch))
    }
}
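
// A minimal, self-contained sketch (hypothetical name and `u32` item type,
// not part of this crate) of the two-level iteration `TreeIter` implements:
// drain the current partition reader, then move on to the next partition
// until every partition is exhausted.
#[allow(dead_code)]
fn sketch_tree_iteration<I: Iterator<Item = u32>>(mut partitions: VecDeque<I>) -> Vec<u32> {
    let mut out = Vec::new();
    // Mirrors `fetch_next_partition`: take partitions from front to back.
    while let Some(reader) = partitions.pop_front() {
        // Mirrors `next_batch`: drain the current reader before advancing.
        for batch in reader {
            out.push(batch);
        }
    }
    out
}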