1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_memory_manager::OnExhaustedPolicy;
24use common_stat::{get_total_cpu_cores, get_total_memory_readable};
25use common_telemetry::warn;
26use serde::{Deserialize, Serialize};
27use serde_with::serde_as;
28
29use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
30use crate::error::Result;
31use crate::gc::GcConfig;
32use crate::memtable::MemtableConfig;
33use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
34
/// Lower bound that `sanitize` enforces on the SST and index write buffer sizes.
/// NOTE(review): presumably mirrors the object store's minimum multipart-upload part size — confirm.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default `parallel_scan_channel_size`; also the fallback `sanitize` uses for 0.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default `max_concurrent_scan_files`.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

/// The global write buffer is sized to at most total memory / this factor.
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// The SST meta cache (and the index metadata cache cap) is at most total memory / this factor.
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// In-memory caches (vector, selector result, index content/result) are at most total memory / this factor.
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// The page cache is sized to total memory / this factor.
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// `MemoryThreshold::Auto` index-creation thresholds are total memory / this factor.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Timeout for fetching options. NOTE(review): its users are outside this file — confirm the exact semantics.
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
54
/// Configuration for the mito engine.
///
/// Fields missing from a deserialized config take their values from
/// [`MitoConfig::default`] (`#[serde(default)]`). Call [`MitoConfig::sanitize`]
/// after loading to repair invalid values.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    // Worker configs.
    /// Number of workers (default: CPU cores / 2, rounded up; `sanitize` resets 0 to it).
    pub num_workers: usize,
    /// Worker request channel size (default: 128; `sanitize` raises 0 to 1).
    pub worker_channel_size: usize,
    /// Worker request batch size (default: 64).
    pub worker_request_batch_size: usize,

    // Manifest configs.
    /// Manifest checkpoint distance (default: 10).
    pub manifest_checkpoint_distance: u64,
    /// Experimental: number of removed files kept in the manifest (default: 256).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Experimental: TTL for removed files kept in the manifest (default: 1 hour).
    /// (De)serialized as a human-readable duration.
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress the manifest (default: `false`).
    pub compress_manifest: bool,

    // Background job configs.
    /// Max background index builds (default: CPU cores / 8, rounded up).
    pub max_background_index_builds: usize,
    /// Max background flushes (default: CPU cores / 2, rounded up; `sanitize` resets 0 to it).
    pub max_background_flushes: usize,
    /// Max background compactions (default: CPU cores / 4, rounded up; `sanitize` resets 0 to it).
    pub max_background_compactions: usize,
    /// Max background purges (default: all CPU cores; `sanitize` resets 0 to it).
    pub max_background_purges: usize,
    /// Experimental memory limit for compactions (default: unlimited).
    pub experimental_compaction_memory_limit: MemoryLimit,
    /// Experimental policy used when the compaction memory limit is exhausted.
    pub experimental_compaction_on_exhausted: OnExhaustedPolicy,

    // Flush configs.
    /// Auto flush interval (default: 30 minutes). Human-readable duration.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size (default: 1 GiB, capped at total memory / 8 when detectable).
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer reject size (default: twice the global write buffer size).
    /// `sanitize` doubles the buffer size into this field if it is not larger than the buffer.
    pub global_write_buffer_reject_size: ReadableSize,

    // Cache configs.
    /// SST metadata cache size (default: 128 MiB, capped at total memory / 32).
    pub sst_meta_cache_size: ReadableSize,
    /// Vector cache size (default: 512 MiB, capped at total memory / 16).
    pub vector_cache_size: ReadableSize,
    /// Page cache size (default: 512 MiB; total memory / 8 when memory is detectable).
    pub page_cache_size: ReadableSize,
    /// Selector result cache size (default: same as `vector_cache_size`).
    pub selector_result_cache_size: ReadableSize,
    /// Whether the write cache is enabled (default: `false`).
    pub enable_write_cache: bool,
    /// Write cache directory; `sanitize` falls back to `data_home` when blank.
    pub write_cache_path: String,
    /// Write cache capacity (default: 5 GiB).
    pub write_cache_size: ReadableSize,
    /// Optional TTL of the write cache (default: none). Human-readable duration.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,
    /// Whether to preload the index cache (default: `true`).
    pub preload_index_cache: bool,
    /// Index cache percentage; `sanitize` resets values outside 1..=99 to
    /// `DEFAULT_INDEX_CACHE_PERCENT`.
    pub index_cache_percent: u8,
    /// Whether to refill the cache on read (default: `true`).
    pub enable_refill_cache_on_read: bool,
    /// Manifest cache size (default: 256 MiB).
    pub manifest_cache_size: ReadableSize,

    // Write / scan configs.
    /// SST write buffer size; `sanitize` raises it to at least `MULTIPART_UPLOAD_MINIMUM_SIZE`.
    pub sst_write_buffer_size: ReadableSize,
    /// Parallel scan channel size (default: 32; `sanitize` resets 0 to it).
    pub parallel_scan_channel_size: usize,
    /// Max number of files scanned concurrently (default: 384).
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries (default: `false`).
    pub allow_stale_entries: bool,
    /// Memory limit for scans (default: `MemoryLimit::default()`).
    pub scan_memory_limit: MemoryLimit,

    // Index configs.
    /// Shared index configuration; sanitized together with this config.
    pub index: IndexConfig,
    /// Inverted index configuration.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index configuration.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable configuration.
    pub memtable: MemtableConfig,

    /// Minimum compaction interval (default: 0). Human-readable duration.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether the experimental flat format is the default (default: `false`).
    pub default_experimental_flat_format: bool,

    /// GC configuration.
    pub gc: GcConfig,
}
176
177impl Default for MitoConfig {
178 fn default() -> Self {
179 let mut mito_config = MitoConfig {
180 num_workers: divide_num_cpus(2),
181 worker_channel_size: 128,
182 worker_request_batch_size: 64,
183 manifest_checkpoint_distance: 10,
184 experimental_manifest_keep_removed_file_count: 256,
185 experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
186 compress_manifest: false,
187 max_background_index_builds: divide_num_cpus(8),
188 max_background_flushes: divide_num_cpus(2),
189 max_background_compactions: divide_num_cpus(4),
190 max_background_purges: get_total_cpu_cores(),
191 experimental_compaction_memory_limit: MemoryLimit::Unlimited,
192 experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
193 auto_flush_interval: Duration::from_secs(30 * 60),
194 global_write_buffer_size: ReadableSize::gb(1),
195 global_write_buffer_reject_size: ReadableSize::gb(2),
196 sst_meta_cache_size: ReadableSize::mb(128),
197 vector_cache_size: ReadableSize::mb(512),
198 page_cache_size: ReadableSize::mb(512),
199 selector_result_cache_size: ReadableSize::mb(512),
200 enable_write_cache: false,
201 write_cache_path: String::new(),
202 write_cache_size: ReadableSize::gb(5),
203 write_cache_ttl: None,
204 preload_index_cache: true,
205 index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
206 enable_refill_cache_on_read: true,
207 manifest_cache_size: ReadableSize::mb(256),
208 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
209 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
210 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
211 allow_stale_entries: false,
212 scan_memory_limit: MemoryLimit::default(),
213 index: IndexConfig::default(),
214 inverted_index: InvertedIndexConfig::default(),
215 fulltext_index: FulltextIndexConfig::default(),
216 bloom_filter_index: BloomFilterConfig::default(),
217 memtable: MemtableConfig::default(),
218 min_compaction_interval: Duration::from_secs(0),
219 default_experimental_flat_format: false,
220 gc: GcConfig::default(),
221 };
222
223 if let Some(sys_memory) = get_total_memory_readable() {
225 mito_config.adjust_buffer_and_cache_size(sys_memory);
226 }
227
228 mito_config
229 }
230}
231
232impl MitoConfig {
233 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
237 if self.num_workers == 0 {
239 self.num_workers = divide_num_cpus(2);
240 }
241
242 if self.worker_channel_size == 0 {
244 warn!("Sanitize channel size 0 to 1");
245 self.worker_channel_size = 1;
246 }
247
248 if self.max_background_flushes == 0 {
249 warn!(
250 "Sanitize max background flushes 0 to {}",
251 divide_num_cpus(2)
252 );
253 self.max_background_flushes = divide_num_cpus(2);
254 }
255 if self.max_background_compactions == 0 {
256 warn!(
257 "Sanitize max background compactions 0 to {}",
258 divide_num_cpus(4)
259 );
260 self.max_background_compactions = divide_num_cpus(4);
261 }
262 if self.max_background_purges == 0 {
263 let cpu_cores = get_total_cpu_cores();
264 warn!("Sanitize max background purges 0 to {}", cpu_cores);
265 self.max_background_purges = cpu_cores;
266 }
267
268 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
269 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
270 warn!(
271 "Sanitize global write buffer reject size to {}",
272 self.global_write_buffer_reject_size
273 );
274 }
275
276 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
277 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
278 warn!(
279 "Sanitize sst write buffer size to {}",
280 self.sst_write_buffer_size
281 );
282 }
283
284 if self.parallel_scan_channel_size < 1 {
285 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
286 warn!(
287 "Sanitize scan channel size to {}",
288 self.parallel_scan_channel_size
289 );
290 }
291
292 if self.write_cache_path.trim().is_empty() {
294 self.write_cache_path = data_home.to_string();
295 }
296
297 if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
299 warn!(
300 "Invalid index_cache_percent {}, resetting to default {}",
301 self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
302 );
303 self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
304 }
305
306 self.index.sanitize(data_home, &self.inverted_index)?;
307
308 Ok(())
309 }
310
311 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
312 let global_write_buffer_size = cmp::min(
314 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
315 ReadableSize::gb(1),
316 );
317 let global_write_buffer_reject_size = global_write_buffer_size * 2;
319 let sst_meta_cache_size = cmp::min(
321 sys_memory / SST_META_CACHE_SIZE_FACTOR,
322 ReadableSize::mb(128),
323 );
324 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
326 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
327
328 self.global_write_buffer_size = global_write_buffer_size;
329 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
330 self.sst_meta_cache_size = sst_meta_cache_size;
331 self.vector_cache_size = mem_cache_size;
332 self.page_cache_size = page_cache_size;
333 self.selector_result_cache_size = mem_cache_size;
334
335 self.index.adjust_buffer_and_cache_size(sys_memory);
336 }
337
338 #[cfg(test)]
340 pub fn enable_write_cache(
341 mut self,
342 path: String,
343 size: ReadableSize,
344 ttl: Option<Duration>,
345 ) -> Self {
346 self.enable_write_cache = true;
347 self.write_cache_path = path;
348 self.write_cache_size = size;
349 self.write_cache_ttl = ttl;
350 self
351 }
352}
353
/// Index build mode, (de)serialized in snake_case (`sync` / `async`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// The default mode. NOTE(review): presumably builds indexes synchronously — confirm.
    #[default]
    Sync,
    /// NOTE(review): presumably builds indexes asynchronously in the background — confirm.
    Async,
}
364
/// Configuration shared by all index types.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory for index files. `sanitize` defaults it to
    /// `<data_home>/index_intermediate`, or migrates the deprecated
    /// `inverted_index.intermediate_path`.
    pub aux_path: String,

    /// Staging area size (default: 2 GiB).
    pub staging_size: ReadableSize,
    /// Staging TTL (default: 7 days). Human-readable duration; `sanitize`
    /// turns a zero TTL into `None`.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Index build mode (default: `IndexBuildMode::Sync`).
    pub build_mode: IndexBuildMode,

    /// Write buffer size for index creation (default: 8 MiB); `sanitize`
    /// raises it to at least `MULTIPART_UPLOAD_MINIMUM_SIZE`.
    pub write_buffer_size: ReadableSize,

    /// Index metadata cache size (default: 64 MiB; capped by total memory).
    pub metadata_cache_size: ReadableSize,
    /// Index content cache size (default: 128 MiB; capped by total memory).
    pub content_cache_size: ReadableSize,
    /// Page size of the index content cache (default: 64 KiB).
    pub content_cache_page_size: ReadableSize,
    /// Index result cache size (default: 128 MiB; capped by total memory).
    pub result_cache_size: ReadableSize,
}
403
404impl Default for IndexConfig {
405 fn default() -> Self {
406 Self {
407 aux_path: String::new(),
408 staging_size: ReadableSize::gb(2),
409 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
410 build_mode: IndexBuildMode::default(),
411 write_buffer_size: ReadableSize::mb(8),
412 metadata_cache_size: ReadableSize::mb(64),
413 content_cache_size: ReadableSize::mb(128),
414 content_cache_page_size: ReadableSize::kb(64),
415 result_cache_size: ReadableSize::mb(128),
416 }
417 }
418}
419
420impl IndexConfig {
421 pub fn sanitize(
422 &mut self,
423 data_home: &str,
424 inverted_index: &InvertedIndexConfig,
425 ) -> Result<()> {
426 #[allow(deprecated)]
427 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
428 self.aux_path.clone_from(&inverted_index.intermediate_path);
429 warn!(
430 "`inverted_index.intermediate_path` is deprecated, use
431 `index.aux_path` instead. Set `index.aux_path` to {}",
432 &inverted_index.intermediate_path
433 )
434 }
435 if self.aux_path.is_empty() {
436 let path = Path::new(data_home).join("index_intermediate");
437 self.aux_path = path.as_os_str().to_string_lossy().to_string();
438 }
439
440 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
441 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
442 warn!(
443 "Sanitize index write buffer size to {}",
444 self.write_buffer_size
445 );
446 }
447
448 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
449 self.staging_ttl = None;
450 }
451
452 Ok(())
453 }
454
455 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
456 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
457 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
458 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
459
460 let metadata_cache_size = cmp::min(
461 sys_memory / SST_META_CACHE_SIZE_FACTOR,
462 ReadableSize::mb(64),
463 );
464 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
465 }
466}
467
/// Whether an index operation is enabled (`auto`) or disabled (`disable`).
/// (De)serialized in snake_case.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Enabled; the default.
    #[default]
    Auto,
    /// Disabled.
    Disable,
}
478
479impl Mode {
480 pub fn disabled(&self) -> bool {
482 matches!(self, Mode::Disable)
483 }
484
485 pub fn auto(&self) -> bool {
487 matches!(self, Mode::Auto)
488 }
489}
490
/// Memory threshold used when creating an index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derived from total system memory by the `mem_threshold_on_create` helpers; the default.
    #[default]
    Auto,
    /// No fixed threshold (the helpers map it to `None` or `usize::MAX`).
    Unlimited,
    /// An explicit size. `#[serde(untagged)]` lets a plain size value
    /// deserialize into this variant without a tag.
    #[serde(untagged)]
    Size(ReadableSize),
}
504
/// Inverted index configuration.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,

    /// Memory threshold for index creation (default: `MemoryThreshold::Auto`).
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated; still accepted on input but not serialized. `IndexConfig::sanitize`
    /// copies it into `IndexConfig::aux_path` when that is empty.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated; still accepted on input but not serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
