1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_memory_manager::OnExhaustedPolicy;
24use common_stat::{get_total_cpu_cores, get_total_memory_readable};
25use common_telemetry::warn;
26use serde::{Deserialize, Serialize};
27use serde_with::serde_as;
28
29use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
30use crate::error::Result;
31use crate::gc::GcConfig;
32use crate::memtable::MemtableConfig;
33use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
34
/// Minimum part size accepted by object-store multipart uploads (5MiB).
/// Write buffer sizes are clamped to at least this value by `sanitize`.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default max number of SST files that a scan may read concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

/// Divisor of system memory used for the global write buffer size (1/8).
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Divisor of system memory used for the SST metadata cache size (1/32).
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Divisor of system memory used for the in-memory cache sizes (1/16).
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Divisor of system memory used for the page cache size (1/8).
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Divisor of system memory used for the index-creation memory threshold (1/16).
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Timeout used when fetching options.
/// NOTE(review): exact fetch target not visible in this file — confirm at call sites.
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
/// Configuration for the mito region engine.
///
/// `#[serde(default)]` lets a partially specified configuration file fall
/// back to [MitoConfig::default] for every missing field.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of region workers (sanitized to `divide_num_cpus(2)` when 0).
    pub num_workers: usize,
    /// Request channel size of each worker (sanitized to 1 when 0).
    pub worker_channel_size: usize,
    /// Max number of requests a worker handles in one batch.
    pub worker_request_batch_size: usize,

    /// Number of manifest actions between two manifest checkpoints.
    pub manifest_checkpoint_distance: u64,
    /// Max number of removed-file records kept in the manifest (experimental).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// How long removed-file records are kept in the manifest (experimental).
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress manifest files.
    pub compress_manifest: bool,

    /// Max number of concurrent background index-build tasks.
    pub max_background_index_builds: usize,
    /// Max number of concurrent background flush tasks.
    pub max_background_flushes: usize,
    /// Max number of concurrent background compaction tasks.
    pub max_background_compactions: usize,
    /// Max number of concurrent background purge tasks.
    pub max_background_purges: usize,
    /// Memory budget for compaction tasks (experimental).
    pub experimental_compaction_memory_limit: MemoryLimit,
    /// Policy applied when the compaction memory budget is exhausted
    /// (experimental).
    pub experimental_compaction_on_exhausted: OnExhaustedPolicy,

    /// Interval of automatic flush (30 minutes by default).
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size shared by all regions; capped at
    /// `min(sys_memory / 8, 1GiB)` when total memory can be detected.
    pub global_write_buffer_size: ReadableSize,
    /// Write buffer size at which writes are rejected; `sanitize` keeps it
    /// strictly greater than `global_write_buffer_size` (2x when invalid).
    pub global_write_buffer_reject_size: ReadableSize,

    /// Capacity of the SST metadata cache.
    pub sst_meta_cache_size: ReadableSize,
    /// Capacity of the vector cache.
    pub vector_cache_size: ReadableSize,
    /// Capacity of the page cache.
    pub page_cache_size: ReadableSize,
    /// Capacity of the selector result cache.
    pub selector_result_cache_size: ReadableSize,
    /// Capacity of the range result cache.
    pub range_result_cache_size: ReadableSize,
    /// Whether the write cache is enabled.
    pub enable_write_cache: bool,
    /// File system path of the write cache; an empty value is replaced with
    /// `data_home` by `sanitize`.
    pub write_cache_path: String,
    /// Capacity of the write cache.
    pub write_cache_size: ReadableSize,
    /// TTL of files in the write cache (`None` disables expiration).
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,
    /// Whether to preload the index cache.
    pub preload_index_cache: bool,
    /// Percent of the write cache reserved for index files; must lie in
    /// (0, 100), otherwise `sanitize` resets it to the default.
    pub index_cache_percent: u8,
    /// Whether to refill caches on read.
    pub enable_refill_cache_on_read: bool,
    /// Capacity of the manifest cache.
    pub manifest_cache_size: ReadableSize,

    /// Buffer size used when writing SST files; raised to the 5MiB
    /// multipart-upload minimum by `sanitize`.
    pub sst_write_buffer_size: ReadableSize,
    /// Max number of SST files a scan may read concurrently.
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries.
    /// NOTE(review): exact staleness semantics not visible here — confirm at
    /// call sites.
    pub allow_stale_entries: bool,
    /// Memory budget for scans.
    pub scan_memory_limit: MemoryLimit,
    /// Policy applied when the scan memory budget is exhausted.
    pub scan_memory_on_exhausted: OnExhaustedPolicy,

    /// Common index settings.
    pub index: IndexConfig,
    /// Inverted index settings.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index settings.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index settings.
    pub bloom_filter_index: BloomFilterConfig,
    /// Vector index settings (only with the `vector_index` feature).
    #[cfg(feature = "vector_index")]
    pub vector_index: VectorIndexConfig,

    /// Memtable settings.
    pub memtable: MemtableConfig,

    /// Minimum time between two compactions.
    /// NOTE(review): per-region vs global scope inferred from the name —
    /// confirm at call sites.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether the flat format is used by default.
    /// NOTE(review): format semantics not visible here — confirm against the
    /// SST module.
    pub default_flat_format: bool,

    /// Garbage collection settings.
    pub gc: GcConfig,
}
184
impl Default for MitoConfig {
    fn default() -> Self {
        let mut mito_config = MitoConfig {
            // Half the CPU cores, rounded up.
            num_workers: divide_num_cpus(2),
            worker_channel_size: 128,
            worker_request_batch_size: 64,
            manifest_checkpoint_distance: 10,
            experimental_manifest_keep_removed_file_count: 256,
            // 1 hour.
            experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
            compress_manifest: false,
            max_background_index_builds: divide_num_cpus(8),
            max_background_flushes: divide_num_cpus(2),
            max_background_compactions: divide_num_cpus(4),
            max_background_purges: get_total_cpu_cores(),
            experimental_compaction_memory_limit: MemoryLimit::Unlimited,
            experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
            // 30 minutes.
            auto_flush_interval: Duration::from_secs(30 * 60),
            global_write_buffer_size: ReadableSize::gb(1),
            // Twice the global write buffer size.
            global_write_buffer_reject_size: ReadableSize::gb(2),
            sst_meta_cache_size: ReadableSize::mb(128),
            vector_cache_size: ReadableSize::mb(512),
            page_cache_size: ReadableSize::mb(512),
            selector_result_cache_size: ReadableSize::mb(512),
            range_result_cache_size: ReadableSize::mb(512),
            enable_write_cache: false,
            // Empty path is replaced with `data_home` by `sanitize`.
            write_cache_path: String::new(),
            write_cache_size: ReadableSize::gb(5),
            write_cache_ttl: None,
            preload_index_cache: true,
            index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
            enable_refill_cache_on_read: true,
            manifest_cache_size: ReadableSize::mb(256),
            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            allow_stale_entries: false,
            scan_memory_limit: MemoryLimit::default(),
            scan_memory_on_exhausted: OnExhaustedPolicy::Fail,
            index: IndexConfig::default(),
            inverted_index: InvertedIndexConfig::default(),
            fulltext_index: FulltextIndexConfig::default(),
            bloom_filter_index: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index: VectorIndexConfig::default(),
            memtable: MemtableConfig::default(),
            min_compaction_interval: Duration::from_secs(0),
            default_flat_format: true,
            gc: GcConfig::default(),
        };

        // Scale buffer and cache sizes down relative to the host's total
        // memory so small machines are not dominated by the fixed defaults.
        if let Some(sys_memory) = get_total_memory_readable() {
            mito_config.adjust_buffer_and_cache_size(sys_memory);
        }

        mito_config
    }
}
242
243impl MitoConfig {
244 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
248 if self.num_workers == 0 {
250 self.num_workers = divide_num_cpus(2);
251 }
252
253 if self.worker_channel_size == 0 {
255 warn!("Sanitize channel size 0 to 1");
256 self.worker_channel_size = 1;
257 }
258
259 if self.max_background_flushes == 0 {
260 warn!(
261 "Sanitize max background flushes 0 to {}",
262 divide_num_cpus(2)
263 );
264 self.max_background_flushes = divide_num_cpus(2);
265 }
266 if self.max_background_compactions == 0 {
267 warn!(
268 "Sanitize max background compactions 0 to {}",
269 divide_num_cpus(4)
270 );
271 self.max_background_compactions = divide_num_cpus(4);
272 }
273 if self.max_background_purges == 0 {
274 let cpu_cores = get_total_cpu_cores();
275 warn!("Sanitize max background purges 0 to {}", cpu_cores);
276 self.max_background_purges = cpu_cores;
277 }
278
279 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
280 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
281 warn!(
282 "Sanitize global write buffer reject size to {}",
283 self.global_write_buffer_reject_size
284 );
285 }
286
287 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
288 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
289 warn!(
290 "Sanitize sst write buffer size to {}",
291 self.sst_write_buffer_size
292 );
293 }
294
295 if self.write_cache_path.trim().is_empty() {
297 self.write_cache_path = data_home.to_string();
298 }
299
300 if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
302 warn!(
303 "Invalid index_cache_percent {}, resetting to default {}",
304 self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
305 );
306 self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
307 }
308
309 self.index.sanitize(data_home, &self.inverted_index)?;
310
311 Ok(())
312 }
313
314 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
315 let global_write_buffer_size = cmp::min(
317 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
318 ReadableSize::gb(1),
319 );
320 let global_write_buffer_reject_size = global_write_buffer_size * 2;
322 let sst_meta_cache_size = cmp::min(
324 sys_memory / SST_META_CACHE_SIZE_FACTOR,
325 ReadableSize::mb(128),
326 );
327 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
329 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
330
331 self.global_write_buffer_size = global_write_buffer_size;
332 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
333 self.sst_meta_cache_size = sst_meta_cache_size;
334 self.vector_cache_size = mem_cache_size;
335 self.page_cache_size = page_cache_size;
336 self.selector_result_cache_size = mem_cache_size;
337 self.range_result_cache_size = mem_cache_size;
338
339 self.index.adjust_buffer_and_cache_size(sys_memory);
340 }
341
342 #[cfg(test)]
344 pub fn enable_write_cache(
345 mut self,
346 path: String,
347 size: ReadableSize,
348 ttl: Option<Duration>,
349 ) -> Self {
350 self.enable_write_cache = true;
351 self.write_cache_path = path;
352 self.write_cache_size = size;
353 self.write_cache_ttl = ttl;
354 self
355 }
356}
357
/// How index files are built.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// Build the index synchronously (the default).
    #[default]
    Sync,
    /// Build the index asynchronously.
    Async,
}
368
/// Index settings shared by all index types.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory for intermediate index files; defaults to
    /// `{data_home}/index_intermediate` (filled in by `sanitize`).
    pub aux_path: String,

    /// Capacity of the index staging area.
    pub staging_size: ReadableSize,
    /// TTL of staged files; a zero duration is normalized to `None`
    /// (never expire) by `sanitize`.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Whether indexes are built synchronously or asynchronously.
    pub build_mode: IndexBuildMode,

    /// Write buffer size used when creating an index; raised to the 5MiB
    /// multipart-upload minimum by `sanitize`.
    pub write_buffer_size: ReadableSize,

    /// Capacity of the index metadata cache.
    pub metadata_cache_size: ReadableSize,
    /// Capacity of the index content cache.
    pub content_cache_size: ReadableSize,
    /// Page size of entries in the index content cache.
    pub content_cache_page_size: ReadableSize,
    /// Capacity of the index result cache.
    pub result_cache_size: ReadableSize,
}
407
impl Default for IndexConfig {
    fn default() -> Self {
        Self {
            // Empty: `sanitize` fills in `{data_home}/index_intermediate`.
            aux_path: String::new(),
            staging_size: ReadableSize::gb(2),
            // 7 days.
            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
            build_mode: IndexBuildMode::default(),
            write_buffer_size: ReadableSize::mb(8),
            metadata_cache_size: ReadableSize::mb(64),
            content_cache_size: ReadableSize::mb(128),
            content_cache_page_size: ReadableSize::kb(64),
            result_cache_size: ReadableSize::mb(128),
        }
    }
}
423
424impl IndexConfig {
425 pub fn sanitize(
426 &mut self,
427 data_home: &str,
428 inverted_index: &InvertedIndexConfig,
429 ) -> Result<()> {
430 #[allow(deprecated)]
431 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
432 self.aux_path.clone_from(&inverted_index.intermediate_path);
433 warn!(
434 "`inverted_index.intermediate_path` is deprecated, use
435 `index.aux_path` instead. Set `index.aux_path` to {}",
436 &inverted_index.intermediate_path
437 )
438 }
439 if self.aux_path.is_empty() {
440 let path = Path::new(data_home).join("index_intermediate");
441 self.aux_path = path.as_os_str().to_string_lossy().to_string();
442 }
443
444 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
445 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
446 warn!(
447 "Sanitize index write buffer size to {}",
448 self.write_buffer_size
449 );
450 }
451
452 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
453 self.staging_ttl = None;
454 }
455
456 Ok(())
457 }
458
459 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
460 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
461 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
462 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
463
464 let metadata_cache_size = cmp::min(
465 sys_memory / SST_META_CACHE_SIZE_FACTOR,
466 ReadableSize::mb(64),
467 );
468 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
469 }
470}
471
/// Whether a feature is engaged automatically or disabled.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Enable automatically (the default).
    #[default]
    Auto,
    /// Disable.
    Disable,
}
482
483impl Mode {
484 pub fn disabled(&self) -> bool {
486 matches!(self, Mode::Disable)
487 }
488
489 pub fn auto(&self) -> bool {
491 matches!(self, Mode::Auto)
492 }
493}
494
/// Memory threshold used when creating an index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derive the threshold from total system memory
    /// (`sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR`), with a 64MiB
    /// fallback when memory cannot be detected.
    #[default]
    Auto,
    /// No limit.
    Unlimited,
    /// Fixed size; `#[serde(untagged)]` lets it deserialize from a bare
    /// size value instead of a tagged variant.
    #[serde(untagged)]
    Size(ReadableSize),
}
508
/// Configuration of the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index when querying.
    pub apply_on_query: Mode,

    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated; migrated into `IndexConfig::aux_path` by
    /// `IndexConfig::sanitize`. Still deserialized, never serialized.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated in favor of `IndexConfig::write_buffer_size`.
    /// Still deserialized, never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
