1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_memory_manager::OnExhaustedPolicy;
24use common_stat::{get_total_cpu_cores, get_total_memory_readable};
25use common_telemetry::warn;
26use serde::{Deserialize, Serialize};
27use serde_with::serde_as;
28
29use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
30use crate::error::Result;
31use crate::gc::GcConfig;
32use crate::memtable::MemtableConfig;
33use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
34
/// Lower bound (5 MB) that `MitoConfig::sanitize` and `IndexConfig::sanitize`
/// enforce on write buffer sizes.
// NOTE(review): the name suggests this mirrors the minimum part size of
// multipart uploads in object stores — confirm.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default value of `MitoConfig::parallel_scan_channel_size`; also the value
/// `MitoConfig::sanitize` falls back to when the configured size is 0.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default value of `MitoConfig::max_concurrent_scan_files`.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

/// The default global write buffer size is at most
/// `sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR`.
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// The default SST meta cache size (and index metadata cache size) is at most
/// `sys_memory / SST_META_CACHE_SIZE_FACTOR`.
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// The default size of the in-memory caches (vector, selector result, range
/// result, index content/result) is at most `sys_memory / MEM_CACHE_SIZE_FACTOR`.
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// The default page cache size is `sys_memory / PAGE_CACHE_SIZE_FACTOR`.
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// In `MemoryThreshold::Auto` mode the index creation memory threshold is
/// `sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR`.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// A 3 second timeout. It is not used inside this file.
// NOTE(review): presumably the timeout for fetching options — confirm at the use sites.
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
54
/// Top-level configuration defined in this file.
///
/// `#[serde(default)]` fills every field missing from the input with the value
/// from [`MitoConfig::default`]. [`MitoConfig::sanitize`] normalizes invalid
/// values.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of workers. Defaults to half of the CPU cores (rounded up);
    /// `sanitize` replaces 0 with that default.
    pub num_workers: usize,
    /// Worker channel size (default: 128). `sanitize` raises 0 to 1.
    pub worker_channel_size: usize,
    /// Worker request batch size (default: 64).
    pub worker_request_batch_size: usize,

    /// Manifest checkpoint distance (default: 10).
    // NOTE(review): presumably the number of manifest updates between two
    // checkpoints — confirm.
    pub manifest_checkpoint_distance: u64,
    /// Experimental: count of removed files kept in the manifest (default: 256).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Experimental: TTL of removed files kept in the manifest (default: 1 hour).
    /// (De)serialized in humantime format.
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress the manifest (default: false).
    pub compress_manifest: bool,

    /// Max number of background index builds. Defaults to 1/8 of the CPU cores
    /// (rounded up).
    pub max_background_index_builds: usize,
    /// Max number of background flushes. Defaults to 1/2 of the CPU cores
    /// (rounded up); `sanitize` replaces 0 with that default.
    pub max_background_flushes: usize,
    /// Max number of background compactions. Defaults to 1/4 of the CPU cores
    /// (rounded up); `sanitize` replaces 0 with that default.
    pub max_background_compactions: usize,
    /// Max number of background purges. Defaults to the number of CPU cores;
    /// `sanitize` replaces 0 with that default.
    pub max_background_purges: usize,
    /// Experimental: memory limit for compactions (default: unlimited).
    pub experimental_compaction_memory_limit: MemoryLimit,
    /// Experimental: policy applied when the compaction memory limit is
    /// exhausted (default: `OnExhaustedPolicy::default()`).
    pub experimental_compaction_on_exhausted: OnExhaustedPolicy,

    /// Auto flush interval (default: 30 minutes). (De)serialized in humantime
    /// format.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size. Defaults to `min(sys_memory / 8, 1GB)` when the
    /// system memory is known, otherwise 1GB.
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer reject size. Defaults to twice
    /// `global_write_buffer_size`; `sanitize` resets it to that value when it
    /// is not larger than `global_write_buffer_size`.
    pub global_write_buffer_reject_size: ReadableSize,

    /// SST meta cache size. Defaults to `min(sys_memory / 32, 128MB)` when the
    /// system memory is known, otherwise 128MB.
    pub sst_meta_cache_size: ReadableSize,
    /// Vector cache size. Defaults to `min(sys_memory / 16, 512MB)` when the
    /// system memory is known, otherwise 512MB.
    pub vector_cache_size: ReadableSize,
    /// Page cache size. Defaults to `sys_memory / 8` when the system memory is
    /// known, otherwise 512MB.
    pub page_cache_size: ReadableSize,
    /// Selector result cache size. Same default as `vector_cache_size`.
    pub selector_result_cache_size: ReadableSize,
    /// Range result cache size. Same default as `vector_cache_size`.
    pub range_result_cache_size: ReadableSize,
    /// Whether to enable the write cache (default: false).
    pub enable_write_cache: bool,
    /// Write cache path (default: empty). `sanitize` replaces a blank path with
    /// `data_home`.
    pub write_cache_path: String,
    /// Write cache size (default: 5GB).
    pub write_cache_size: ReadableSize,
    /// Write cache TTL (default: none). (De)serialized in humantime format.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,
    /// Whether to preload the index cache (default: true).
    pub preload_index_cache: bool,
    /// Index cache percentage (default: `DEFAULT_INDEX_CACHE_PERCENT`).
    /// `sanitize` resets 0 and values >= 100 to the default.
    pub index_cache_percent: u8,
    /// Whether to refill the cache on read (default: true).
    pub enable_refill_cache_on_read: bool,
    /// Manifest cache size (default: 256MB).
    pub manifest_cache_size: ReadableSize,

    /// SST write buffer size (default: `DEFAULT_WRITE_BUFFER_SIZE`). `sanitize`
    /// raises it to at least 5MB.
    pub sst_write_buffer_size: ReadableSize,
    /// Parallel scan channel size (default: 32). `sanitize` replaces 0 with the
    /// default.
    pub parallel_scan_channel_size: usize,
    /// Max number of files scanned concurrently (default: 384).
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries (default: false).
    pub allow_stale_entries: bool,
    /// Memory limit for scans (default: `MemoryLimit::default()`).
    pub scan_memory_limit: MemoryLimit,
    /// Policy applied when the scan memory limit is exhausted (default:
    /// `OnExhaustedPolicy::Fail`).
    pub scan_memory_on_exhausted: OnExhaustedPolicy,

    /// Options shared by all index types.
    pub index: IndexConfig,
    /// Inverted index options.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index options.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index options.
    pub bloom_filter_index: BloomFilterConfig,
    /// Vector index options (only with the `vector_index` feature).
    #[cfg(feature = "vector_index")]
    pub vector_index: VectorIndexConfig,

    /// Memtable options.
    pub memtable: MemtableConfig,

    /// Minimum compaction interval (default: 0 seconds). (De)serialized in
    /// humantime format.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether the experimental flat format is used by default (default: false).
    pub default_experimental_flat_format: bool,

    /// GC options.
    pub gc: GcConfig,
}
186
187impl Default for MitoConfig {
188 fn default() -> Self {
189 let mut mito_config = MitoConfig {
190 num_workers: divide_num_cpus(2),
191 worker_channel_size: 128,
192 worker_request_batch_size: 64,
193 manifest_checkpoint_distance: 10,
194 experimental_manifest_keep_removed_file_count: 256,
195 experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
196 compress_manifest: false,
197 max_background_index_builds: divide_num_cpus(8),
198 max_background_flushes: divide_num_cpus(2),
199 max_background_compactions: divide_num_cpus(4),
200 max_background_purges: get_total_cpu_cores(),
201 experimental_compaction_memory_limit: MemoryLimit::Unlimited,
202 experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
203 auto_flush_interval: Duration::from_secs(30 * 60),
204 global_write_buffer_size: ReadableSize::gb(1),
205 global_write_buffer_reject_size: ReadableSize::gb(2),
206 sst_meta_cache_size: ReadableSize::mb(128),
207 vector_cache_size: ReadableSize::mb(512),
208 page_cache_size: ReadableSize::mb(512),
209 selector_result_cache_size: ReadableSize::mb(512),
210 range_result_cache_size: ReadableSize::mb(512),
211 enable_write_cache: false,
212 write_cache_path: String::new(),
213 write_cache_size: ReadableSize::gb(5),
214 write_cache_ttl: None,
215 preload_index_cache: true,
216 index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
217 enable_refill_cache_on_read: true,
218 manifest_cache_size: ReadableSize::mb(256),
219 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
220 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
221 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
222 allow_stale_entries: false,
223 scan_memory_limit: MemoryLimit::default(),
224 scan_memory_on_exhausted: OnExhaustedPolicy::Fail,
225 index: IndexConfig::default(),
226 inverted_index: InvertedIndexConfig::default(),
227 fulltext_index: FulltextIndexConfig::default(),
228 bloom_filter_index: BloomFilterConfig::default(),
229 #[cfg(feature = "vector_index")]
230 vector_index: VectorIndexConfig::default(),
231 memtable: MemtableConfig::default(),
232 min_compaction_interval: Duration::from_secs(0),
233 default_experimental_flat_format: false,
234 gc: GcConfig::default(),
235 };
236
237 if let Some(sys_memory) = get_total_memory_readable() {
239 mito_config.adjust_buffer_and_cache_size(sys_memory);
240 }
241
242 mito_config
243 }
244}
245
246impl MitoConfig {
247 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
251 if self.num_workers == 0 {
253 self.num_workers = divide_num_cpus(2);
254 }
255
256 if self.worker_channel_size == 0 {
258 warn!("Sanitize channel size 0 to 1");
259 self.worker_channel_size = 1;
260 }
261
262 if self.max_background_flushes == 0 {
263 warn!(
264 "Sanitize max background flushes 0 to {}",
265 divide_num_cpus(2)
266 );
267 self.max_background_flushes = divide_num_cpus(2);
268 }
269 if self.max_background_compactions == 0 {
270 warn!(
271 "Sanitize max background compactions 0 to {}",
272 divide_num_cpus(4)
273 );
274 self.max_background_compactions = divide_num_cpus(4);
275 }
276 if self.max_background_purges == 0 {
277 let cpu_cores = get_total_cpu_cores();
278 warn!("Sanitize max background purges 0 to {}", cpu_cores);
279 self.max_background_purges = cpu_cores;
280 }
281
282 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
283 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
284 warn!(
285 "Sanitize global write buffer reject size to {}",
286 self.global_write_buffer_reject_size
287 );
288 }
289
290 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
291 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
292 warn!(
293 "Sanitize sst write buffer size to {}",
294 self.sst_write_buffer_size
295 );
296 }
297
298 if self.parallel_scan_channel_size < 1 {
299 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
300 warn!(
301 "Sanitize scan channel size to {}",
302 self.parallel_scan_channel_size
303 );
304 }
305
306 if self.write_cache_path.trim().is_empty() {
308 self.write_cache_path = data_home.to_string();
309 }
310
311 if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
313 warn!(
314 "Invalid index_cache_percent {}, resetting to default {}",
315 self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
316 );
317 self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
318 }
319
320 self.index.sanitize(data_home, &self.inverted_index)?;
321
322 Ok(())
323 }
324
325 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
326 let global_write_buffer_size = cmp::min(
328 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
329 ReadableSize::gb(1),
330 );
331 let global_write_buffer_reject_size = global_write_buffer_size * 2;
333 let sst_meta_cache_size = cmp::min(
335 sys_memory / SST_META_CACHE_SIZE_FACTOR,
336 ReadableSize::mb(128),
337 );
338 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
340 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
341
342 self.global_write_buffer_size = global_write_buffer_size;
343 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
344 self.sst_meta_cache_size = sst_meta_cache_size;
345 self.vector_cache_size = mem_cache_size;
346 self.page_cache_size = page_cache_size;
347 self.selector_result_cache_size = mem_cache_size;
348 self.range_result_cache_size = mem_cache_size;
349
350 self.index.adjust_buffer_and_cache_size(sys_memory);
351 }
352
353 #[cfg(test)]
355 pub fn enable_write_cache(
356 mut self,
357 path: String,
358 size: ReadableSize,
359 ttl: Option<Duration>,
360 ) -> Self {
361 self.enable_write_cache = true;
362 self.write_cache_path = path;
363 self.write_cache_size = size;
364 self.write_cache_ttl = ttl;
365 self
366 }
367}
368
/// Index build mode, (de)serialized in snake case (`sync` / `async`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// Synchronous mode (the default).
    // NOTE(review): presumably the index is built as part of the triggering
    // operation — confirm against the index builder.
    #[default]
    Sync,
    /// Asynchronous mode.
    // NOTE(review): presumably the index is built in the background — confirm.
    Async,
}
379
/// Options shared by all index types.
///
/// `#[serde(default)]` fills every field missing from the input with the value
/// from [`IndexConfig::default`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path (default: empty). When empty,
    /// [`IndexConfig::sanitize`] sets it to the deprecated
    /// `inverted_index.intermediate_path` if that is configured, otherwise to
    /// `{data_home}/index_intermediate`.
    pub aux_path: String,

    /// Staging size (default: 2GB).
    pub staging_size: ReadableSize,
    /// Staging TTL (default: 7 days). A zero TTL is turned into `None` by
    /// [`IndexConfig::sanitize`]. (De)serialized in humantime format.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Index build mode (default: `IndexBuildMode::Sync`).
    pub build_mode: IndexBuildMode,

    /// Write buffer size (default: 8MB). [`IndexConfig::sanitize`] raises it to
    /// at least 5MB.
    pub write_buffer_size: ReadableSize,

    /// Metadata cache size (default: 64MB).
    pub metadata_cache_size: ReadableSize,
    /// Content cache size (default: 128MB).
    pub content_cache_size: ReadableSize,
    /// Page size of the content cache (default: 64KB).
    pub content_cache_page_size: ReadableSize,
    /// Result cache size (default: 128MB).
    pub result_cache_size: ReadableSize,
}
418
419impl Default for IndexConfig {
420 fn default() -> Self {
421 Self {
422 aux_path: String::new(),
423 staging_size: ReadableSize::gb(2),
424 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
425 build_mode: IndexBuildMode::default(),
426 write_buffer_size: ReadableSize::mb(8),
427 metadata_cache_size: ReadableSize::mb(64),
428 content_cache_size: ReadableSize::mb(128),
429 content_cache_page_size: ReadableSize::kb(64),
430 result_cache_size: ReadableSize::mb(128),
431 }
432 }
433}
434
435impl IndexConfig {
436 pub fn sanitize(
437 &mut self,
438 data_home: &str,
439 inverted_index: &InvertedIndexConfig,
440 ) -> Result<()> {
441 #[allow(deprecated)]
442 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
443 self.aux_path.clone_from(&inverted_index.intermediate_path);
444 warn!(
445 "`inverted_index.intermediate_path` is deprecated, use
446 `index.aux_path` instead. Set `index.aux_path` to {}",
447 &inverted_index.intermediate_path
448 )
449 }
450 if self.aux_path.is_empty() {
451 let path = Path::new(data_home).join("index_intermediate");
452 self.aux_path = path.as_os_str().to_string_lossy().to_string();
453 }
454
455 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
456 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
457 warn!(
458 "Sanitize index write buffer size to {}",
459 self.write_buffer_size
460 );
461 }
462
463 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
464 self.staging_ttl = None;
465 }
466
467 Ok(())
468 }
469
470 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
471 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
472 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
473 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
474
475 let metadata_cache_size = cmp::min(
476 sys_memory / SST_META_CACHE_SIZE_FACTOR,
477 ReadableSize::mb(64),
478 );
479 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
480 }
481}
482
/// Operational mode of an index operation, (de)serialized in snake case
/// (`auto` / `disable`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Automatic mode (the default).
    // NOTE(review): presumably the engine decides whether to run the
    // operation — confirm against the users of `Mode`.
    #[default]
    Auto,
    /// The operation is disabled.
    Disable,
}
493
494impl Mode {
495 pub fn disabled(&self) -> bool {
497 matches!(self, Mode::Disable)
498 }
499
500 pub fn auto(&self) -> bool {
502 matches!(self, Mode::Auto)
503 }
504}
505
/// Memory threshold used when creating an index.
///
/// `auto` and `unlimited` are (de)serialized as snake-case names; any other
/// value is handled by the untagged `Size` variant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derive the threshold automatically (the default). The
    /// `mem_threshold_on_create` accessors in this file resolve it to
    /// `sys_memory / 16`, or to 64MB when the system memory is unknown.
    #[default]
    Auto,
    /// No threshold.
    Unlimited,
    /// An explicit size. Being untagged, it is (de)serialized directly as the
    /// `ReadableSize` value.
    #[serde(untagged)]
    Size(ReadableSize),
}
519
/// Options for the inverted index.
///
/// `#[serde(default)]` fills every field missing from the input with the value
/// from [`InvertedIndexConfig::default`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Mode for creating the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,

    /// Memory threshold for index creation (default: `MemoryThreshold::Auto`).
    /// Resolve it with [`InvertedIndexConfig::mem_threshold_on_create`].
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated intermediate path. It is still read from the input but never
    /// serialized; `IndexConfig::sanitize` copies it into `index.aux_path`
    /// when the latter is empty.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated write buffer size (default: 8MB). It is still read from the
    /// input but never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