528
529impl Default for InvertedIndexConfig {
530 #[allow(deprecated)]
531 fn default() -> Self {
532 Self {
533 create_on_flush: Mode::Auto,
534 create_on_compaction: Mode::Auto,
535 apply_on_query: Mode::Auto,
536 mem_threshold_on_create: MemoryThreshold::Auto,
537 write_buffer_size: ReadableSize::mb(8),
538 intermediate_path: String::new(),
539 }
540 }
541}
542
543impl InvertedIndexConfig {
544 pub fn mem_threshold_on_create(&self) -> Option<usize> {
545 match self.mem_threshold_on_create {
546 MemoryThreshold::Auto => {
547 if let Some(sys_memory) = get_total_memory_readable() {
548 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
549 } else {
550 Some(ReadableSize::mb(64).as_bytes() as usize)
551 }
552 }
553 MemoryThreshold::Unlimited => None,
554 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
555 }
556 }
557}
558
/// Full-text index configuration.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default: `MemoryThreshold::Auto`).
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index (default: `true`).
    pub compress: bool,
}
575
576impl Default for FulltextIndexConfig {
577 fn default() -> Self {
578 Self {
579 create_on_flush: Mode::Auto,
580 create_on_compaction: Mode::Auto,
581 apply_on_query: Mode::Auto,
582 mem_threshold_on_create: MemoryThreshold::Auto,
583 compress: true,
584 }
585 }
586}
587
588impl FulltextIndexConfig {
589 pub fn mem_threshold_on_create(&self) -> usize {
590 match self.mem_threshold_on_create {
591 MemoryThreshold::Auto => {
592 if let Some(sys_memory) = get_total_memory_readable() {
593 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
594 } else {
595 ReadableSize::mb(64).as_bytes() as _
596 }
597 }
598 MemoryThreshold::Unlimited => usize::MAX,
599 MemoryThreshold::Size(size) => size.as_bytes() as _,
600 }
601 }
602}
603
/// Bloom filter index configuration.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default: `MemoryThreshold::Auto`).
    pub mem_threshold_on_create: MemoryThreshold,
}
618
619impl Default for BloomFilterConfig {
620 fn default() -> Self {
621 Self {
622 create_on_flush: Mode::Auto,
623 create_on_compaction: Mode::Auto,
624 apply_on_query: Mode::Auto,
625 mem_threshold_on_create: MemoryThreshold::Auto,
626 }
627 }
628}
629
630impl BloomFilterConfig {
631 pub fn mem_threshold_on_create(&self) -> Option<usize> {
632 match self.mem_threshold_on_create {
633 MemoryThreshold::Auto => {
634 if let Some(sys_memory) = get_total_memory_readable() {
635 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
636 } else {
637 Some(ReadableSize::mb(64).as_bytes() as usize)
638 }
639 }
640 MemoryThreshold::Unlimited => None,
641 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
642 }
643 }
644}
645
646fn divide_num_cpus(divisor: usize) -> usize {
648 debug_assert!(divisor > 0);
649 let cores = get_total_cpu_cores();
650 debug_assert!(cores > 0);
651
652 cores.div_ceil(divisor)
653}
654
#[cfg(test)]
mod tests {
    use super::*;

    /// A `[memtable]` section selecting `partition_tree` must deserialize into
    /// `MemtableConfig::PartitionTree` with its options applied.
    #[test]
    fn test_deserialize_config() {
        let toml_text = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let parsed: MitoConfig = toml::from_str(toml_text).unwrap();
        match &parsed.memtable {
            MemtableConfig::PartitionTree(tree_config) => {
                assert_eq!(1024, tree_config.data_freeze_threshold);
            }
            _ => unreachable!(),
        }
    }
}