use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
pub use bulk::part::BulkPart;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;
pub mod bulk;
pub mod key_values;
pub mod partition_tree;
mod stats;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
/// Id of a memtable. NOTE(review): presumably unique within a region (see
/// [Memtable::fork] which assigns a new id) — confirm uniqueness scope.
pub type MemtableId = u32;
/// Config for memtables.
///
/// Serialized with an internal `type` tag (`partition_tree` / `time_series`),
/// so a TOML document selects the variant via `type = "..."`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
    /// Partition-tree memtable with its own tuning knobs.
    PartitionTree(PartitionTreeConfig),
    /// Time-series memtable (the default, see `Default` impl).
    TimeSeries,
}
impl Default for MemtableConfig {
fn default() -> Self {
Self::TimeSeries
}
}
/// Statistics of a memtable.
#[derive(Debug, Default)]
pub struct MemtableStats {
    /// Estimated bytes allocated by this memtable from heap.
    estimated_bytes: usize,
    /// Time range covered by the memtable. `None` when unset — presumably
    /// when the memtable is empty; confirm with implementations.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Total number of rows in the memtable.
    num_rows: usize,
    /// Number of ranges the memtable can be scanned in (see [Memtable::ranges]).
    num_ranges: usize,
    /// Maximum sequence number among rows in this memtable.
    max_sequence: SequenceNumber,
}
impl MemtableStats {
    /// Sets the time range of the stats; builder-style helper used by tests.
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Returns the estimated bytes allocated by this memtable.
    pub fn bytes_allocated(&self) -> usize {
        self.estimated_bytes
    }

    /// Returns the time range of the memtable, if any.
    pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
        self.time_range
    }

    /// Returns the number of rows in the memtable.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Returns the number of ranges in the memtable.
    pub fn num_ranges(&self) -> usize {
        self.num_ranges
    }

    /// Returns the maximum sequence number among rows in the memtable.
    pub fn max_sequence(&self) -> SequenceNumber {
        self.max_sequence
    }
}
/// Boxed iterator that yields [Batch]es read from a memtable.
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges of a memtable for scanning, together with the memtable statistics
/// captured when the ranges were created.
#[derive(Default)]
pub struct MemtableRanges {
    /// Ranges of the memtable, keyed by range index.
    pub ranges: BTreeMap<usize, MemtableRange>,
    /// Statistics of the memtable at the time the ranges were built.
    pub stats: MemtableStats,
}
/// In-memory write buffer of a region.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    fn id(&self) -> MemtableId;

    /// Writes a batch of key values into the memtable.
    fn write(&self, kvs: &KeyValues) -> Result<()>;

    /// Writes a single key value into the memtable.
    fn write_one(&self, key_value: KeyValue) -> Result<()>;

    /// Writes an encoded bulk part into the memtable.
    fn write_bulk(&self, part: BulkPart) -> Result<()>;

    /// Scans the memtable and returns an iterator over [Batch]es.
    ///
    /// `projection` selects the columns to read (presumably all columns when
    /// `None`); `predicate` filters rows. NOTE(review): `sequence` looks like
    /// a visibility cap for snapshot reads — confirm with implementations.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
        sequence: Option<SequenceNumber>,
    ) -> Result<BoxedBatchIterator>;

    /// Returns the ranges of the memtable for scanning, with the same
    /// projection/predicate/sequence semantics as [Memtable::iter].
    fn ranges(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: PredicateGroup,
        sequence: Option<SequenceNumber>,
    ) -> MemtableRanges;

    /// Returns true if the memtable contains no rows.
    fn is_empty(&self) -> bool;

    /// Marks the memtable as immutable; exact freeze semantics are defined by
    /// implementations.
    fn freeze(&self) -> Result<()>;

    /// Returns the current statistics of the memtable.
    fn stats(&self) -> MemtableStats;

    /// Forks this memtable, returning a new memtable with the given `id` and
    /// `metadata`. NOTE(review): presumably returns an empty mutable memtable
    /// used after a freeze — confirm with implementations.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Shared reference to a [Memtable].
