// mito2/memtable/partition_tree/tree.rs

use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{
    EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::partition::{
    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;

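/// Memtable tree that maps partition keys to the partitions holding the data
/// of a partition tree memtable.
///
/// An illustrative usage sketch (assumes `codec`, `metadata` and `kvs` are
/// built elsewhere; not a runnable doctest):
///
/// ```ignore
/// let tree = PartitionTree::new(codec, metadata, &PartitionTreeConfig::default(), None);
/// let mut pk_buffer = Vec::new();
/// let mut metrics = WriteMetrics::default();
/// tree.write(&kvs, &mut pk_buffer, &mut metrics)?;
/// let iter = tree.read(None, None, None)?;
/// ```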
pub struct PartitionTree {
    /// Config of the tree.
    config: PartitionTreeConfig,
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Codec to encode primary keys.
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Partitions of the tree, keyed by partition key.
    partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
    /// Whether the tree has multiple partitions.
    is_partitioned: bool,
    /// Manager to report the memory usage of the tree.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Encoder for sparse primary keys.
    sparse_encoder: Arc<SparseEncoder>,
}

impl PartitionTree {
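    /// Creates a new partition tree from the primary key codec, region metadata and config.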
    pub fn new(
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let sparse_encoder = SparseEncoder::new(
            metadata
                .primary_key_columns()
                .map(|c| FieldWithId {
                    field: SortField::new(c.column_schema.data_type.clone()),
                    column_id: c.column_id,
                })
                .collect(),
        );
        let is_partitioned = Partition::has_multi_partitions(&metadata);
        let mut config = config.clone();
        if config.merge_mode == MergeMode::LastNonNull {
            // Under the LastNonNull merge mode, keep duplicate rows so the
            // reader can merge them.
            config.dedup = false;
        }

        PartitionTree {
            config,
            metadata,
            row_codec,
            partitions: Default::default(),
            is_partitioned,
            write_buffer_manager,
            sparse_encoder: Arc::new(sparse_encoder),
        }
    }

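    /// Ensures the key-value has the number of primary key columns the codec expects.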
    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
        // The check only applies when the codec reports a fixed number of
        // fields (e.g. the sparse codec doesn't have one).
        if let Some(expected_num_fields) = self.row_codec.num_fields() {
            ensure!(
                expected_num_fields == kv.num_primary_keys(),
                PrimaryKeyLengthMismatchSnafu {
                    expect: expected_num_fields,
                    actual: kv.num_primary_keys(),
                }
            );
        }
        Ok(())
    }

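    /// Encodes the primary key of `kv` into `buffer` using the sparse encoding.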
    fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
        if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
            // The primary key is already encoded in the sparse format, just
            // copies the bytes.
            let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
                return EncodeSparsePrimaryKeySnafu {
                    reason: "sparse primary key is not binary".to_string(),
                }
                .fail();
            };
            buffer.extend_from_slice(primary_key);
        } else {
            // Encodes the primary key columns into the sparse format.
            self.sparse_encoder
                .encode_to_vec(kv.primary_keys(), buffer)
                .context(EncodeSnafu)?;
        }
        Ok(())
    }

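    /// Writes key-values into the tree.
    ///
    /// `pk_buffer` is a scratch buffer reused to hold encoded primary keys;
    /// write statistics are accumulated into `metrics`.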
    pub fn write(
        &self,
        kvs: &KeyValues,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        for kv in kvs.iter() {
            self.verify_primary_key_length(&kv)?;
            // Safety: timestamp of kv must be both present and a valid timestamp value.
            let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
            metrics.min_ts = metrics.min_ts.min(ts);
            metrics.max_ts = metrics.max_ts.max(ts);
            metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

            if !has_pk {
                // No primary key, writes the row directly.
                self.write_no_key(kv)?;
                continue;
            }

            // Encodes the primary key.
            pk_buffer.clear();
            if self.is_partitioned {
                self.encode_sparse_primary_key(&kv, pk_buffer)?;
            } else {
                self.row_codec
                    .encode_key_value(&kv, pk_buffer)
                    .context(EncodeSnafu)?;
            }

            // Writes the row with its primary key.
            self.write_with_key(pk_buffer, kv, metrics)?;
        }

        metrics.value_bytes +=
            kvs.num_rows() * (std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>());

        Ok(())
    }

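    /// Writes a single key-value into the tree.
    ///
    /// Same contract as `write` but for one row.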
    pub fn write_one(
        &self,
        kv: KeyValue,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        self.verify_primary_key_length(&kv)?;
        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
        metrics.min_ts = metrics.min_ts.min(ts);
        metrics.max_ts = metrics.max_ts.max(ts);
        metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

        if !has_pk {
            // No primary key, writes the row directly.
            return self.write_no_key(kv);
        }

        // Encodes the primary key.
        pk_buffer.clear();
        if self.is_partitioned {
            self.encode_sparse_primary_key(&kv, pk_buffer)?;
        } else {
            self.row_codec
                .encode_key_value(&kv, pk_buffer)
                .context(EncodeSnafu)?;
        }

        // Writes the row with its primary key.
        self.write_with_key(pk_buffer, kv, metrics)?;

        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        Ok(())
    }

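    /// Scans the tree and returns an iterator of batches that match the
    /// optional `projection`, `predicate` and read `sequence`.
    ///
    /// Illustrative sketch (assumes a `tree` built as in the struct-level
    /// example; not a runnable doctest):
    ///
    /// ```ignore
    /// // Scans all field columns without a predicate at the latest sequence.
    /// for batch in tree.read(None, None, None)? {
    ///     let batch = batch?;
    ///     // consume the batch...
    /// }
    /// ```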
    pub fn read(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator> {
        let start = Instant::now();
        // Creates the projection set.
        let projection: HashSet<_> = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.metadata.field_columns().map(|c| c.column_id).collect()
        };

        let filters = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();

        let mut tree_iter_metric = TreeIterMetrics::default();
        let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);

        let mut iter = TreeIter {
            sequence,
            partitions,
            current_reader: None,
            metrics: tree_iter_metric,
        };
        let context = ReadPartitionContext::new(
            self.metadata.clone(),
            self.row_codec.clone(),
            projection,
            Arc::new(filters),
        );
        iter.fetch_next_partition(context)?;

        iter.metrics.iter_elapsed += start.elapsed();

        if self.config.merge_mode == MergeMode::LastNonNull {
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

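    /// Returns true if the tree is empty, i.e. no partition has data.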
    pub fn is_empty(&self) -> bool {
        let partitions = self.partitions.read().unwrap();
        partitions.values().all(|part| !part.has_data())
    }

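    /// Freezes all partitions in the tree.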
    pub fn freeze(&self) -> Result<()> {
        let partitions = self.partitions.read().unwrap();
        for partition in partitions.values() {
            partition.freeze()?;
        }
        Ok(())
    }

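    /// Forks the tree into a new tree that reuses the forked partitions.
    ///
    /// Creates an empty tree instead if the schema has changed. Partitions are
    /// evicted (largest first) when their total shared memory exceeds
    /// `fork_dictionary_bytes`.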
    pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
        if self.metadata.schema_version != metadata.schema_version
            || self.metadata.column_metadatas != metadata.column_metadatas
        {
            // The schema has changed, creates a new tree.
            return PartitionTree::new(
                self.row_codec.clone(),
                metadata,
                &self.config,
                self.write_buffer_manager.clone(),
            );
        }

        let mut total_shared_size = 0;
        let mut part_infos = {
            let partitions = self.partitions.read().unwrap();
            partitions
                .iter()
                .filter_map(|(part_key, part)| {
                    let stats = part.stats();
                    if stats.num_rows > 0 {
                        // Only forks partitions that have data.
                        total_shared_size += stats.shared_memory_size;
                        Some((*part_key, part.clone(), stats))
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>()
        };

        // Evicts partitions if the total shared memory exceeds the limit.
        let fork_size = self.config.fork_dictionary_bytes.as_bytes() as usize;
        if total_shared_size > fork_size {
            // Sorts partitions by shared memory size (ascending) so the
            // largest ones are popped first.
            part_infos.sort_unstable_by_key(|info| info.2.shared_memory_size);
            while total_shared_size > fork_size {
                let Some(info) = part_infos.pop() else {
                    break;
                };

                common_telemetry::debug!(
                    "Evict partition {} with memory size {}, {} shards",
                    info.0,
                    info.2.shared_memory_size,
                    info.2.shard_num,
                );

                total_shared_size -= info.2.shared_memory_size;
            }
        }

        let mut forked = BTreeMap::new();
        for (part_key, part, _) in part_infos {
            let forked_part = part.fork(&metadata, &self.config);
            forked.insert(part_key, Arc::new(forked_part));
        }

        PartitionTree {
            config: self.config.clone(),
            metadata,
            row_codec: self.row_codec.clone(),
            partitions: RwLock::new(forked),
            is_partitioned: self.is_partitioned,
            write_buffer_manager: self.write_buffer_manager.clone(),
            sparse_encoder: self.sparse_encoder.clone(),
        }
    }

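    /// Returns the write buffer manager of the tree.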
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }

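    /// Writes a key-value with an encoded primary key into its partition.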
    fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_with_key(
            primary_key,
            self.row_codec.as_ref(),
            key_value,
            self.is_partitioned,
            metrics,
        )
    }

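    /// Writes a key-value that has no primary key into its partition.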
    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_no_key(key_value)
    }

    fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef {
        let mut partitions = self.partitions.write().unwrap();
        partitions
            .entry(partition_key)
            .or_insert_with(|| Arc::new(Partition::new(self.metadata.clone(), &self.config)))
            .clone()
    }

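    /// Prunes partitions by filters on the partition column and returns the
    /// partitions to read.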
    fn prune_partitions(
        &self,
        filters: &[SimpleFilterEvaluator],
        metrics: &mut TreeIterMetrics,
    ) -> VecDeque<PartitionRef> {
        let partitions = self.partitions.read().unwrap();
        metrics.partitions_total = partitions.len();
        if !self.is_partitioned {
            return partitions.values().cloned().collect();
        }

        let mut pruned = VecDeque::new();
        for (key, partition) in partitions.iter() {
            let mut is_needed = true;
            for filter in filters {
                // Only filters on the partition column can prune a partition.
                if !is_partition_column(filter.column_name()) {
                    continue;
                }

                // Keeps the partition if the filter can't be evaluated.
                if !filter
                    .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
                    .unwrap_or(true)
                {
                    is_needed = false;
                }
            }

            if is_needed {
                pruned.push_back(partition.clone());
            }
        }
        metrics.partitions_after_pruning = pruned.len();
        pruned
    }
}

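/// Metrics collected while reading the partition tree.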
#[derive(Default)]
struct TreeIterMetrics {
    iter_elapsed: Duration,
    fetch_partition_elapsed: Duration,
    rows_fetched: usize,
    batches_fetched: usize,
    partitions_total: usize,
    partitions_after_pruning: usize,
}

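/// Iterator over the batches of a partition tree.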
struct TreeIter {
    /// Sequence number of the snapshot to read; rows with a larger sequence are filtered out.
    sequence: Option<SequenceNumber>,
    /// Partitions that are not read yet.
    partitions: VecDeque<PartitionRef>,
    /// Reader of the current partition.
    current_reader: Option<PartitionReader>,
    metrics: TreeIterMetrics,
}

impl Drop for TreeIter {
    fn drop(&mut self) {
        // Reports the accumulated metrics when the iterator is dropped.
        READ_ROWS_TOTAL
            .with_label_values(&["partition_tree_memtable"])
            .inc_by(self.metrics.rows_fetched as u64);
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["fetch_next_partition"])
            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
        let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(scan_elapsed);
        common_telemetry::debug!(
            "TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}",
            self.metrics.partitions_total,
            self.metrics.partitions_after_pruning,
            self.metrics.rows_fetched,
            self.metrics.batches_fetched,
            scan_elapsed
        );
    }
}

impl Iterator for TreeIter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let res = self.next_batch().transpose();
        self.metrics.iter_elapsed += start.elapsed();
        res
    }
}

impl TreeIter {
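    /// Fetches the next partition that has data to read and sets it as the
    /// current reader.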
    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
        let start = Instant::now();
        while let Some(partition) = self.partitions.pop_front() {
            let part_reader = partition.read(context)?;
            if !part_reader.is_valid() {
                // The partition has nothing to read, recycles the context and
                // tries the next partition.
                context = part_reader.into_context();
                continue;
            }
            self.current_reader = Some(part_reader);
            break;
        }
        self.metrics.fetch_partition_elapsed += start.elapsed();
        Ok(())
    }

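    /// Returns the next batch, advancing to the next partition when the
    /// current one is exhausted.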
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(part_reader) = &mut self.current_reader else {
            return Ok(None);
        };

        debug_assert!(part_reader.is_valid());
        let batch = part_reader.convert_current_batch()?;
        part_reader.next()?;
        if part_reader.is_valid() {
            self.metrics.rows_fetched += batch.num_rows();
            self.metrics.batches_fetched += 1;
            let mut batch = batch;
            batch.filter_by_sequence(self.sequence)?;
            return Ok(Some(batch));
        }

        // The current partition is exhausted, moves to the next partition.
        let part_reader = self.current_reader.take().unwrap();
        let context = part_reader.into_context();
        self.fetch_next_partition(context)?;

        self.metrics.rows_fetched += batch.num_rows();
        self.metrics.batches_fetched += 1;
        let mut batch = batch;
        batch.filter_by_sequence(self.sequence)?;
        Ok(Some(batch))
    }
}