//! Configurations for the mito engine.

use std::cmp;
use std::path::Path;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use crate::error::Result;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
/// Minimum part size for multipart uploads to object stores (S3 requires at least 5MiB per part).
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan tasks.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as the default global write buffer size.
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory as the default SST meta cache size.
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory as the default mem cache size.
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory as the default page cache size.
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory as the memory threshold for creating an index.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
/// Timeout for fetching options.
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Configuration for the mito engine.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of region workers (default: 1/2 of CPU cores).
    pub num_workers: usize,
    /// Request channel size of each worker (default: 128).
    pub worker_channel_size: usize,
    /// Max batch size for a worker to handle requests (default: 64).
    pub worker_request_batch_size: usize,
    /// Number of meta action updates to trigger a new manifest checkpoint (default: 10).
    pub manifest_checkpoint_distance: u64,
    /// Whether to compress manifest and checkpoint files (default: false).
    pub compress_manifest: bool,
    /// Max number of running background flush jobs (default: 1/2 of CPU cores).
    pub max_background_flushes: usize,
    /// Max number of running background compaction jobs (default: 1/4 of CPU cores).
    pub max_background_compactions: usize,
    /// Max number of running background purge jobs (default: number of CPU cores).
    pub max_background_purges: usize,
    /// Interval to auto flush a region if it has not flushed yet (default: 30 min).
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size threshold that triggers flush (default: 1GB).
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer size threshold that rejects write requests (default: 2GB).
    pub global_write_buffer_reject_size: ReadableSize,
    /// Cache size for SST metadata (default: 128MB).
    pub sst_meta_cache_size: ReadableSize,
    /// Cache size for vectors and arrow arrays (default: 512MB).
    pub vector_cache_size: ReadableSize,
    /// Cache size for pages of SST row groups (default: 512MB).
    pub page_cache_size: ReadableSize,
    /// Cache size for time series selector results (default: 512MB).
    pub selector_result_cache_size: ReadableSize,
    /// Whether to enable the write cache.
    pub enable_write_cache: bool,
    /// File system path for the write cache; defaults to `{data_home}` when empty.
    pub write_cache_path: String,
    /// Capacity of the write cache.
    pub write_cache_size: ReadableSize,
    /// TTL of entries in the write cache.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,
    /// Buffer size for SST writing.
    pub sst_write_buffer_size: ReadableSize,
    /// Capacity of the channel that sends data from parallel scan tasks to the main task.
    pub parallel_scan_channel_size: usize,
    /// Whether to allow stale entries to be read during WAL replay.
    pub allow_stale_entries: bool,
    /// Index configs.
    pub index: IndexConfig,
    /// Inverted index configs.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index configs.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index configs.
    pub bloom_filter_index: BloomFilterConfig,
    /// Memtable config.
    pub memtable: MemtableConfig,
    /// Minimum time interval between two compactions (default: 0, meaning no restriction).
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,
}
impl Default for MitoConfig {
fn default() -> Self {
let mut mito_config = MitoConfig {
num_workers: divide_num_cpus(2),
worker_channel_size: 128,
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
compress_manifest: false,
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),
max_background_purges: common_config::utils::get_cpus(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
selector_result_cache_size: ReadableSize::mb(512),
enable_write_cache: false,
write_cache_path: String::new(),
write_cache_size: ReadableSize::gb(5),
write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),
bloom_filter_index: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
};
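        // Adjust buffer and cache sizes according to the total system memory, if it is available.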
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
mito_config.adjust_buffer_and_cache_size(sys_memory);
}
mito_config
}
}
impl MitoConfig {
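    /// Sanitizes incorrect configuration values, falling back to safe defaults
    /// and logging a warning when a value is adjusted.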
pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
if self.num_workers == 0 {
self.num_workers = divide_num_cpus(2);
}
if self.worker_channel_size == 0 {
warn!("Sanitize channel size 0 to 1");
self.worker_channel_size = 1;
}
if self.max_background_flushes == 0 {
warn!(
"Sanitize max background flushes 0 to {}",
divide_num_cpus(2)
);
self.max_background_flushes = divide_num_cpus(2);
}
if self.max_background_compactions == 0 {
warn!(
"Sanitize max background compactions 0 to {}",
divide_num_cpus(4)
);
self.max_background_compactions = divide_num_cpus(4);
}
if self.max_background_purges == 0 {
warn!(
"Sanitize max background purges 0 to {}",
common_config::utils::get_cpus()
);
self.max_background_purges = common_config::utils::get_cpus();
}
if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
warn!(
"Sanitize global write buffer reject size to {}",
self.global_write_buffer_reject_size
);
}
if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
warn!(
"Sanitize sst write buffer size to {}",
self.sst_write_buffer_size
);
}
if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(
"Sanitize scan channel size to {}",
self.parallel_scan_channel_size
);
}
if self.write_cache_path.trim().is_empty() {
self.write_cache_path = data_home.to_string();
}
self.index.sanitize(data_home, &self.inverted_index)?;
Ok(())
}
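    /// Adjusts write buffer and cache sizes according to the total system memory.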
fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
let global_write_buffer_size = cmp::min(
sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
ReadableSize::gb(1),
);
let global_write_buffer_reject_size = global_write_buffer_size * 2;
let sst_meta_cache_size = cmp::min(
sys_memory / SST_META_CACHE_SIZE_FACTOR,
ReadableSize::mb(128),
);
let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
self.global_write_buffer_size = global_write_buffer_size;
self.global_write_buffer_reject_size = global_write_buffer_reject_size;
self.sst_meta_cache_size = sst_meta_cache_size;
self.vector_cache_size = mem_cache_size;
self.page_cache_size = page_cache_size;
self.selector_result_cache_size = mem_cache_size;
}
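    /// Enables the write cache with the given path, size and TTL (test helper).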
#[cfg(test)]
pub fn enable_write_cache(
mut self,
path: String,
size: ReadableSize,
ttl: Option<Duration>,
) -> Self {
self.enable_write_cache = true;
self.write_cache_path = path;
self.write_cache_size = size;
self.write_cache_ttl = ttl;
self
}
}
/// Configuration options for the index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path for the index, used for intermediate and staging files
    /// (defaults to `{data_home}/index_intermediate`).
    pub aux_path: String,
    /// Maximum capacity of the staging directory.
    pub staging_size: ReadableSize,
    /// TTL of the staging directory; setting it to 0 disables the TTL.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,
    /// Write buffer size for creating the index.
    pub write_buffer_size: ReadableSize,
    /// Cache size for index metadata.
    pub metadata_cache_size: ReadableSize,
    /// Cache size for index content.
    pub content_cache_size: ReadableSize,
    /// Page size for the index content cache.
    pub content_cache_page_size: ReadableSize,
}
impl Default for IndexConfig {
fn default() -> Self {
Self {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
write_buffer_size: ReadableSize::mb(8),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
content_cache_page_size: ReadableSize::kb(64),
}
}
}
impl IndexConfig {
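    /// Sanitizes the index configuration: migrates the deprecated
    /// `inverted_index.intermediate_path`, fills in the default aux path under
    /// `data_home`, and clamps the write buffer size to the multipart upload minimum.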
pub fn sanitize(
&mut self,
data_home: &str,
inverted_index: &InvertedIndexConfig,
) -> Result<()> {
#[allow(deprecated)]
if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
self.aux_path.clone_from(&inverted_index.intermediate_path);
            warn!(
                "`inverted_index.intermediate_path` is deprecated, use `index.aux_path` instead. Set `index.aux_path` to {}",
                &inverted_index.intermediate_path
            );
}
if self.aux_path.is_empty() {
let path = Path::new(data_home).join("index_intermediate");
self.aux_path = path.as_os_str().to_string_lossy().to_string();
}
if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
warn!(
"Sanitize index write buffer size to {}",
self.write_buffer_size
);
}
if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
self.staging_ttl = None;
}
Ok(())
}
}
/// Mode that controls whether an index operation is performed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Let the engine decide automatically.
    #[default]
    Auto,
    /// Never perform the operation.
    Disable,
}
impl Mode {
    /// Returns true if the mode is [Mode::Disable].
    pub fn disabled(&self) -> bool {
        matches!(self, Mode::Disable)
    }

    /// Returns true if the mode is [Mode::Auto].
    pub fn auto(&self) -> bool {
        matches!(self, Mode::Auto)
    }
}
/// Memory threshold for performing an operation, such as creating an index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derive the threshold from the total system memory.
    #[default]
    Auto,
    /// No memory limit.
    Unlimited,
    /// A fixed memory threshold.
    #[serde(untagged)]
    Size(ReadableSize),
}
/// Configuration options for the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
impl Default for InvertedIndexConfig {
#[allow(deprecated)]
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
write_buffer_size: ReadableSize::mb(8),
intermediate_path: String::new(),
}
}
}
impl InvertedIndexConfig {
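    /// Returns the memory threshold for creating the index in bytes,
    /// or `None` if the threshold is unlimited.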
pub fn mem_threshold_on_create(&self) -> Option<usize> {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
} else {
Some(ReadableSize::mb(64).as_bytes() as usize)
}
}
MemoryThreshold::Unlimited => None,
MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
}
}
}
/// Configuration options for the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data.
    pub compress: bool,
}
impl Default for FulltextIndexConfig {
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
compress: true,
}
}
}
impl FulltextIndexConfig {
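    /// Returns the memory threshold for creating the index in bytes;
    /// `usize::MAX` means unlimited.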
pub fn mem_threshold_on_create(&self) -> usize {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
(sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
} else {
ReadableSize::mb(64).as_bytes() as _
}
}
MemoryThreshold::Unlimited => usize::MAX,
MemoryThreshold::Size(size) => size.as_bytes() as _,
}
}
}
/// Configuration options for the bloom filter index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}
impl Default for BloomFilterConfig {
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
}
}
}
impl BloomFilterConfig {
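    /// Returns the memory threshold for creating the index in bytes,
    /// or `None` if the threshold is unlimited.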
pub fn mem_threshold_on_create(&self) -> Option<usize> {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
} else {
Some(ReadableSize::mb(64).as_bytes() as usize)
}
}
MemoryThreshold::Unlimited => None,
MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
}
}
}
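/// Divides the number of CPU cores by `divisor`, rounding up (the result is at least 1).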
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);
let cores = common_config::utils::get_cpus();
debug_assert!(cores > 0);
cores.div_ceil(divisor)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_deserialize_config() {
let s = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
let config: MitoConfig = toml::from_str(s).unwrap();
let MemtableConfig::PartitionTree(config) = &config.memtable else {
unreachable!()
};
assert_eq!(1024, config.data_freeze_threshold);
}
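
    // A small sketch of a sanity check for `MitoConfig::sanitize`, based on the
    // fall-back behavior implemented above; the data home path used here is only
    // an illustrative value.
    #[test]
    fn test_sanitize_config() {
        let mut config = MitoConfig {
            worker_channel_size: 0,
            parallel_scan_channel_size: 0,
            global_write_buffer_size: ReadableSize::gb(2),
            global_write_buffer_reject_size: ReadableSize::gb(1),
            write_cache_path: String::new(),
            ..Default::default()
        };
        config.sanitize("/tmp/greptimedb-data").unwrap();
        // Zero sizes are bumped to their minimum values.
        assert_eq!(1, config.worker_channel_size);
        assert_eq!(DEFAULT_SCAN_CHANNEL_SIZE, config.parallel_scan_channel_size);
        // The reject size must stay above the global write buffer size.
        assert_eq!(
            config.global_write_buffer_size * 2,
            config.global_write_buffer_reject_size
        );
        // An empty write cache path falls back to the data home.
        assert_eq!("/tmp/greptimedb-data", config.write_cache_path);
    }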
}