543
544impl Default for InvertedIndexConfig {
545 #[allow(deprecated)]
546 fn default() -> Self {
547 Self {
548 create_on_flush: Mode::Auto,
549 create_on_compaction: Mode::Auto,
550 apply_on_query: Mode::Auto,
551 mem_threshold_on_create: MemoryThreshold::Auto,
552 write_buffer_size: ReadableSize::mb(8),
553 intermediate_path: String::new(),
554 }
555 }
556}
557
558impl InvertedIndexConfig {
559 pub fn mem_threshold_on_create(&self) -> Option<usize> {
560 match self.mem_threshold_on_create {
561 MemoryThreshold::Auto => {
562 if let Some(sys_memory) = get_total_memory_readable() {
563 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
564 } else {
565 Some(ReadableSize::mb(64).as_bytes() as usize)
566 }
567 }
568 MemoryThreshold::Unlimited => None,
569 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
570 }
571 }
572}
573
/// Options for the full-text index.
///
/// `#[serde(default)]` fills every field missing from the input with the value
/// from [`FulltextIndexConfig::default`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Mode for creating the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default: `MemoryThreshold::Auto`).
    /// Resolve it with [`FulltextIndexConfig::mem_threshold_on_create`].
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default: true).
    pub compress: bool,
}
590
591impl Default for FulltextIndexConfig {
592 fn default() -> Self {
593 Self {
594 create_on_flush: Mode::Auto,
595 create_on_compaction: Mode::Auto,
596 apply_on_query: Mode::Auto,
597 mem_threshold_on_create: MemoryThreshold::Auto,
598 compress: true,
599 }
600 }
601}
602
603impl FulltextIndexConfig {
604 pub fn mem_threshold_on_create(&self) -> usize {
605 match self.mem_threshold_on_create {
606 MemoryThreshold::Auto => {
607 if let Some(sys_memory) = get_total_memory_readable() {
608 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
609 } else {
610 ReadableSize::mb(64).as_bytes() as _
611 }
612 }
613 MemoryThreshold::Unlimited => usize::MAX,
614 MemoryThreshold::Size(size) => size.as_bytes() as _,
615 }
616 }
617}
618
/// Options for the bloom filter index.
///
/// `#[serde(default)]` fills every field missing from the input with the value
/// from [`BloomFilterConfig::default`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Mode for creating the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default: `MemoryThreshold::Auto`).
    /// Resolve it with [`BloomFilterConfig::mem_threshold_on_create`].
    pub mem_threshold_on_create: MemoryThreshold,
}
633
634impl Default for BloomFilterConfig {
635 fn default() -> Self {
636 Self {
637 create_on_flush: Mode::Auto,
638 create_on_compaction: Mode::Auto,
639 apply_on_query: Mode::Auto,
640 mem_threshold_on_create: MemoryThreshold::Auto,
641 }
642 }
643}
644
645impl BloomFilterConfig {
646 pub fn mem_threshold_on_create(&self) -> Option<usize> {
647 match self.mem_threshold_on_create {
648 MemoryThreshold::Auto => {
649 if let Some(sys_memory) = get_total_memory_readable() {
650 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
651 } else {
652 Some(ReadableSize::mb(64).as_bytes() as usize)
653 }
654 }
655 MemoryThreshold::Unlimited => None,
656 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
657 }
658 }
659}
660
/// Options for the vector index. Only compiled with the `vector_index`
/// feature.
///
/// `#[serde(default)]` fills every field missing from the input with the value
/// from [`VectorIndexConfig::default`].
#[cfg(feature = "vector_index")]
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct VectorIndexConfig {
    /// Mode for creating the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default: `MemoryThreshold::Auto`).
    /// Resolve it with [`VectorIndexConfig::mem_threshold_on_create`].
    pub mem_threshold_on_create: MemoryThreshold,
}
676
#[cfg(feature = "vector_index")]
impl Default for VectorIndexConfig {
    /// Returns the defaults: every mode and the memory threshold are `Auto`.
    fn default() -> Self {
        // `Auto` is the `#[default]` variant of both enums.
        VectorIndexConfig {
            create_on_flush: Mode::default(),
            create_on_compaction: Mode::default(),
            apply_on_query: Mode::default(),
            mem_threshold_on_create: MemoryThreshold::default(),
        }
    }
}
688
#[cfg(feature = "vector_index")]
impl VectorIndexConfig {
    /// Resolves the configured memory threshold for index creation to bytes.
    ///
    /// Returns `None` for `Unlimited`. In `Auto` mode the threshold is
    /// `sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR`, or 64MB when the
    /// system memory is unknown.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        let threshold = match &self.mem_threshold_on_create {
            MemoryThreshold::Unlimited => return None,
            MemoryThreshold::Size(explicit) => *explicit,
            MemoryThreshold::Auto => match get_total_memory_readable() {
                Some(total) => total / INDEX_CREATE_MEM_THRESHOLD_FACTOR,
                None => ReadableSize::mb(64),
            },
        };
        Some(threshold.as_bytes() as usize)
    }
}
705
706fn divide_num_cpus(divisor: usize) -> usize {
708 debug_assert!(divisor > 0);
709 let cores = get_total_cpu_cores();
710 debug_assert!(cores > 0);
711
712 cores.div_ceil(divisor)
713}
714
#[cfg(test)]
mod tests {
    use super::*;

    /// A TOML document containing only a `[memtable]` table must deserialize
    /// into a `MitoConfig` whose memtable is the partition tree variant with
    /// the configured freeze threshold.
    #[test]
    fn test_deserialize_config() {
        let toml_text = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let parsed: MitoConfig = toml::from_str(toml_text).unwrap();
        match &parsed.memtable {
            MemtableConfig::PartitionTree(tree) => {
                assert_eq!(1024, tree.data_freeze_threshold);
            }
            _ => unreachable!(),
        }
    }
}