mito2/memtable/partition_tree/tree.rs

use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::prelude::ValueRef;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::sparse::{FieldWithId, SparseEncoder};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField};
use snafu::{ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceRange};
use table::predicate::Predicate;

use crate::error::{
    EncodeSnafu, EncodeSparsePrimaryKeySnafu, PrimaryKeyLengthMismatchSnafu, Result,
};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::partition_tree::partition::{
    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::Batch;
use crate::read::dedup::LastNonNullIter;
use crate::region::options::MergeMode;

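/// In-memory tree that buffers rows for the partition tree memtable, grouped
/// into partitions by partition key.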
pub struct PartitionTree {
    /// Config of the tree.
    config: PartitionTreeConfig,
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Codec to encode primary keys.
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Partitions of the tree, keyed by partition key.
    partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
    /// Whether the tree has multiple partitions.
    is_partitioned: bool,
    /// Manager to report the memory size of the tree.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Encoder for sparse primary keys.
    sparse_encoder: Arc<SparseEncoder>,
}

impl PartitionTree {
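    /// Creates a new partition tree.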
    pub fn new(
        row_codec: Arc<dyn PrimaryKeyCodec>,
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        write_buffer_manager: Option<WriteBufferManagerRef>,
    ) -> Self {
        let sparse_encoder = SparseEncoder::new(
            metadata
                .primary_key_columns()
                .map(|c| FieldWithId {
                    field: SortField::new(c.column_schema.data_type.clone()),
                    column_id: c.column_id,
                })
                .collect(),
        );
        let is_partitioned = Partition::has_multi_partitions(&metadata);
        let mut config = config.clone();
        if config.merge_mode == MergeMode::LastNonNull {
            // LastNonNull merge needs all versions of a row, so dedup is
            // disabled at write time and handled by `LastNonNullIter` on read.
            config.dedup = false;
        }

        PartitionTree {
            config,
            metadata,
            row_codec,
            partitions: Default::default(),
            is_partitioned,
            write_buffer_manager,
            sparse_encoder: Arc::new(sparse_encoder),
        }
    }

    fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
        // The codec may not have a fixed number of fields (e.g. sparse
        // encoding); the check is skipped in that case.
        if let Some(expected_num_fields) = self.row_codec.num_fields() {
            ensure!(
                expected_num_fields == kv.num_primary_keys(),
                PrimaryKeyLengthMismatchSnafu {
                    expect: expected_num_fields,
                    actual: kv.num_primary_keys(),
                }
            );
        }
        Ok(())
    }

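    /// Encodes the primary key of `kv` into `buffer` using the sparse format.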
    fn encode_sparse_primary_key(&self, kv: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
        if kv.primary_key_encoding() == PrimaryKeyEncoding::Sparse {
            // The primary key is already encoded in the sparse format,
            // copy the bytes directly.
            let ValueRef::Binary(primary_key) = kv.primary_keys().next().unwrap() else {
                return EncodeSparsePrimaryKeySnafu {
                    reason: "sparse primary key is not binary".to_string(),
                }
                .fail();
            };
            buffer.extend_from_slice(primary_key);
        } else {
            // Encodes the primary key columns into the sparse format.
            self.sparse_encoder
                .encode_to_vec(kv.primary_keys(), buffer)
                .context(EncodeSnafu)?;
        }
        Ok(())
    }

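    /// Writes `kvs` into the tree, using `pk_buffer` as a reusable buffer for
    /// encoded primary keys, and updates `metrics`.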
    pub fn write(
        &self,
        kvs: &KeyValues,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        for kv in kvs.iter() {
            self.verify_primary_key_length(&kv)?;
            // Safety: the time index column is non-null, so the timestamp of
            // a row is always a valid timestamp.
            let ts = kv
                .timestamp()
                .try_into_timestamp()
                .unwrap()
                .unwrap()
                .value();
            metrics.min_ts = metrics.min_ts.min(ts);
            metrics.max_ts = metrics.max_ts.max(ts);
            metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

            if !has_pk {
                // The region has no primary key.
                self.write_no_key(kv)?;
                continue;
            }

            // Encodes the primary key into the reusable buffer.
            pk_buffer.clear();
            if self.is_partitioned {
                self.encode_sparse_primary_key(&kv, pk_buffer)?;
            } else {
                self.row_codec
                    .encode_key_value(&kv, pk_buffer)
                    .context(EncodeSnafu)?;
            }

            self.write_with_key(pk_buffer, kv, metrics)?;
        }

        // Accounts for the timestamp and op type of each row.
        metrics.value_bytes +=
            kvs.num_rows() * (std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>());

        Ok(())
    }

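    /// Writes a single key-value into the tree, using `pk_buffer` as a
    /// reusable buffer for the encoded primary key.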
    pub fn write_one(
        &self,
        kv: KeyValue,
        pk_buffer: &mut Vec<u8>,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let has_pk = !self.metadata.primary_key.is_empty();

        self.verify_primary_key_length(&kv)?;
        // Safety: the time index column is non-null, so the timestamp of
        // a row is always a valid timestamp.
        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        metrics.min_ts = metrics.min_ts.min(ts);
        metrics.max_ts = metrics.max_ts.max(ts);
        metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::<usize>();

        if !has_pk {
            // The region has no primary key.
            return self.write_no_key(kv);
        }

        // Encodes the primary key into the reusable buffer.
        pk_buffer.clear();
        if self.is_partitioned {
            self.encode_sparse_primary_key(&kv, pk_buffer)?;
        } else {
            self.row_codec
                .encode_key_value(&kv, pk_buffer)
                .context(EncodeSnafu)?;
        }

        self.write_with_key(pk_buffer, kv, metrics)?;

        // Accounts for the timestamp and op type of the row.
        metrics.value_bytes += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

        Ok(())
    }

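    /// Scans the tree, applying the optional projection, predicate, and
    /// sequence filter.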
    pub fn read(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<crate::memtable::MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let start = Instant::now();
        // Defaults to projecting all field columns.
        let projection: HashSet<_> = if let Some(projection) = projection {
            projection.iter().copied().collect()
        } else {
            self.metadata.field_columns().map(|c| c.column_id).collect()
        };

        let filters = predicate
            .map(|predicate| {
                predicate
                    .exprs()
                    .iter()
                    .filter_map(SimpleFilterEvaluator::try_new)
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();

        let mut tree_iter_metric = TreeIterMetrics::default();
        let partitions = self.prune_partitions(&filters, &mut tree_iter_metric);

        let mut iter = TreeIter {
            sequence,
            partitions,
            current_reader: None,
            metrics: tree_iter_metric,
            mem_scan_metrics,
        };
        let context = ReadPartitionContext::new(
            self.metadata.clone(),
            self.row_codec.clone(),
            projection,
            Arc::new(filters),
        );
        iter.fetch_next_partition(context)?;

        iter.metrics.iter_elapsed += start.elapsed();

        if self.config.merge_mode == MergeMode::LastNonNull {
            // Dedup is disabled at write time in this mode, so rows are
            // merged here instead.
            let iter = LastNonNullIter::new(iter);
            Ok(Box::new(iter))
        } else {
            Ok(Box::new(iter))
        }
    }

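    /// Returns true if the tree is empty.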
    pub fn is_empty(&self) -> bool {
        let partitions = self.partitions.read().unwrap();
        partitions.values().all(|part| !part.has_data())
    }

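    /// Marks the tree as immutable.
    ///
    /// Once the tree becomes immutable, callers should not write to it again.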
    pub fn freeze(&self) -> Result<()> {
        let partitions = self.partitions.read().unwrap();
        for partition in partitions.values() {
            partition.freeze()?;
        }
        Ok(())
    }

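    /// Forks this tree, returning a new tree that reuses the current
    /// partitions when the schema is unchanged.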
    pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
        if self.metadata.schema_version != metadata.schema_version
            || self.metadata.column_metadatas != metadata.column_metadatas
        {
            // The schema has changed, so the existing partitions can't be
            // reused; build an empty tree instead.
            return PartitionTree::new(
                self.row_codec.clone(),
                metadata,
                &self.config,
                self.write_buffer_manager.clone(),
            );
        }

        let mut total_shared_size = 0;
        let mut part_infos = {
            let partitions = self.partitions.read().unwrap();
            partitions
                .iter()
                .filter_map(|(part_key, part)| {
                    let stats = part.stats();
                    if stats.num_rows > 0 {
                        // Only keeps partitions that have data.
                        total_shared_size += stats.shared_memory_size;
                        Some((*part_key, part.clone(), stats))
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>()
        };

        let fork_size = self.config.fork_dictionary_bytes.as_bytes() as usize;
        if total_shared_size > fork_size {
            // Sorts partitions by shared memory size in ascending order so
            // `pop()` evicts the largest partition first, until the total
            // size is under the threshold.
            part_infos.sort_unstable_by_key(|info| info.2.shared_memory_size);
            while total_shared_size > fork_size {
                let Some(info) = part_infos.pop() else {
                    break;
                };

                common_telemetry::debug!(
                    "Evict partition {} with memory size {}, {} shards",
                    info.0,
                    info.2.shared_memory_size,
                    info.2.shard_num,
                );

                total_shared_size -= info.2.shared_memory_size;
            }
        }

        let mut forked = BTreeMap::new();
        for (part_key, part, _) in part_infos {
            let forked_part = part.fork(&metadata, &self.config);
            forked.insert(part_key, Arc::new(forked_part));
        }

        PartitionTree {
            config: self.config.clone(),
            metadata,
            row_codec: self.row_codec.clone(),
            partitions: RwLock::new(forked),
            is_partitioned: self.is_partitioned,
            write_buffer_manager: self.write_buffer_manager.clone(),
            sparse_encoder: self.sparse_encoder.clone(),
        }
    }

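    /// Returns the write buffer manager of the tree.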
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }

    fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_with_key(
            primary_key,
            self.row_codec.as_ref(),
            key_value,
            self.is_partitioned,
            metrics,
        )
    }

    fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
        let partition = self.get_or_create_partition(partition_key);

        partition.write_no_key(key_value)
    }

    fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef {
        let mut partitions = self.partitions.write().unwrap();
        partitions
            .entry(partition_key)
            .or_insert_with(|| Arc::new(Partition::new(self.metadata.clone(), &self.config)))
            .clone()
    }

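    /// Prunes partitions by the filters on partition columns, returning the
    /// partitions to read.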
    fn prune_partitions(
        &self,
        filters: &[SimpleFilterEvaluator],
        metrics: &mut TreeIterMetrics,
    ) -> VecDeque<PartitionRef> {
        let partitions = self.partitions.read().unwrap();
        metrics.partitions_total = partitions.len();
        if !self.is_partitioned {
            return partitions.values().cloned().collect();
        }

        let mut pruned = VecDeque::new();
        // Prunes partitions by partition keys.
        for (key, partition) in partitions.iter() {
            let mut is_needed = true;
            for filter in filters {
                if !is_partition_column(filter.column_name()) {
                    continue;
                }

                // Keeps the partition if the filter fails to evaluate.
                if !filter
                    .evaluate_scalar(&ScalarValue::UInt32(Some(*key)))
                    .unwrap_or(true)
                {
                    is_needed = false;
                }
            }

            if is_needed {
                pruned.push_back(partition.clone());
            }
        }
        metrics.partitions_after_pruning = pruned.len();
        pruned
    }

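    /// Returns the number of series in the tree.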
    pub(crate) fn series_count(&self) -> usize {
        self.partitions
            .read()
            .unwrap()
            .values()
            .map(|p| p.series_count())
            .sum()
    }
}

#[derive(Default)]
struct TreeIterMetrics {
    iter_elapsed: Duration,
    fetch_partition_elapsed: Duration,
    rows_fetched: usize,
    batches_fetched: usize,
    partitions_total: usize,
    partitions_after_pruning: usize,
}

struct TreeIter {
    /// Sequence range to filter rows by their sequence numbers.
    sequence: Option<SequenceRange>,
    /// Partitions that are not read yet.
    partitions: VecDeque<PartitionRef>,
    /// Reader of the current partition.
    current_reader: Option<PartitionReader>,
    metrics: TreeIterMetrics,
    mem_scan_metrics: Option<crate::memtable::MemScanMetrics>,
}

impl TreeIter {
    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            let inner = crate::memtable::MemScanMetricsData {
                // TreeIter doesn't track the series count, so it reports 0.
                total_series: 0,
                num_rows: self.metrics.rows_fetched,
                num_batches: self.metrics.batches_fetched,
                scan_cost: self.metrics.iter_elapsed,
            };
            mem_scan_metrics.merge_inner(&inner);
        }
    }
}

impl Drop for TreeIter {
    fn drop(&mut self) {
        // Reports scan metrics here so they are recorded even if the
        // iterator is dropped before it is exhausted.
        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["partition_tree_memtable"])
            .inc_by(self.metrics.rows_fetched as u64);
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["fetch_next_partition"])
            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
        let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(scan_elapsed);
        common_telemetry::debug!(
            "TreeIter partitions total: {}, partitions after prune: {}, rows fetched: {}, batches fetched: {}, scan elapsed: {}",
            self.metrics.partitions_total,
            self.metrics.partitions_after_pruning,
            self.metrics.rows_fetched,
            self.metrics.batches_fetched,
            scan_elapsed
        );
    }
}

impl Iterator for TreeIter {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = Instant::now();
        let res = self.next_batch().transpose();
        self.metrics.iter_elapsed += start.elapsed();
        res
    }
}

impl TreeIter {
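    /// Fetches the next partition that has data to read and sets it as the
    /// current reader.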
    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
        let start = Instant::now();
        while let Some(partition) = self.partitions.pop_front() {
            let part_reader = partition.read(context)?;
            if !part_reader.is_valid() {
                // The partition has nothing to read, recycle the context
                // and try the next partition.
                context = part_reader.into_context();
                continue;
            }
            self.current_reader = Some(part_reader);
            break;
        }
        self.metrics.fetch_partition_elapsed += start.elapsed();
        Ok(())
    }

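    /// Fetches the next batch from the current partition, advancing to the
    /// next partition once the current reader is exhausted.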
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(part_reader) = &mut self.current_reader else {
            // All partitions are exhausted.
            self.report_mem_scan_metrics();
            return Ok(None);
        };

        debug_assert!(part_reader.is_valid());
        let batch = part_reader.convert_current_batch()?;
        part_reader.next()?;
        if part_reader.is_valid() {
            self.metrics.rows_fetched += batch.num_rows();
            self.metrics.batches_fetched += 1;
            let mut batch = batch;
            batch.filter_by_sequence(self.sequence)?;
            return Ok(Some(batch));
        }

        // The current partition is exhausted. Advances to the next partition
        // before returning the last batch of this partition.
        let part_reader = self.current_reader.take().unwrap();
        let context = part_reader.into_context();
        self.fetch_next_partition(context)?;

        self.metrics.rows_fetched += batch.num_rows();
        self.metrics.batches_fetched += 1;
        let mut batch = batch;
        batch.filter_by_sequence(self.sequence)?;
        Ok(Some(batch))
    }
}