use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id of a memtable. It should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// Estimated bytes allocated by this memtable from the heap.
    estimated_bytes: usize,
    /// Inclusive time range that this memtable contains. It is `None` if
    /// the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// Maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// Number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Sets the time range (for tests).
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence number (for tests).
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable, `None` if it is empty.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Boxed iterator that yields [`Batch`]es.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges to read and statistics of a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable.
    pub stats: MemtableStats,
}

/// In-memory write buffer of a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    ///
    /// `projection` selects the columns to read; `None` reads all columns.
    /// `predicate` filters the rows to read. Rows with a sequence number
    /// greater than `sequence` are invisible; `None` makes all rows visible.
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns the ranges to read in the memtable, with the same projection,
    /// predicate and sequence semantics as `iter`.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Marks the memtable as immutable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this memtable, returning a new mutable memtable with the given
    /// id and metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

pub type MemtableRef = Arc<dyn Memtable>;

/// Builder of [Memtable]s.
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given id and metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns whether regions with the given metadata should use bulk
    /// inserts. Defaults to false.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Memory allocation tracker of a memtable.
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracked memtable.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new tracker that reports allocations to the optional
    /// `write_buffer_manager`.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` additional memory allocated by the memtable.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks the memtable as done allocating and schedules the tracked
    /// memory to be freed. Only the first call takes effect.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns the total bytes allocated so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            // Reclaims the memory tracked by this tracker.
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provider of memtable builders, choosing a builder from per-region options
/// or the global [MitoConfig].
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Shared, thread-safe metrics collected while scanning a memtable.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics collected by one iterator into the shared metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Returns a snapshot of the merged metrics.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Total number of series scanned.
    pub(crate) total_series: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Duration spent scanning the memtable.
    pub(crate) scan_cost: Duration,
}

/// Builder of iterators to read a memtable range.
pub trait IterBuilder: Send + Sync {
    /// Returns a new iterator, optionally reporting scan metrics to `metrics`.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// Predicates pushed down to the scan.
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in a memtable, which can build iterators to read its rows.
#[derive(Clone)]
pub struct MemtableRange {
    /// Context shared by ranges of the same memtable.
    context: MemtableRangeContextRef,
    /// Number of rows in the range.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from a shared context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable to read.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range, pruning rows by the given
    /// `time_range` and the time filters in the pushed-down predicates.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read the range without pruning.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }
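
    #[test]
    fn test_deserialize_time_series_config() {
        // A minimal sketch, assuming the internally tagged representation
        // above: the `time_series` variant has no extra fields, so a bare
        // `type` key should deserialize to the default config.
        let s = r#"type = "time_series""#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        assert_eq!(MemtableConfig::default(), config);
    }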

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }
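
    #[test]
    fn test_memtable_stats_time_range() {
        // A minimal sketch using the test-only builder: stats start with no
        // time range, and `with_time_range` records the inclusive bounds.
        let stats = MemtableStats::default();
        assert!(stats.time_range().is_none());
        let range = Some((
            Timestamp::new_millisecond(0),
            Timestamp::new_millisecond(100),
        ));
        let stats = stats.with_time_range(range);
        assert_eq!(range, stats.time_range());
    }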

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                // Calling done_allocating() repeatedly must not free the
                // same memory twice.
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
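
    #[test]
    fn test_default_builder_is_time_series() {
        // A sketch assuming `MitoConfig::default()` keeps the default
        // `time_series` memtable config: without per-region options the
        // provider should fall back to the time-series builder.
        let provider = MemtableBuilderProvider::new(None, Arc::new(MitoConfig::default()));
        let builder = provider.builder_for_options(None, true, MergeMode::LastRow);
        // The concrete builder type is only visible through its Debug output.
        assert!(format!("{builder:?}").contains("TimeSeries"));
    }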

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
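
    #[test]
    fn test_mem_scan_metrics_merge() {
        // A minimal sketch of metric aggregation: merging two partial results
        // should accumulate every counter, including the scan cost.
        let metrics = MemScanMetrics::default();
        let data = MemScanMetricsData {
            total_series: 2,
            num_rows: 10,
            num_batches: 1,
            scan_cost: Duration::from_millis(5),
        };
        metrics.merge_inner(&data);
        metrics.merge_inner(&data);
        let merged = metrics.data();
        assert_eq!(4, merged.total_series);
        assert_eq!(20, merged.num_rows);
        assert_eq!(2, merged.num_batches);
        assert_eq!(Duration::from_millis(10), merged.scan_cost);
    }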
}