pub type MemtableRef = Arc<dyn Memtable>;
/// Builder to create new [Memtable]s.
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance with the given `id` for the region
    /// described by `metadata`.
    fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Shared reference to a [MemtableBuilder].
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
/// Tracks heap memory allocated on behalf of a memtable, mirroring the count
/// into the global `WRITE_BUFFER_BYTES` metric and, when present, reserving it
/// from the [WriteBufferManagerRef] for write flow control.
#[derive(Default)]
pub struct AllocTracker {
    /// Manager to reserve/free memory from; `None` disables flow control.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Bytes tracked by this tracker so far.
    bytes_allocated: AtomicUsize,
    /// Whether `done_allocating()` has already scheduled freeing the memory.
    is_done_allocating: AtomicBool,
}
impl fmt::Debug for AllocTracker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The write buffer manager is intentionally omitted from the output;
        // only the tracker's own atomic state is printed.
        f.debug_struct("AllocTracker")
            .field("bytes_allocated", &self.bytes_allocated)
            .field("is_done_allocating", &self.is_done_allocating)
            .finish()
    }
}
impl AllocTracker {
    /// Returns a new tracker. Pass `None` to track bytes locally without
    /// reserving from a write buffer manager.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> AllocTracker {
        AllocTracker {
            write_buffer_manager,
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    /// Tracks `bytes` of newly allocated memory.
    ///
    /// Bumps the local counter and the global `WRITE_BUFFER_BYTES` metric,
    /// then reserves the same amount from the manager (if any).
    pub(crate) fn on_allocation(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
        WRITE_BUFFER_BYTES.add(bytes as i64);
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.reserve_mem(bytes);
        }
    }

    /// Marks the tracker as done allocating and asks the manager to schedule
    /// freeing of everything reserved so far.
    ///
    /// The compare_exchange makes this idempotent: only the first successful
    /// caller schedules the free; later calls (including the one issued from
    /// `Drop`) are no-ops. Note the flag is only flipped when a manager is
    /// present; without one there is nothing to schedule.
    pub(crate) fn done_allocating(&self) {
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            if self
                .is_done_allocating
                .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                write_buffer_manager
                    .schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
            }
        }
    }

    /// Returns the bytes tracked so far.
    pub(crate) fn bytes_allocated(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// Returns the write buffer manager backing this tracker, if any.
    pub(crate) fn write_buffer_manager(&self) -> Option<WriteBufferManagerRef> {
        self.write_buffer_manager.clone()
    }
}
impl Drop for AllocTracker {
    /// Releases everything this tracker accounted for: runs the
    /// `done_allocating` path if it has not run yet, subtracts the bytes from
    /// the global metric, and frees the reservation from the manager.
    fn drop(&mut self) {
        if !self.is_done_allocating.load(Ordering::Relaxed) {
            self.done_allocating();
        }

        let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
        WRITE_BUFFER_BYTES.sub(bytes_allocated as i64);

        // `free_mem` releases the reservation made via `reserve_mem` in
        // `on_allocation`; only relevant when a manager exists.
        if let Some(write_buffer_manager) = &self.write_buffer_manager {
            write_buffer_manager.free_mem(bytes_allocated);
        }
    }
}
/// Provider of [MemtableBuilder]s: builds from per-region [MemtableOptions]
/// when given, otherwise falls back to the global [MitoConfig].
#[derive(Clone)]
pub(crate) struct MemtableBuilderProvider {
    /// Shared write buffer manager handed to every builder; `None` disables
    /// write flow control.
    write_buffer_manager: Option<WriteBufferManagerRef>,
    /// Global engine config supplying the default memtable settings.
    config: Arc<MitoConfig>,
}
impl MemtableBuilderProvider {
    /// Creates a provider from the shared write buffer manager and the global
    /// engine config.
    pub(crate) fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        config: Arc<MitoConfig>,
    ) -> Self {
        Self {
            write_buffer_manager,
            config,
        }
    }

    /// Returns a memtable builder for the given per-region `options`.
    ///
    /// `dedup` and `merge_mode` always override whatever the options or the
    /// global config carry. Falls back to [Self::default_memtable_builder]
    /// when the region specifies no memtable options.
    pub(crate) fn builder_for_options(
        &self,
        options: Option<&MemtableOptions>,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> MemtableBuilderRef {
        match options {
            Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
            Some(MemtableOptions::PartitionTree(opts)) => {
                Arc::new(PartitionTreeMemtableBuilder::new(
                    PartitionTreeConfig {
                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
                        data_freeze_threshold: opts.data_freeze_threshold,
                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
                        dedup,
                        merge_mode,
                    },
                    self.write_buffer_manager.clone(),
                ))
            }
            None => self.default_memtable_builder(dedup, merge_mode),
        }
    }

    /// Builds the default memtable builder from the global config, overriding
    /// its `dedup` and `merge_mode` with the region's effective values.
    fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
        match &self.config.memtable {
            MemtableConfig::PartitionTree(config) => {
                let mut config = config.clone();
                config.dedup = dedup;
                // Fix: propagate the region's merge mode as well. Previously
                // only `dedup` was overridden here, so the per-region merge
                // mode was silently dropped on this fallback path, while both
                // the explicit-options path and the TimeSeries branch below
                // do apply it.
                config.merge_mode = merge_mode;
                Arc::new(PartitionTreeMemtableBuilder::new(
                    config,
                    self.write_buffer_manager.clone(),
                ))
            }
            MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(
                self.write_buffer_manager.clone(),
                dedup,
                merge_mode,
            )),
        }
    }
}
/// Builds iterators to read a [MemtableRange].
pub trait IterBuilder: Send + Sync {
    /// Returns a new [BoxedBatchIterator] over the underlying memtable data.
    fn build(&self) -> Result<BoxedBatchIterator>;
}

