1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_memory_manager::OnExhaustedPolicy;
24use common_stat::{get_total_cpu_cores, get_total_memory_readable};
25use common_telemetry::warn;
26use serde::{Deserialize, Serialize};
27use serde_with::serde_as;
28
29use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
30use crate::error::Result;
31use crate::gc::GcConfig;
32use crate::memtable::MemtableConfig;
33use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
34
35const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
36pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
38pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;
40
41const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
43const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
45const MEM_CACHE_SIZE_FACTOR: u64 = 16;
47const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
49const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
51
52pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
54
/// Configuration of the mito engine.
///
/// Any field missing from the serialized form is taken from `MitoConfig::default()`
/// (`#[serde(default)]`). `MitoConfig::sanitize` repairs invalid values after loading.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    // Worker configs:
    /// Number of workers. Defaults to half of the CPU cores (rounded up); `0` is sanitized
    /// to that same value.
    pub num_workers: usize,
    /// Request channel size of each worker (default 128). `0` is sanitized to `1`.
    pub worker_channel_size: usize,
    /// Request batch size of each worker (default 64).
    pub worker_request_batch_size: usize,

    // Manifest configs:
    /// Checkpoint distance of the manifest (default 10).
    /// NOTE(review): presumably the number of manifest updates between checkpoints — confirm.
    pub manifest_checkpoint_distance: u64,
    /// Number of removed files to keep in the manifest (experimental; default 256).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// How long removed files are kept in the manifest (experimental; default 1 hour).
    /// (De)serialized as a human-readable duration through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress manifest files (default `false`).
    pub compress_manifest: bool,

    // Background job configs:
    /// Max background index build jobs (default: 1/8 of the CPU cores, rounded up).
    pub max_background_index_builds: usize,
    /// Max background flush jobs (default: 1/2 of the CPU cores, rounded up).
    /// `0` is sanitized to the default.
    pub max_background_flushes: usize,
    /// Max background compaction jobs (default: 1/4 of the CPU cores, rounded up).
    /// `0` is sanitized to the default.
    pub max_background_compactions: usize,
    /// Max background purge jobs (default: the number of CPU cores).
    /// `0` is sanitized to the default.
    pub max_background_purges: usize,
    /// Memory limit for compaction (experimental; default `MemoryLimit::Unlimited`).
    pub experimental_compaction_memory_limit: MemoryLimit,
    /// Policy to apply once the compaction memory limit is exhausted (experimental).
    pub experimental_compaction_on_exhausted: OnExhaustedPolicy,

    // Flush configs:
    /// Interval of automatic flushes (default 30 minutes), as a human-readable duration.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size (default 1 GiB). When total system memory is detected the
    /// default becomes `min(sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR, 1 GiB)`.
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer reject size; defaults to twice `global_write_buffer_size`.
    /// Values not greater than `global_write_buffer_size` are sanitized to twice it.
    /// NOTE(review): presumably writes are rejected beyond this size — confirm.
    pub global_write_buffer_reject_size: ReadableSize,

    // Cache configs:
    /// SST metadata cache size (default 128 MiB, or `min(sys_memory / 32, 128 MiB)` when
    /// total system memory is detected).
    pub sst_meta_cache_size: ReadableSize,
    /// Vector cache size (default 512 MiB, or `min(sys_memory / 16, 512 MiB)` when total
    /// system memory is detected).
    pub vector_cache_size: ReadableSize,
    /// Page cache size (default 512 MiB, or `sys_memory / 8`, uncapped, when total system
    /// memory is detected).
    pub page_cache_size: ReadableSize,
    /// Selector result cache size; derived the same way as `vector_cache_size`.
    pub selector_result_cache_size: ReadableSize,
    /// Whether to enable the write cache (default `false`).
    pub enable_write_cache: bool,
    /// Directory of the write cache; a blank value is sanitized to the data home.
    pub write_cache_path: String,
    /// Capacity of the write cache (default 5 GiB).
    pub write_cache_size: ReadableSize,
    /// TTL of the write cache (default `None`), as a human-readable duration.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,
    /// Whether to preload the index cache (default `true`).
    pub preload_index_cache: bool,
    /// Index cache percentage; must be in `1..=99`, otherwise it is reset to
    /// `DEFAULT_INDEX_CACHE_PERCENT`.
    /// NOTE(review): presumably the share of the file cache reserved for index files — confirm.
    pub index_cache_percent: u8,
    /// Whether to refill the cache on reads (default `true`).
    pub enable_refill_cache_on_read: bool,
    /// Manifest cache size (default 256 MiB).
    pub manifest_cache_size: ReadableSize,

    // Other configs:
    /// Buffer size for SST writing (default `DEFAULT_WRITE_BUFFER_SIZE`); values below
    /// `MULTIPART_UPLOAD_MINIMUM_SIZE` are raised to it.
    pub sst_write_buffer_size: ReadableSize,
    /// Capacity of the parallel scan channel (default `DEFAULT_SCAN_CHANNEL_SIZE`);
    /// `0` is reset to the default.
    pub parallel_scan_channel_size: usize,
    /// Max files scanned concurrently (default `DEFAULT_MAX_CONCURRENT_SCAN_FILES`).
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries (default `false`).
    pub allow_stale_entries: bool,
    /// Memory limit for scans (default `MemoryLimit::default()`).
    pub scan_memory_limit: MemoryLimit,

    /// Shared index options.
    pub index: IndexConfig,
    /// Inverted index options.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index options.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index options.
    pub bloom_filter_index: BloomFilterConfig,
    /// Vector index options (only with the `vector_index` feature).
    #[cfg(feature = "vector_index")]
    pub vector_index: VectorIndexConfig,

    /// Memtable options.
    pub memtable: MemtableConfig,

    /// Minimum interval between compactions (default 0), as a human-readable duration.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether the experimental flat format is the default (default `false`).
    pub default_experimental_flat_format: bool,

    /// Garbage collection options.
    pub gc: GcConfig,
}
179
180impl Default for MitoConfig {
181 fn default() -> Self {
182 let mut mito_config = MitoConfig {
183 num_workers: divide_num_cpus(2),
184 worker_channel_size: 128,
185 worker_request_batch_size: 64,
186 manifest_checkpoint_distance: 10,
187 experimental_manifest_keep_removed_file_count: 256,
188 experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
189 compress_manifest: false,
190 max_background_index_builds: divide_num_cpus(8),
191 max_background_flushes: divide_num_cpus(2),
192 max_background_compactions: divide_num_cpus(4),
193 max_background_purges: get_total_cpu_cores(),
194 experimental_compaction_memory_limit: MemoryLimit::Unlimited,
195 experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
196 auto_flush_interval: Duration::from_secs(30 * 60),
197 global_write_buffer_size: ReadableSize::gb(1),
198 global_write_buffer_reject_size: ReadableSize::gb(2),
199 sst_meta_cache_size: ReadableSize::mb(128),
200 vector_cache_size: ReadableSize::mb(512),
201 page_cache_size: ReadableSize::mb(512),
202 selector_result_cache_size: ReadableSize::mb(512),
203 enable_write_cache: false,
204 write_cache_path: String::new(),
205 write_cache_size: ReadableSize::gb(5),
206 write_cache_ttl: None,
207 preload_index_cache: true,
208 index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
209 enable_refill_cache_on_read: true,
210 manifest_cache_size: ReadableSize::mb(256),
211 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
212 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
213 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
214 allow_stale_entries: false,
215 scan_memory_limit: MemoryLimit::default(),
216 index: IndexConfig::default(),
217 inverted_index: InvertedIndexConfig::default(),
218 fulltext_index: FulltextIndexConfig::default(),
219 bloom_filter_index: BloomFilterConfig::default(),
220 #[cfg(feature = "vector_index")]
221 vector_index: VectorIndexConfig::default(),
222 memtable: MemtableConfig::default(),
223 min_compaction_interval: Duration::from_secs(0),
224 default_experimental_flat_format: false,
225 gc: GcConfig::default(),
226 };
227
228 if let Some(sys_memory) = get_total_memory_readable() {
230 mito_config.adjust_buffer_and_cache_size(sys_memory);
231 }
232
233 mito_config
234 }
235}
236
237impl MitoConfig {
238 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
242 if self.num_workers == 0 {
244 self.num_workers = divide_num_cpus(2);
245 }
246
247 if self.worker_channel_size == 0 {
249 warn!("Sanitize channel size 0 to 1");
250 self.worker_channel_size = 1;
251 }
252
253 if self.max_background_flushes == 0 {
254 warn!(
255 "Sanitize max background flushes 0 to {}",
256 divide_num_cpus(2)
257 );
258 self.max_background_flushes = divide_num_cpus(2);
259 }
260 if self.max_background_compactions == 0 {
261 warn!(
262 "Sanitize max background compactions 0 to {}",
263 divide_num_cpus(4)
264 );
265 self.max_background_compactions = divide_num_cpus(4);
266 }
267 if self.max_background_purges == 0 {
268 let cpu_cores = get_total_cpu_cores();
269 warn!("Sanitize max background purges 0 to {}", cpu_cores);
270 self.max_background_purges = cpu_cores;
271 }
272
273 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
274 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
275 warn!(
276 "Sanitize global write buffer reject size to {}",
277 self.global_write_buffer_reject_size
278 );
279 }
280
281 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
282 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
283 warn!(
284 "Sanitize sst write buffer size to {}",
285 self.sst_write_buffer_size
286 );
287 }
288
289 if self.parallel_scan_channel_size < 1 {
290 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
291 warn!(
292 "Sanitize scan channel size to {}",
293 self.parallel_scan_channel_size
294 );
295 }
296
297 if self.write_cache_path.trim().is_empty() {
299 self.write_cache_path = data_home.to_string();
300 }
301
302 if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
304 warn!(
305 "Invalid index_cache_percent {}, resetting to default {}",
306 self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
307 );
308 self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
309 }
310
311 self.index.sanitize(data_home, &self.inverted_index)?;
312
313 Ok(())
314 }
315
316 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
317 let global_write_buffer_size = cmp::min(
319 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
320 ReadableSize::gb(1),
321 );
322 let global_write_buffer_reject_size = global_write_buffer_size * 2;
324 let sst_meta_cache_size = cmp::min(
326 sys_memory / SST_META_CACHE_SIZE_FACTOR,
327 ReadableSize::mb(128),
328 );
329 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
331 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
332
333 self.global_write_buffer_size = global_write_buffer_size;
334 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
335 self.sst_meta_cache_size = sst_meta_cache_size;
336 self.vector_cache_size = mem_cache_size;
337 self.page_cache_size = page_cache_size;
338 self.selector_result_cache_size = mem_cache_size;
339
340 self.index.adjust_buffer_and_cache_size(sys_memory);
341 }
342
343 #[cfg(test)]
345 pub fn enable_write_cache(
346 mut self,
347 path: String,
348 size: ReadableSize,
349 ttl: Option<Duration>,
350 ) -> Self {
351 self.enable_write_cache = true;
352 self.write_cache_path = path;
353 self.write_cache_size = size;
354 self.write_cache_ttl = ttl;
355 self
356 }
357}
358
/// How indexes are built; (de)serialized in `snake_case` (`"sync"` / `"async"`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// Synchronous index building (the default).
    #[default]
    Sync,
    /// Asynchronous index building.
    Async,
}
369
/// Options shared by all index types.
///
/// Missing fields are taken from `IndexConfig::default()` (`#[serde(default)]`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Directory for auxiliary index files. When empty, sanitization falls back to the
    /// deprecated `inverted_index.intermediate_path`, then to `<data_home>/index_intermediate`.
    pub aux_path: String,

    /// Staging size (default 2 GiB).
    pub staging_size: ReadableSize,
    /// Staging TTL (default 7 days), as a human-readable duration. A zero TTL is sanitized
    /// to `None`.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Index build mode (default `IndexBuildMode::Sync`).
    pub build_mode: IndexBuildMode,

    /// Write buffer size for index files (default 8 MiB); values below
    /// `MULTIPART_UPLOAD_MINIMUM_SIZE` are raised to it.
    pub write_buffer_size: ReadableSize,

    /// Index metadata cache size (default 64 MiB, never raised by memory adjustment).
    pub metadata_cache_size: ReadableSize,
    /// Index content cache size (default 128 MiB, never raised by memory adjustment).
    pub content_cache_size: ReadableSize,
    /// Page size of the index content cache (default 64 KiB).
    pub content_cache_page_size: ReadableSize,
    /// Index result cache size (default 128 MiB, never raised by memory adjustment).
    pub result_cache_size: ReadableSize,
}
408
409impl Default for IndexConfig {
410 fn default() -> Self {
411 Self {
412 aux_path: String::new(),
413 staging_size: ReadableSize::gb(2),
414 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
415 build_mode: IndexBuildMode::default(),
416 write_buffer_size: ReadableSize::mb(8),
417 metadata_cache_size: ReadableSize::mb(64),
418 content_cache_size: ReadableSize::mb(128),
419 content_cache_page_size: ReadableSize::kb(64),
420 result_cache_size: ReadableSize::mb(128),
421 }
422 }
423}
424
425impl IndexConfig {
426 pub fn sanitize(
427 &mut self,
428 data_home: &str,
429 inverted_index: &InvertedIndexConfig,
430 ) -> Result<()> {
431 #[allow(deprecated)]
432 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
433 self.aux_path.clone_from(&inverted_index.intermediate_path);
434 warn!(
435 "`inverted_index.intermediate_path` is deprecated, use
436 `index.aux_path` instead. Set `index.aux_path` to {}",
437 &inverted_index.intermediate_path
438 )
439 }
440 if self.aux_path.is_empty() {
441 let path = Path::new(data_home).join("index_intermediate");
442 self.aux_path = path.as_os_str().to_string_lossy().to_string();
443 }
444
445 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
446 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
447 warn!(
448 "Sanitize index write buffer size to {}",
449 self.write_buffer_size
450 );
451 }
452
453 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
454 self.staging_ttl = None;
455 }
456
457 Ok(())
458 }
459
460 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
461 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
462 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
463 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
464
465 let metadata_cache_size = cmp::min(
466 sys_memory / SST_META_CACHE_SIZE_FACTOR,
467 ReadableSize::mb(64),
468 );
469 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
470 }
471}
472
/// Whether an index operation is performed; (de)serialized in `snake_case`
/// (`"auto"` / `"disable"`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The operation is enabled (the default).
    #[default]
    Auto,
    /// The operation is disabled.
    Disable,
}
483
484impl Mode {
485 pub fn disabled(&self) -> bool {
487 matches!(self, Mode::Disable)
488 }
489
490 pub fn auto(&self) -> bool {
492 matches!(self, Mode::Auto)
493 }
494}
495
/// Memory threshold used when creating an index.
///
/// `Auto` and `Unlimited` are (de)serialized as `"auto"` / `"unlimited"`; `Size` is untagged,
/// so a bare size value is accepted directly.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derive the threshold from total system memory (the default).
    #[default]
    Auto,
    /// No threshold.
    Unlimited,
    /// An explicit threshold.
    #[serde(untagged)]
    Size(ReadableSize),
}
509
/// Options of the inverted index.
///
/// Missing fields are taken from `InvertedIndexConfig::default()` (`#[serde(default)]`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,

    /// Memory threshold for index creation (default `MemoryThreshold::Auto`).
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated; still read by `IndexConfig::sanitize` as a fallback for `aux_path`.
    /// Accepted on input but never serialized.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated; accepted on input but never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
