1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24use serde_with::serde_as;
25
26use crate::error::Result;
27use crate::memtable::MemtableConfig;
28use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
29
/// Minimum part size required by multipart uploads to object stores
/// (S3-compatible stores require 5MiB parts); write buffer sizes are
/// sanitized up to at least this value.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan tasks.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;

/// Use 1/8 of system memory as the global write buffer size.
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Use 1/32 of system memory as the SST meta cache size.
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Use 1/16 of system memory for in-memory caches (vector / selector result).
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Use 1/8 of system memory as the page cache size.
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Use 1/16 of system memory as the index-creation memory budget.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Timeout for fetching options (NOTE(review): consumer of this constant is
/// not visible in this file — presumably remote option fetches; confirm).
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
47
/// Configuration for the mito region engine.
///
/// Every field is optional in the config file (`#[serde(default)]`); call
/// [`MitoConfig::sanitize`] after deserialization to clamp invalid values.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of region workers; 0 is sanitized to `cpus / 2` (rounded up).
    pub num_workers: usize,
    /// Request channel size of each worker; 0 is sanitized to 1.
    pub worker_channel_size: usize,
    /// Max number of requests a worker handles per batch (default 64).
    pub worker_request_batch_size: usize,

    /// Number of manifest deltas between checkpoints (default 10).
    /// NOTE(review): checkpointing logic lives elsewhere — confirm semantics.
    pub manifest_checkpoint_distance: u64,
    /// Whether to compress the manifest (default false).
    pub compress_manifest: bool,

    /// Max number of concurrent background flushes; 0 is sanitized to `cpus / 2`.
    pub max_background_flushes: usize,
    /// Max number of concurrent background compactions; 0 is sanitized to `cpus / 4`.
    pub max_background_compactions: usize,
    /// Max number of concurrent background purges; 0 is sanitized to the CPU count.
    pub max_background_purges: usize,

    /// Interval of automatic flush (default 30 minutes); parsed from a
    /// humantime string (e.g. "30m") in config files.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size (default 1GiB; capped at 1/8 of system
    /// memory when memory size is detectable).
    pub global_write_buffer_size: ReadableSize,
    /// Size above which writes are rejected; sanitized to twice
    /// `global_write_buffer_size` whenever it is not strictly larger.
    pub global_write_buffer_reject_size: ReadableSize,

    /// Cache size for SST metadata (default 128MiB, capped by 1/32 of system memory).
    pub sst_meta_cache_size: ReadableSize,
    /// Cache size for vectors (default 512MiB, capped by 1/16 of system memory).
    pub vector_cache_size: ReadableSize,
    /// Cache size for SST pages (default 512MiB; 1/8 of system memory when detectable).
    pub page_cache_size: ReadableSize,
    /// Cache size for selector results (default 512MiB, capped by 1/16 of
    /// system memory). NOTE(review): which selector this serves is not
    /// visible in this file — confirm.
    pub selector_result_cache_size: ReadableSize,
    /// Whether the write cache is enabled (default false).
    pub enable_write_cache: bool,
    /// Path of the write cache; an empty/blank value is sanitized to `data_home`.
    pub write_cache_path: String,
    /// Capacity of the write cache (default 5GiB).
    pub write_cache_size: ReadableSize,
    /// Optional TTL for write cache entries (humantime string; default none).
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,

    /// Buffer size for writing SST files; sanitized up to the 5MiB
    /// multipart-upload minimum.
    pub sst_write_buffer_size: ReadableSize,
    /// Channel size for parallel scan tasks; values below 1 are sanitized
    /// to the default (32).
    pub parallel_scan_channel_size: usize,
    /// Whether to allow stale entries (default false).
    /// NOTE(review): exact semantics are not visible in this file — confirm.
    pub allow_stale_entries: bool,

    /// Settings shared by all index types.
    pub index: IndexConfig,
    /// Inverted index settings.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index settings.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index settings.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable settings.
    pub memtable: MemtableConfig,

    /// Minimum interval between compactions (default 0, i.e. unthrottled);
    /// humantime string in config files.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,
}
130
131impl Default for MitoConfig {
132 fn default() -> Self {
133 let mut mito_config = MitoConfig {
134 num_workers: divide_num_cpus(2),
135 worker_channel_size: 128,
136 worker_request_batch_size: 64,
137 manifest_checkpoint_distance: 10,
138 compress_manifest: false,
139 max_background_flushes: divide_num_cpus(2),
140 max_background_compactions: divide_num_cpus(4),
141 max_background_purges: common_config::utils::get_cpus(),
142 auto_flush_interval: Duration::from_secs(30 * 60),
143 global_write_buffer_size: ReadableSize::gb(1),
144 global_write_buffer_reject_size: ReadableSize::gb(2),
145 sst_meta_cache_size: ReadableSize::mb(128),
146 vector_cache_size: ReadableSize::mb(512),
147 page_cache_size: ReadableSize::mb(512),
148 selector_result_cache_size: ReadableSize::mb(512),
149 enable_write_cache: false,
150 write_cache_path: String::new(),
151 write_cache_size: ReadableSize::gb(5),
152 write_cache_ttl: None,
153 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
154 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
155 allow_stale_entries: false,
156 index: IndexConfig::default(),
157 inverted_index: InvertedIndexConfig::default(),
158 fulltext_index: FulltextIndexConfig::default(),
159 bloom_filter_index: BloomFilterConfig::default(),
160 memtable: MemtableConfig::default(),
161 min_compaction_interval: Duration::from_secs(0),
162 };
163
164 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
166 mito_config.adjust_buffer_and_cache_size(sys_memory);
167 }
168
169 mito_config
170 }
171}
172
173impl MitoConfig {
174 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
178 if self.num_workers == 0 {
180 self.num_workers = divide_num_cpus(2);
181 }
182
183 if self.worker_channel_size == 0 {
185 warn!("Sanitize channel size 0 to 1");
186 self.worker_channel_size = 1;
187 }
188
189 if self.max_background_flushes == 0 {
190 warn!(
191 "Sanitize max background flushes 0 to {}",
192 divide_num_cpus(2)
193 );
194 self.max_background_flushes = divide_num_cpus(2);
195 }
196 if self.max_background_compactions == 0 {
197 warn!(
198 "Sanitize max background compactions 0 to {}",
199 divide_num_cpus(4)
200 );
201 self.max_background_compactions = divide_num_cpus(4);
202 }
203 if self.max_background_purges == 0 {
204 warn!(
205 "Sanitize max background purges 0 to {}",
206 common_config::utils::get_cpus()
207 );
208 self.max_background_purges = common_config::utils::get_cpus();
209 }
210
211 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
212 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
213 warn!(
214 "Sanitize global write buffer reject size to {}",
215 self.global_write_buffer_reject_size
216 );
217 }
218
219 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
220 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
221 warn!(
222 "Sanitize sst write buffer size to {}",
223 self.sst_write_buffer_size
224 );
225 }
226
227 if self.parallel_scan_channel_size < 1 {
228 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
229 warn!(
230 "Sanitize scan channel size to {}",
231 self.parallel_scan_channel_size
232 );
233 }
234
235 if self.write_cache_path.trim().is_empty() {
237 self.write_cache_path = data_home.to_string();
238 }
239
240 self.index.sanitize(data_home, &self.inverted_index)?;
241
242 Ok(())
243 }
244
245 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
246 let global_write_buffer_size = cmp::min(
248 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
249 ReadableSize::gb(1),
250 );
251 let global_write_buffer_reject_size = global_write_buffer_size * 2;
253 let sst_meta_cache_size = cmp::min(
255 sys_memory / SST_META_CACHE_SIZE_FACTOR,
256 ReadableSize::mb(128),
257 );
258 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
260 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
261
262 self.global_write_buffer_size = global_write_buffer_size;
263 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
264 self.sst_meta_cache_size = sst_meta_cache_size;
265 self.vector_cache_size = mem_cache_size;
266 self.page_cache_size = page_cache_size;
267 self.selector_result_cache_size = mem_cache_size;
268
269 self.index.adjust_buffer_and_cache_size(sys_memory);
270 }
271
272 #[cfg(test)]
274 pub fn enable_write_cache(
275 mut self,
276 path: String,
277 size: ReadableSize,
278 ttl: Option<Duration>,
279 ) -> Self {
280 self.enable_write_cache = true;
281 self.write_cache_path = path;
282 self.write_cache_size = size;
283 self.write_cache_ttl = ttl;
284 self
285 }
286}
287
/// Settings shared by all index types.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory for intermediate index files. An empty value is
    /// resolved by [IndexConfig::sanitize] to the deprecated
    /// `inverted_index.intermediate_path` if set, else to
    /// `{data_home}/index_intermediate`.
    pub aux_path: String,

    /// Capacity of the staging area (default 2GiB).
    /// NOTE(review): the staging area itself is managed elsewhere — confirm.
    pub staging_size: ReadableSize,
    /// TTL of staged files (humantime string; default 7 days). A zero
    /// duration is sanitized to `None`, i.e. no expiration.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Write buffer size for index files; sanitized up to the 5MiB
    /// multipart-upload minimum.
    pub write_buffer_size: ReadableSize,

    /// Cache size for index metadata (default 64MiB, capped by 1/32 of system memory).
    pub metadata_cache_size: ReadableSize,
    /// Cache size for index content (default 128MiB, capped by 1/16 of system memory).
    pub content_cache_size: ReadableSize,
    /// Page size of the content cache (default 64KiB).
    pub content_cache_page_size: ReadableSize,
    /// Cache size for index results (default 128MiB, capped by 1/16 of system memory).
    pub result_cache_size: ReadableSize,
}
323
324impl Default for IndexConfig {
325 fn default() -> Self {
326 Self {
327 aux_path: String::new(),
328 staging_size: ReadableSize::gb(2),
329 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
330 write_buffer_size: ReadableSize::mb(8),
331 metadata_cache_size: ReadableSize::mb(64),
332 content_cache_size: ReadableSize::mb(128),
333 content_cache_page_size: ReadableSize::kb(64),
334 result_cache_size: ReadableSize::mb(128),
335 }
336 }
337}
338
339impl IndexConfig {
340 pub fn sanitize(
341 &mut self,
342 data_home: &str,
343 inverted_index: &InvertedIndexConfig,
344 ) -> Result<()> {
345 #[allow(deprecated)]
346 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
347 self.aux_path.clone_from(&inverted_index.intermediate_path);
348 warn!(
349 "`inverted_index.intermediate_path` is deprecated, use
350 `index.aux_path` instead. Set `index.aux_path` to {}",
351 &inverted_index.intermediate_path
352 )
353 }
354 if self.aux_path.is_empty() {
355 let path = Path::new(data_home).join("index_intermediate");
356 self.aux_path = path.as_os_str().to_string_lossy().to_string();
357 }
358
359 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
360 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
361 warn!(
362 "Sanitize index write buffer size to {}",
363 self.write_buffer_size
364 );
365 }
366
367 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
368 self.staging_ttl = None;
369 }
370
371 Ok(())
372 }
373
374 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
375 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
376 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
377 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
378
379 let metadata_cache_size = cmp::min(
380 sys_memory / SST_META_CACHE_SIZE_FACTOR,
381 ReadableSize::mb(64),
382 );
383 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
384 }
385}
386
/// On/off switch for index features, deserialized in snake case
/// ("auto" / "disable").
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Let the engine decide (the default).
    #[default]
    Auto,
    /// Feature is turned off.
    Disable,
}
397
398impl Mode {
399 pub fn disabled(&self) -> bool {
401 matches!(self, Mode::Disable)
402 }
403
404 pub fn auto(&self) -> bool {
406 matches!(self, Mode::Auto)
407 }
408}
409
/// Memory budget for creating an index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derive the budget from total system memory (the default).
    #[default]
    Auto,
    /// No limit.
    Unlimited,
    /// Explicit size (e.g. "64MB"); `untagged` lets any value that is not a
    /// known tag deserialize as a size.
    #[serde(untagged)]
    Size(ReadableSize),
}
423
/// Inverted index settings.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,

    /// Memory budget for index creation; see [MemoryThreshold].
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated; migrated to `index.aux_path` by [IndexConfig::sanitize].
    /// Accepted on deserialization for old configs but never serialized.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated in favor of `index.write_buffer_size`; never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
