//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

pub use bulk::part::EncodedBulkPart;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

mod builder;
pub mod bulk;
pub mod key_values;
pub mod partition_tree;
mod simple_bulk_memtable;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;

#[cfg(any(test, feature = "test"))]
pub use bulk::part::BulkPart;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;

/// Id for memtables. Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    PartitionTree(PartitionTreeConfig),
    TimeSeries,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self::TimeSeries
    }
}

/// Statistics of a memtable.
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// The estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// The inclusive time range that this memtable contains. It is `None` if
    /// the memtable is empty.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total rows in the memtable.
    num_rows: usize,
    /// Total number of ranges in the memtable.
    num_ranges: usize,
    /// The maximum sequence number in the memtable.
    max_sequence: SequenceNumber,
}

impl MemtableStats {
    /// Attaches the time range to the stats.
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Returns the estimated bytes allocated.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }
}

/// Boxed iterator of batches read from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges of a memtable.
#[derive(Default)]
pub struct MemtableRanges {
    /// Range IDs and ranges.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the query time.
    pub stats: MemtableStats,
}

/// In-memory write buffer of a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key value into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes a bulk part into the memtable.
    fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;

    /// Scans the memtable.
    ///
    /// `projection` selects the columns to read, `predicate` prunes rows and
    /// `sequence`, if set, limits the visible sequence number.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges in the memtable after applying the predicate.
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> Result<MemtableRanges>;

    /// Returns true if the memtable is empty.
    fn is_empty(&self) -> bool;

    /// Turns a mutable memtable into an immutable memtable.
    fn freeze(&self) -> Result<()>;

    /// Returns the [MemtableStats] of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this memtable and returns a new mutable memtable with the given
    /// `id` and region `metadata`.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Reference to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder of a [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` and `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Reference to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

/// Tracks heap memory allocated by a memtable and reports it to the write
/// buffer manager (if any).
#[derive(Default)]
pub struct AllocTracker {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes allocated by the memtable.
    bytes_allocated: AtomicUsize,
    /// Whether allocating is done.
    is_done_allocating: AtomicBool,
}

impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}

impl AllocTracker {
    /// Returns a new tracker that reports allocations to the given
    /// `write_buffer_manager` (if provided).
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` of newly allocated memory and reserves them in the
    /// write buffer manager.
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks the tracker as done allocating so the write buffer manager can
    /// schedule freeing the mutable memory. Only the first call takes effect;
    /// calling it multiple times is allowed.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns the bytes allocated so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager of this tracker.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}

impl Drop for AllocTracker {
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // The memtable is dropped, free its memory from the write buffer manager.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}

/// Provides memtable builders for regions.
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    config: Arc<MitoConfig>,
}

impl MemtableBuilderProvider {
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    /// Returns a memtable builder according to the region's memtable `options`,
    /// falling back to the memtable config in [MitoConfig] when `options` is `None`.
    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}

/// Builder to build an iterator to read a range.
pub trait IterBuilder: Send + Sync {
    /// Returns the iterator to read the range.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable.
    id: MemtableId,
    /// Iterator builder.
    builder: BoxedIterBuilder,
    /// Predicates pushed down to the range.
    predicate: PredicateGroup,
}

pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;

impl MemtableRangeContext {
    pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
        Self {
            id,
            builder,
            predicate,
        }
    }
}

/// A range in a memtable.
#[derive(Clone)]
pub struct MemtableRange {
    /// Shared context of ranges in the same memtable.
    context: MemtableRangeContextRef,
}

impl MemtableRange {
    pub fn new(context: MemtableRangeContextRef) -> Self {
        Self { context }
    }

    /// Returns the id of the memtable this range belongs to.
    pub fn id(&self) -> MemtableId {
        self.context.id
    }

    /// Builds an iterator to read the range, pruning rows outside the given
    /// `time_range` with the time filters of the predicate.
    pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
        let iter = self.context.builder.build()?;
        let time_filters = self.context.predicate.time_filters();
        Ok(Box::new(PruneTimeIterator::new(
            iter,
            time_range,
            time_filters,
        )))
    }
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    #[test]
    fn test_deserialize_memtable_config() {
        let s = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MemtableConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(memtable_config) = config else {
            unreachable!()
        };
        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }
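
    // Additional hedged check (not from the original suite): the default
    // memtable config is `TimeSeries`, and the internally tagged `type`
    // field alone should select that unit variant when deserializing TOML.
    #[test]
    fn test_deserialize_default_memtable_config() {
        assert_eq!(MemtableConfig::TimeSeries, MemtableConfig::default());

        let config: MemtableConfig = toml::from_str(r#"type = "time_series""#).unwrap();
        assert_eq!(MemtableConfig::TimeSeries, config);
    }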

    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());
        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }
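
    // Additional hedged check (not from the original suite): the manual
    // `Debug` impl above prints the struct name and the allocation fields,
    // omitting the write buffer manager, and the manager accessor mirrors
    // what was passed to `new`.
    #[test]
    fn test_alloc_tracker_debug_and_manager_accessor() {
        let tracker = AllocTracker::new(None);
        assert!(tracker.write_buffer_manager().is_none());
        tracker.on_allocation(42);

        let repr = format!("{:?}", tracker);
        assert!(repr.contains("AllocTracker"));
        assert!(repr.contains("bytes_allocated"));
        assert!(repr.contains("is_done_allocating"));
    }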

    #[test]
    fn test_alloc_tracker_with_manager() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());

            // Calling done_allocating() more than once is allowed and has no
            // additional effect.
            for _ in 0..2 {
                tracker.done_allocating();
                assert_eq!(100, manager.memory_usage());
                assert_eq!(0, manager.mutable_usage());
            }
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
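
    // Hedged sketch of a provider test. It assumes `MitoConfig` implements
    // `Default`, that `MergeMode::LastRow` is a valid merge mode, and that
    // the builder's `Debug` output names its concrete type; with no
    // per-region options the provider should fall back to the default
    // `TimeSeries` memtable config declared in this module.
    #[test]
    fn test_builder_provider_default_fallback() {
        let provider = MemtableBuilderProvider::new(None, Arc::new(MitoConfig::default()));
        let builder = provider.builder_for_options(None, true, MergeMode::LastRow);
        assert!(format!("{:?}", builder).contains("TimeSeries"));
    }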

    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker = AllocTracker::new(Some(manager.clone() as WriteBufferManagerRef));

            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, manager.memory_usage());
            assert_eq!(100, manager.mutable_usage());
        }

        assert_eq!(0, manager.memory_usage());
        assert_eq!(0, manager.mutable_usage());
    }
}