532
impl Default for InvertedIndexConfig {
    // The deprecated fields still need initialization here.
    #[allow(deprecated)]
    fn default() -> Self {
        Self {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
            write_buffer_size: ReadableSize::mb(8),
            intermediate_path: String::new(),
        }
    }
}
546
547impl InvertedIndexConfig {
548 pub fn mem_threshold_on_create(&self) -> Option<usize> {
549 match self.mem_threshold_on_create {
550 MemoryThreshold::Auto => {
551 if let Some(sys_memory) = get_total_memory_readable() {
552 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
553 } else {
554 Some(ReadableSize::mb(64).as_bytes() as usize)
555 }
556 }
557 MemoryThreshold::Unlimited => None,
558 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
559 }
560 }
561}
562
/// Configuration of the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index when querying.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data.
    pub compress: bool,
}
579
impl Default for FulltextIndexConfig {
    fn default() -> Self {
        Self {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
            // Compression is on by default.
            compress: true,
        }
    }
}
591
592impl FulltextIndexConfig {
593 pub fn mem_threshold_on_create(&self) -> usize {
594 match self.mem_threshold_on_create {
595 MemoryThreshold::Auto => {
596 if let Some(sys_memory) = get_total_memory_readable() {
597 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
598 } else {
599 ReadableSize::mb(64).as_bytes() as _
600 }
601 }
602 MemoryThreshold::Unlimited => usize::MAX,
603 MemoryThreshold::Size(size) => size.as_bytes() as _,
604 }
605 }
606}
607
/// Configuration of the bloom filter index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index when querying.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}
622
impl Default for BloomFilterConfig {
    fn default() -> Self {
        Self {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
        }
    }
}
633
634impl BloomFilterConfig {
635 pub fn mem_threshold_on_create(&self) -> Option<usize> {
636 match self.mem_threshold_on_create {
637 MemoryThreshold::Auto => {
638 if let Some(sys_memory) = get_total_memory_readable() {
639 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
640 } else {
641 Some(ReadableSize::mb(64).as_bytes() as usize)
642 }
643 }
644 MemoryThreshold::Unlimited => None,
645 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
646 }
647 }
648}
649
/// Configuration of the vector index (only with the `vector_index` feature).
#[cfg(feature = "vector_index")]
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct VectorIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index when querying.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}
665
#[cfg(feature = "vector_index")]
impl Default for VectorIndexConfig {
    fn default() -> Self {
        Self {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
        }
    }
}
677
#[cfg(feature = "vector_index")]
impl VectorIndexConfig {
    /// Returns the memory threshold for creating the vector index, in
    /// bytes. `None` means unlimited.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        match self.mem_threshold_on_create {
            MemoryThreshold::Unlimited => None,
            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
            MemoryThreshold::Auto => {
                // Scale with total system memory; fall back to a fixed
                // default when it cannot be detected.
                let threshold = get_total_memory_readable()
                    .map_or(ReadableSize::mb(64), |sys_memory| {
                        sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR
                    });
                Some(threshold.as_bytes() as usize)
            }
        }
    }
}
694
/// Returns the total CPU core count divided by `divisor`, rounded up.
///
/// Used to derive worker/task counts from machine size; the result is at
/// least 1 as long as there is at least one core.
fn divide_num_cpus(divisor: usize) -> usize {
    debug_assert!(divisor > 0);
    let cores = get_total_cpu_cores();
    debug_assert!(cores > 0);

    cores.div_ceil(divisor)
}
703
#[cfg(test)]
mod tests {
    use super::*;

    // Deserializing a partial TOML config should fill every missing field
    // from `MitoConfig::default()` thanks to `#[serde(default)]`, while the
    // `[memtable]` section overrides the nested memtable settings.
    #[test]
    fn test_deserialize_config() {
        let s = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MitoConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(config) = &config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, config.data_freeze_threshold);
    }
}
724}