447
448impl Default for InvertedIndexConfig {
449 #[allow(deprecated)]
450 fn default() -> Self {
451 Self {
452 create_on_flush: Mode::Auto,
453 create_on_compaction: Mode::Auto,
454 apply_on_query: Mode::Auto,
455 mem_threshold_on_create: MemoryThreshold::Auto,
456 write_buffer_size: ReadableSize::mb(8),
457 intermediate_path: String::new(),
458 }
459 }
460}
461
462impl InvertedIndexConfig {
463 pub fn mem_threshold_on_create(&self) -> Option<usize> {
464 match self.mem_threshold_on_create {
465 MemoryThreshold::Auto => {
466 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
467 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
468 } else {
469 Some(ReadableSize::mb(64).as_bytes() as usize)
470 }
471 }
472 MemoryThreshold::Unlimited => None,
473 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
474 }
475 }
476}
477
/// Full-text index settings.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory budget for index creation; see [MemoryThreshold].
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default true).
    pub compress: bool,
}
494
495impl Default for FulltextIndexConfig {
496 fn default() -> Self {
497 Self {
498 create_on_flush: Mode::Auto,
499 create_on_compaction: Mode::Auto,
500 apply_on_query: Mode::Auto,
501 mem_threshold_on_create: MemoryThreshold::Auto,
502 compress: true,
503 }
504 }
505}
506
507impl FulltextIndexConfig {
508 pub fn mem_threshold_on_create(&self) -> usize {
509 match self.mem_threshold_on_create {
510 MemoryThreshold::Auto => {
511 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
512 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
513 } else {
514 ReadableSize::mb(64).as_bytes() as _
515 }
516 }
517 MemoryThreshold::Unlimited => usize::MAX,
518 MemoryThreshold::Size(size) => size.as_bytes() as _,
519 }
520 }
521}
522
/// Bloom filter index settings.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory budget for index creation; see [MemoryThreshold].
    pub mem_threshold_on_create: MemoryThreshold,
}
537
538impl Default for BloomFilterConfig {
539 fn default() -> Self {
540 Self {
541 create_on_flush: Mode::Auto,
542 create_on_compaction: Mode::Auto,
543 apply_on_query: Mode::Auto,
544 mem_threshold_on_create: MemoryThreshold::Auto,
545 }
546 }
547}
548
549impl BloomFilterConfig {
550 pub fn mem_threshold_on_create(&self) -> Option<usize> {
551 match self.mem_threshold_on_create {
552 MemoryThreshold::Auto => {
553 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
554 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
555 } else {
556 Some(ReadableSize::mb(64).as_bytes() as usize)
557 }
558 }
559 MemoryThreshold::Unlimited => None,
560 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
561 }
562 }
563}
564
565fn divide_num_cpus(divisor: usize) -> usize {
567 debug_assert!(divisor > 0);
568 let cores = common_config::utils::get_cpus();
569 debug_assert!(cores > 0);
570
571 cores.div_ceil(divisor)
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// A partial TOML config should deserialize, with the `[memtable]`
    /// section selecting the partition-tree memtable.
    #[test]
    fn test_deserialize_config() {
        let config_toml = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MitoConfig = toml::from_str(config_toml).unwrap();
        match &config.memtable {
            MemtableConfig::PartitionTree(tree_config) => {
                assert_eq!(1024, tree_config.data_freeze_threshold);
            }
            _ => unreachable!(),
        }
    }
}
594}