533
534impl Default for InvertedIndexConfig {
535 #[allow(deprecated)]
536 fn default() -> Self {
537 Self {
538 create_on_flush: Mode::Auto,
539 create_on_compaction: Mode::Auto,
540 apply_on_query: Mode::Auto,
541 mem_threshold_on_create: MemoryThreshold::Auto,
542 write_buffer_size: ReadableSize::mb(8),
543 intermediate_path: String::new(),
544 }
545 }
546}
547
548impl InvertedIndexConfig {
549 pub fn mem_threshold_on_create(&self) -> Option<usize> {
550 match self.mem_threshold_on_create {
551 MemoryThreshold::Auto => {
552 if let Some(sys_memory) = get_total_memory_readable() {
553 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
554 } else {
555 Some(ReadableSize::mb(64).as_bytes() as usize)
556 }
557 }
558 MemoryThreshold::Unlimited => None,
559 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
560 }
561 }
562}
563
/// Options of the full-text index.
///
/// Missing fields are taken from `FulltextIndexConfig::default()` (`#[serde(default)]`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default `MemoryThreshold::Auto`).
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default `true`).
    pub compress: bool,
}
580
581impl Default for FulltextIndexConfig {
582 fn default() -> Self {
583 Self {
584 create_on_flush: Mode::Auto,
585 create_on_compaction: Mode::Auto,
586 apply_on_query: Mode::Auto,
587 mem_threshold_on_create: MemoryThreshold::Auto,
588 compress: true,
589 }
590 }
591}
592
593impl FulltextIndexConfig {
594 pub fn mem_threshold_on_create(&self) -> usize {
595 match self.mem_threshold_on_create {
596 MemoryThreshold::Auto => {
597 if let Some(sys_memory) = get_total_memory_readable() {
598 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
599 } else {
600 ReadableSize::mb(64).as_bytes() as _
601 }
602 }
603 MemoryThreshold::Unlimited => usize::MAX,
604 MemoryThreshold::Size(size) => size.as_bytes() as _,
605 }
606 }
607}
608
/// Options of the bloom filter index.
///
/// Missing fields are taken from `BloomFilterConfig::default()` (`#[serde(default)]`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default `MemoryThreshold::Auto`).
    pub mem_threshold_on_create: MemoryThreshold,
}
623
624impl Default for BloomFilterConfig {
625 fn default() -> Self {
626 Self {
627 create_on_flush: Mode::Auto,
628 create_on_compaction: Mode::Auto,
629 apply_on_query: Mode::Auto,
630 mem_threshold_on_create: MemoryThreshold::Auto,
631 }
632 }
633}
634
635impl BloomFilterConfig {
636 pub fn mem_threshold_on_create(&self) -> Option<usize> {
637 match self.mem_threshold_on_create {
638 MemoryThreshold::Auto => {
639 if let Some(sys_memory) = get_total_memory_readable() {
640 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
641 } else {
642 Some(ReadableSize::mb(64).as_bytes() as usize)
643 }
644 }
645 MemoryThreshold::Unlimited => None,
646 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
647 }
648 }
649}
650
/// Options of the vector index (only with the `vector_index` feature).
///
/// Missing fields are taken from `VectorIndexConfig::default()` (`#[serde(default)]`).
#[cfg(feature = "vector_index")]
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct VectorIndexConfig {
    /// Whether to create the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default `MemoryThreshold::Auto`).
    pub mem_threshold_on_create: MemoryThreshold,
}
666
#[cfg(feature = "vector_index")]
impl Default for VectorIndexConfig {
    /// Everything enabled in `Auto` mode, with an automatic memory threshold.
    fn default() -> Self {
        Self {
            mem_threshold_on_create: MemoryThreshold::Auto,
            apply_on_query: Mode::Auto,
            create_on_compaction: Mode::Auto,
            create_on_flush: Mode::Auto,
        }
    }
}
678
#[cfg(feature = "vector_index")]
impl VectorIndexConfig {
    /// Resolves the memory threshold for creating a vector index, in bytes.
    ///
    /// - `Auto`: total system memory divided by `INDEX_CREATE_MEM_THRESHOLD_FACTOR`, or
    ///   64 MiB when the memory size cannot be determined.
    /// - `Unlimited`: `None`.
    /// - `Size(size)`: the configured size.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        let threshold = match self.mem_threshold_on_create {
            MemoryThreshold::Unlimited => return None,
            MemoryThreshold::Size(size) => size,
            MemoryThreshold::Auto => get_total_memory_readable().map_or(
                ReadableSize::mb(64),
                |total| total / INDEX_CREATE_MEM_THRESHOLD_FACTOR,
            ),
        };
        Some(threshold.as_bytes() as usize)
    }
}
695
696fn divide_num_cpus(divisor: usize) -> usize {
698 debug_assert!(divisor > 0);
699 let cores = get_total_cpu_cores();
700 debug_assert!(cores > 0);
701
702 cores.div_ceil(divisor)
703}
704
#[cfg(test)]
mod tests {
    use super::*;

    /// A `[memtable]` section with `type = "partition_tree"` deserializes into the
    /// partition-tree memtable options.
    #[test]
    fn test_deserialize_config() {
        let toml_str = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let mito_config: MitoConfig = toml::from_str(toml_str).unwrap();

        let MemtableConfig::PartitionTree(partition_tree) = &mito_config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, partition_tree.data_freeze_threshold);
    }
}