//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
pub use mito_codec::key_values::KeyValues;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};

use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod partition_tree;
pub mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables, unique within a region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default, Clone)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    pub num_rows: usize,
    /// Total number of ranges in the memtable.
    pub num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
    /// The number of time series in the memtable.
    series_count: usize,
}

impl MemtableStats {
    /// Sets the time range (for tests).
    #[cfg(any(test, feature = "test"))]
    pub fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the max sequence (for tests).
    #[cfg(feature = "test")]
    pub fn with_max_sequence(mut self, max_sequence: SequenceNumber) -> Self {
        self.max_sequence = max_sequence;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }

    /// Returns the number of time series in the memtable.
    pub fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Iterator of a memtable that yields [Batch]es.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Iterator of a memtable that yields Arrow [RecordBatch]es.
pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;

/// Ranges of a memtable to scan.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at query time.
    pub stats: MemtableStats,
}

/// In-memory write buffer of a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key-value pairs into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key-value pair into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable and returns a batch iterator (for tests only).
    #[cfg(any(test, feature = "test"))]
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<table::predicate::Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable for the given projection, predicates
    /// and read sequence.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this (immutable) memtable and returns a new mutable memtable with
    /// the given id and metadata.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// A reference to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;
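
// Rough lifecycle sketch (not part of the original file): a region keeps a
// mutable memtable that accepts `write`/`write_one`/`write_bulk`; before a
// flush it is sealed with `freeze`, readers scan it through `ranges`, and a
// fresh mutable memtable for subsequent writes is obtained via `fork`. With a
// hypothetical `memtable: MemtableRef` and `metadata: RegionMetadataRef`:
//
//     memtable.write(&kvs)?;                       // ingest while mutable
//     memtable.freeze()?;                          // stop accepting writes
//     let next = memtable.fork(new_id, &metadata); // new mutable memtable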

/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given id and region metadata.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Returns whether bulk insert should be used for the region. Defaults to `false`.
    fn use_bulk_insert(&self, metadata: &RegionMetadataRef) -> bool {
        let _metadata = metadata;
        false
    }
}

/// A reference to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Tracks heap memory allocated by a memtable and reports it to the write
/// buffer manager and metrics.
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the tracked memtable.
    bytes_allocated: AtomicUsize,
    /// Whether the tracked memtable is done allocating.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new tracker that reports to the optional `write_buffer_manager`.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` additional memory allocated by the memtable.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks that the memtable has finished allocating memory so the write
    /// buffer manager can schedule the tracked bytes to be freed later. Only
    /// the first call schedules the free; later calls are no-ops.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns bytes allocated so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager of the tracker, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // Memory tracked by the write buffer manager is only freed once the
        // tracker (and thus the memtable) is dropped.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
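
// Usage sketch (not part of the original file): a memtable typically owns one
// `AllocTracker`, reports each heap allocation while it is mutable, and marks
// the end of allocations when it becomes immutable; the bytes reserved in the
// write buffer manager are only freed when the tracker is dropped with the
// memtable. The `AllocTracker` tests at the bottom of this module exercise the
// same flow:
//
//     let tracker = AllocTracker::new(write_buffer_manager);
//     tracker.on_allocation(1024);  // while the memtable is mutable
//     tracker.done_allocating();    // when the memtable is frozen
//     drop(tracker);                // releases the memory from the manager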

/// Provider that creates [MemtableBuilder]s for regions based on the region
/// options and the global engine config.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    /// Returns the memtable builder for the given region options, falling back
    /// to the engine's default memtable config when `options` is `None`.
    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Metrics of scanning a memtable, shared by the iterators of the memtable.
#[derive(Clone, Default)]
pub struct MemScanMetrics(Arc<Mutex<MemScanMetricsData>>);

impl MemScanMetrics {
    /// Merges the metrics collected by one iterator into the shared metrics.
    pub(crate) fn merge_inner(&self, inner: &MemScanMetricsData) {
        let mut metrics = self.0.lock().unwrap();
        metrics.total_series += inner.total_series;
        metrics.num_rows += inner.num_rows;
        metrics.num_batches += inner.num_batches;
        metrics.scan_cost += inner.scan_cost;
    }

    /// Returns a snapshot of the merged metrics.
    pub(crate) fn data(&self) -> MemScanMetricsData {
        self.0.lock().unwrap().clone()
    }
}

/// Inner metrics of a memtable scan.
#[derive(Clone, Default)]
pub(crate) struct MemScanMetricsData {
    /// Number of time series scanned.
    pub(crate) total_series: usize,
    /// Number of rows returned.
    pub(crate) num_rows: usize,
    /// Number of batches returned.
    pub(crate) num_batches: usize,
    /// Duration of the scan.
    pub(crate) scan_cost: Duration,
}

/// Builder to build an iterator that reads a memtable range.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range, optionally reporting to `metrics`.
    fn build(&self, metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator>;

    /// Returns whether this builder yields Arrow record batches instead of [Batch]es.
    fn is_record_batch(&self) -> bool {
        false
    }

    /// Builds a record batch iterator. The default implementation returns an
    /// unsupported-operation error.
    fn build_record_batch(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        let _metrics = metrics;
        UnsupportedOperationSnafu {
            err_msg: "Record batch iterator is not supported by this memtable",
        }
        .fail()
    }
}
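
// Minimal implementor sketch (not part of the original file): an `IterBuilder`
// only has to produce a `BoxedBatchIterator`; record batch support is opt-in
// via `is_record_batch`/`build_record_batch`. A builder that replays
// pre-collected batches could look like this, assuming a hypothetical
// `batches: Vec<Batch>` field:
//
//     fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
//         let batches: Vec<Result<Batch>> = self.batches.clone().into_iter().map(Ok).collect();
//         Ok(Box::new(batches.into_iter()))
//     }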

/// A boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// Predicates pushed down to the memtable.
    predicate: PredicateGroup,
}

/// A reference to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    /// Creates a new [MemtableRangeContext].
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in a memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of the range.
    context: MemtableRangeContextRef,
    /// Number of rows in the range.
    num_rows: usize,
}

impl MemtableRange {
    /// Creates a new range from the shared context.
    pub fn new(context: MemtableRangeContextRef, num_rows: usize) -> Self {
        Self { context, num_rows }
    }

    /// Returns the id of the memtable this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range and prunes the result by the given
    /// time range and the pushed down time filters.
    pub fn build_prune_iter(
        &self,
        time_range: FileTimeRange,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build(metrics)?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }

    /// Builds an iterator to read the range without pruning.
    pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
        self.context.builder.build(None)
    }

    /// Builds a record batch iterator to read the range, if the underlying
    /// builder supports it.
    pub fn build_record_batch_iter(
        &self,
        metrics: Option<MemScanMetrics>,
    ) -> Result<BoxedRecordBatchIterator> {
        self.context.builder.build_record_batch(metrics)
    }

    /// Returns whether this range yields Arrow record batches.
    pub fn is_record_batch(&self) -> bool {
        self.context.builder.is_record_batch()
    }

    /// Returns the number of rows in the range.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }
}
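
// Consumption sketch (not part of the original file): a scanner asks a memtable
// for its ranges and then drives each range's iterator. With a hypothetical
// `memtable: MemtableRef` and `predicate: PredicateGroup`:
//
//     let MemtableRanges { ranges, .. } = memtable.ranges(None, predicate, None)?;
//     for range in ranges.values() {
//         for batch in range.build_iter()? {
//             let batch = batch?;
//             // process the batch...
//         }
//     }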

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

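    // Added example (not in the original tests): demonstrates how per-iterator
    // `MemScanMetricsData` accumulates into a shared `MemScanMetrics`.
    #[test]
    fn test_mem_scan_metrics_merge() {
        let metrics = MemScanMetrics::default();
        let inner = MemScanMetricsData {
            total_series: 1,
            num_rows: 10,
            num_batches: 2,
            scan_cost: Duration::from_millis(5),
        };
        // Merging the same partial metrics twice accumulates counters and cost.
        metrics.merge_inner(&inner);
        metrics.merge_inner(&inner);

        let data = metrics.data();
        assert_eq!(2, data.total_series);
        assert_eq!(20, data.num_rows);
        assert_eq!(4, data.num_batches);
        assert_eq!(Duration::from_millis(10), data.scan_cost);
    }
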
    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            for _ in 0..2 {
                tracker.done_allocating();
                // Repeated calls don't release more memory; the mutable usage
                // is scheduled to be freed only once.
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}
589}