/// Boxed [IterBuilder].
pub type BoxedIterBuilder = Box<dyn IterBuilder>;
/// Context shared by ranges of the same memtable.
pub struct MemtableRangeContext {
    /// Id of the memtable the ranges belong to.
    id: MemtableId,
    /// Builder that creates iterators over the memtable data.
    builder: BoxedIterBuilder,
    /// Predicates pushed down to the scan; the time filters are applied at
    /// iteration time (see [MemtableRange::build_iter]).
    predicate: PredicateGroup,
}

/// Shared reference to a [MemtableRangeContext].
pub type MemtableRangeContextRef = Arc<MemtableRangeContext>;
impl MemtableRangeContext {
pub fn new(id: MemtableId, builder: BoxedIterBuilder, predicate: PredicateGroup) -> Self {
Self {
id,
builder,
predicate,
}
}
}
/// A range of a memtable that can be scanned independently.
#[derive(Clone)]
pub struct MemtableRange {
    /// Context shared by all ranges of the same memtable.
    context: MemtableRangeContextRef,
}
impl MemtableRange {
pub fn new(context: MemtableRangeContextRef) -> Self {
Self { context }
}
pub fn id(&self) -> MemtableId {
self.context.id
}
pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
let iter = self.context.builder.build()?;
let time_filters = self.context.predicate.time_filters();
Ok(Box::new(PruneTimeIterator::new(
iter,
time_range,
time_filters,
)))
}
}
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;

    use super::*;
    use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

    /// A tagged TOML document deserializes into the partition-tree variant
    /// with every field populated.
    #[test]
    fn test_deserialize_memtable_config() {
        let toml_input = r#"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let parsed: MemtableConfig = toml::from_str(toml_input).unwrap();
        let memtable_config = match parsed {
            MemtableConfig::PartitionTree(inner) => inner,
            MemtableConfig::TimeSeries => unreachable!(),
        };

        assert!(memtable_config.dedup);
        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
        assert_eq!(1024, memtable_config.data_freeze_threshold);
        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
    }

    /// Without a manager the tracker still accumulates a local byte count,
    /// and `done_allocating` leaves the count untouched.
    #[test]
    fn test_alloc_tracker_without_manager() {
        let tracker = AllocTracker::new(None);
        assert_eq!(0, tracker.bytes_allocated());

        tracker.on_allocation(100);
        assert_eq!(100, tracker.bytes_allocated());
        tracker.on_allocation(200);
        assert_eq!(300, tracker.bytes_allocated());

        tracker.done_allocating();
        assert_eq!(300, tracker.bytes_allocated());
    }

    /// With a manager, `done_allocating` is idempotent and moves usage out of
    /// the mutable bucket; dropping the tracker releases everything.
    #[test]
    fn test_alloc_tracker_with_manager() {
        let buffer_manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker =
                AllocTracker::new(Some(buffer_manager.clone() as WriteBufferManagerRef));
            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, buffer_manager.memory_usage());
            assert_eq!(100, buffer_manager.mutable_usage());

            // Call twice to check that the second call is a no-op.
            tracker.done_allocating();
            assert_eq!(100, buffer_manager.memory_usage());
            assert_eq!(0, buffer_manager.mutable_usage());
            tracker.done_allocating();
            assert_eq!(100, buffer_manager.memory_usage());
            assert_eq!(0, buffer_manager.mutable_usage());
        }
        assert_eq!(0, buffer_manager.memory_usage());
        assert_eq!(0, buffer_manager.mutable_usage());
    }

    /// Dropping a tracker that never called `done_allocating` still releases
    /// the reservation via the `Drop` impl.
    #[test]
    fn test_alloc_tracker_without_done_allocating() {
        let buffer_manager = Arc::new(WriteBufferManagerImpl::new(1000));
        {
            let tracker =
                AllocTracker::new(Some(buffer_manager.clone() as WriteBufferManagerRef));
            tracker.on_allocation(100);
            assert_eq!(100, tracker.bytes_allocated());
            assert_eq!(100, buffer_manager.memory_usage());
            assert_eq!(100, buffer_manager.mutable_usage());
        }
        assert_eq!(0, buffer_manager.memory_usage());
        assert_eq!(0, buffer_manager.mutable_usage());
    }
}