use std::cmp;
use std::path::Path;
use std::time::Duration;

use common_base::memory_limit::MemoryLimit;
use common_base::readable_size::ReadableSize;
use common_stat::{get_total_cpu_cores, get_total_memory_readable};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
use crate::error::Result;
use crate::gc::GcConfig;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Minimum buffer size required to satisfy the multipart upload minimum part size.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for a parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default maximum number of SST files that can be scanned concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

/// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as the global write buffer size.
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory as the SST meta cache size.
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory as the mem cache size.
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory as the page cache size.
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory as the memory threshold
/// for creating an index in `auto` mode.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);

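/// Configuration for the mito engine.
///
/// Every field has a default value and the struct is marked `#[serde(default)]`,
/// so a configuration file only needs to set the fields it overrides. A minimal
/// TOML sketch with illustrative values (not recommendations):
///
/// ```toml
/// num_workers = 8
/// auto_flush_interval = "30m"
/// global_write_buffer_size = "1GiB"
/// ```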
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of region workers.
    pub num_workers: usize,
    /// Request channel size of each worker.
    pub worker_channel_size: usize,
    /// Max batch size for a worker to handle requests.
    pub worker_request_batch_size: usize,

    /// Number of manifest updates to trigger a new manifest checkpoint.
    pub manifest_checkpoint_distance: u64,
    /// Number of removed files to keep in the manifest (experimental).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// TTL of removed files kept in the manifest (experimental).
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress manifest and checkpoint files.
    pub compress_manifest: bool,

    /// Max number of running background index build jobs.
    pub max_background_index_builds: usize,
    /// Max number of running background flush jobs.
    pub max_background_flushes: usize,
    /// Max number of running background compaction jobs.
    pub max_background_compactions: usize,
    /// Max number of running background purge jobs.
    pub max_background_purges: usize,

    /// Interval to auto flush a region if it has not flushed yet.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size for all regions.
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer size threshold to reject write requests.
    pub global_write_buffer_reject_size: ReadableSize,

    /// Cache size for SST metadata.
    pub sst_meta_cache_size: ReadableSize,
    /// Cache size for vectors and arrow arrays.
    pub vector_cache_size: ReadableSize,
    /// Cache size for pages of SST row groups.
    pub page_cache_size: ReadableSize,
    /// Cache size for time series selector results.
    pub selector_result_cache_size: ReadableSize,
    /// Whether to enable the write cache.
    pub enable_write_cache: bool,
    /// File system path for the write cache, defaults to `{data_home}`.
    pub write_cache_path: String,
    /// Capacity of the write cache.
    pub write_cache_size: ReadableSize,
    /// TTL of files in the write cache.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,
    /// Whether to preload the index cache.
    pub preload_index_cache: bool,
    /// Percentage of the cache reserved for index files; must be in `(0, 100)`.
    pub index_cache_percent: u8,

    /// Buffer size for SST writing.
    pub sst_write_buffer_size: ReadableSize,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub parallel_scan_channel_size: usize,
    /// Maximum number of SST files that can be scanned concurrently.
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale WAL entries read during replay.
    pub allow_stale_entries: bool,
    /// Memory limit for scans.
    pub scan_memory_limit: MemoryLimit,

    /// Index configs.
    pub index: IndexConfig,
    /// Inverted index configs.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index configs.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index configs.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable config.
    pub memtable: MemtableConfig,

    /// Minimum time interval between two compactions.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether to use the experimental flat format by default.
    pub default_experimental_flat_format: bool,

    /// Garbage collection config.
    pub gc: GcConfig,
}

impl Default for MitoConfig {
    fn default() -> Self {
        let mut mito_config = MitoConfig {
            num_workers: divide_num_cpus(2),
            worker_channel_size: 128,
            worker_request_batch_size: 64,
            manifest_checkpoint_distance: 10,
            experimental_manifest_keep_removed_file_count: 256,
            experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
            compress_manifest: false,
            max_background_index_builds: divide_num_cpus(8),
            max_background_flushes: divide_num_cpus(2),
            max_background_compactions: divide_num_cpus(4),
            max_background_purges: get_total_cpu_cores(),
            auto_flush_interval: Duration::from_secs(30 * 60),
            global_write_buffer_size: ReadableSize::gb(1),
            global_write_buffer_reject_size: ReadableSize::gb(2),
            sst_meta_cache_size: ReadableSize::mb(128),
            vector_cache_size: ReadableSize::mb(512),
            page_cache_size: ReadableSize::mb(512),
            selector_result_cache_size: ReadableSize::mb(512),
            enable_write_cache: false,
            write_cache_path: String::new(),
            write_cache_size: ReadableSize::gb(5),
            write_cache_ttl: None,
            preload_index_cache: true,
            index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            allow_stale_entries: false,
            scan_memory_limit: MemoryLimit::default(),
            index: IndexConfig::default(),
            inverted_index: InvertedIndexConfig::default(),
            fulltext_index: FulltextIndexConfig::default(),
            bloom_filter_index: BloomFilterConfig::default(),
            memtable: MemtableConfig::default(),
            min_compaction_interval: Duration::from_secs(0),
            default_experimental_flat_format: false,
            gc: GcConfig::default(),
        };

        // Adjust buffer and cache size according to system memory if we can get it.
        if let Some(sys_memory) = get_total_memory_readable() {
            mito_config.adjust_buffer_and_cache_size(sys_memory);
        }

        mito_config
    }
}

impl MitoConfig {
    /// Sanitizes invalid configurations by replacing them with default values
    /// and resolves empty paths against `data_home`.
    pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
        // Use the default value if `num_workers` is 0.
        if self.num_workers == 0 {
            self.num_workers = divide_num_cpus(2);
        }

        if self.worker_channel_size == 0 {
            warn!("Sanitize channel size 0 to 1");
            self.worker_channel_size = 1;
        }

        if self.max_background_flushes == 0 {
            warn!(
                "Sanitize max background flushes 0 to {}",
                divide_num_cpus(2)
            );
            self.max_background_flushes = divide_num_cpus(2);
        }
        if self.max_background_compactions == 0 {
            warn!(
                "Sanitize max background compactions 0 to {}",
                divide_num_cpus(4)
            );
            self.max_background_compactions = divide_num_cpus(4);
        }
        if self.max_background_purges == 0 {
            let cpu_cores = get_total_cpu_cores();
            warn!("Sanitize max background purges 0 to {}", cpu_cores);
            self.max_background_purges = cpu_cores;
        }

        if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
            self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
            warn!(
                "Sanitize global write buffer reject size to {}",
                self.global_write_buffer_reject_size
            );
        }

        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
            warn!(
                "Sanitize sst write buffer size to {}",
                self.sst_write_buffer_size
            );
        }

        if self.parallel_scan_channel_size < 1 {
            self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
            warn!(
                "Sanitize scan channel size to {}",
                self.parallel_scan_channel_size
            );
        }

        // Use `data_home` as the write cache path if it is not set.
        if self.write_cache_path.trim().is_empty() {
            self.write_cache_path = data_home.to_string();
        }

        if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
            warn!(
                "Invalid index_cache_percent {}, resetting to default {}",
                self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
            );
            self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
        }

        self.index.sanitize(data_home, &self.inverted_index)?;

        Ok(())
    }

    fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
        // Use at most 1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR of OS memory for the
        // global write buffer, capped at 1GiB.
        let global_write_buffer_size = cmp::min(
            sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
            ReadableSize::gb(1),
        );
        // Reject writes when the buffered data reaches twice the global write buffer size.
        let global_write_buffer_reject_size = global_write_buffer_size * 2;
        // Use at most 1/SST_META_CACHE_SIZE_FACTOR of OS memory for the SST meta
        // cache, capped at 128MiB.
        let sst_meta_cache_size = cmp::min(
            sys_memory / SST_META_CACHE_SIZE_FACTOR,
            ReadableSize::mb(128),
        );
        // Use at most 1/MEM_CACHE_SIZE_FACTOR of OS memory for the vector and
        // selector result caches, capped at 512MiB.
        let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
        let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;

        self.global_write_buffer_size = global_write_buffer_size;
        self.global_write_buffer_reject_size = global_write_buffer_reject_size;
        self.sst_meta_cache_size = sst_meta_cache_size;
        self.vector_cache_size = mem_cache_size;
        self.page_cache_size = page_cache_size;
        self.selector_result_cache_size = mem_cache_size;

        self.index.adjust_buffer_and_cache_size(sys_memory);
    }

    /// Enables the write cache; only used in tests.
    #[cfg(test)]
    pub fn enable_write_cache(
        mut self,
        path: String,
        size: ReadableSize,
        ttl: Option<Duration>,
    ) -> Self {
        self.enable_write_cache = true;
        self.write_cache_path = path;
        self.write_cache_size = size;
        self.write_cache_ttl = ttl;
        self
    }
}

/// Mode for building indexes.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// Builds indexes synchronously.
    #[default]
    Sync,
    /// Builds indexes asynchronously in the background.
    Async,
}

/// Configuration for index building and caching.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path for the index, defaults to
    /// `{data_home}/index_intermediate`.
    pub aux_path: String,

    /// Capacity of the staging directory.
    pub staging_size: ReadableSize,
    /// TTL of files in the staging directory; a zero TTL disables expiration.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Mode for building indexes.
    pub build_mode: IndexBuildMode,

    /// Write buffer size for creating index files.
    pub write_buffer_size: ReadableSize,

    /// Cache size for index metadata.
    pub metadata_cache_size: ReadableSize,
    /// Cache size for index content.
    pub content_cache_size: ReadableSize,
    /// Page size for the index content cache.
    pub content_cache_page_size: ReadableSize,
    /// Cache size for index results.
    pub result_cache_size: ReadableSize,
}

impl Default for IndexConfig {
    fn default() -> Self {
        Self {
            aux_path: String::new(),
            staging_size: ReadableSize::gb(2),
            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
            build_mode: IndexBuildMode::default(),
            write_buffer_size: ReadableSize::mb(8),
            metadata_cache_size: ReadableSize::mb(64),
            content_cache_size: ReadableSize::mb(128),
            content_cache_page_size: ReadableSize::kb(64),
            result_cache_size: ReadableSize::mb(128),
        }
    }
}

impl IndexConfig {
    /// Sanitizes the index config and resolves `aux_path` under `data_home`.
    pub fn sanitize(
        &mut self,
        data_home: &str,
        inverted_index: &InvertedIndexConfig,
    ) -> Result<()> {
        #[allow(deprecated)]
        if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
            self.aux_path.clone_from(&inverted_index.intermediate_path);
            warn!(
                "`inverted_index.intermediate_path` is deprecated, use `index.aux_path` instead. \
                 Set `index.aux_path` to {}",
                &inverted_index.intermediate_path
            )
        }
        if self.aux_path.is_empty() {
            let path = Path::new(data_home).join("index_intermediate");
            self.aux_path = path.as_os_str().to_string_lossy().to_string();
        }

        if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
            self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
            warn!(
                "Sanitize index write buffer size to {}",
                self.write_buffer_size
            );
        }

        // A zero TTL means the staging directory never expires.
        if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
            self.staging_ttl = None;
        }

        Ok(())
    }

    pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
        let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
        self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
        self.content_cache_size = cmp::min(self.content_cache_size, cache_size);

        let metadata_cache_size = cmp::min(
            sys_memory / SST_META_CACHE_SIZE_FACTOR,
            ReadableSize::mb(64),
        );
        self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
    }
}

/// Whether an index operation is applied automatically or disabled.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Applies the operation automatically.
    #[default]
    Auto,
    /// Disables the operation.
    Disable,
}

impl Mode {
    /// Returns true if the mode is disabled.
    pub fn disabled(&self) -> bool {
        matches!(self, Mode::Disable)
    }

    /// Returns true if the mode is automatic.
    pub fn auto(&self) -> bool {
        matches!(self, Mode::Auto)
    }
}

/// Memory threshold for an operation such as creating an index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Automatically determines the threshold from system memory.
    #[default]
    Auto,
    /// No memory limit.
    Unlimited,
    /// A fixed size limit.
    #[serde(untagged)]
    Size(ReadableSize),
}

/// Configuration for the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,

    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,

    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}

impl Default for InvertedIndexConfig {
    #[allow(deprecated)]
    fn default() -> Self {
        Self {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
            write_buffer_size: ReadableSize::mb(8),
            intermediate_path: String::new(),
        }
    }
}

impl InvertedIndexConfig {
    /// Returns the memory threshold for creating the index in bytes,
    /// or `None` if it is unlimited.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        match self.mem_threshold_on_create {
            MemoryThreshold::Auto => {
                if let Some(sys_memory) = get_total_memory_readable() {
                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
                } else {
                    Some(ReadableSize::mb(64).as_bytes() as usize)
                }
            }
            MemoryThreshold::Unlimited => None,
            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
        }
    }
}

/// Configuration for the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data.
    pub compress: bool,
}

impl Default for FulltextIndexConfig {
    fn default() -> Self {
        Self {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
            compress: true,
        }
    }
}

impl FulltextIndexConfig {
    /// Returns the memory threshold for creating the index in bytes.
    pub fn mem_threshold_on_create(&self) -> usize {
        match self.mem_threshold_on_create {
            MemoryThreshold::Auto => {
                if let Some(sys_memory) = get_total_memory_readable() {
                    (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
                } else {
                    ReadableSize::mb(64).as_bytes() as _
                }
            }
            MemoryThreshold::Unlimited => usize::MAX,
            MemoryThreshold::Size(size) => size.as_bytes() as _,
        }
    }
}

/// Configuration for the bloom filter index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}

impl Default for BloomFilterConfig {
    fn default() -> Self {
        Self {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
        }
    }
}

impl BloomFilterConfig {
    /// Returns the memory threshold for creating the index in bytes,
    /// or `None` if it is unlimited.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        match self.mem_threshold_on_create {
            MemoryThreshold::Auto => {
                if let Some(sys_memory) = get_total_memory_readable() {
                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
                } else {
                    Some(ReadableSize::mb(64).as_bytes() as usize)
                }
            }
            MemoryThreshold::Unlimited => None,
            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
        }
    }
}

/// Divides the total CPU core count by `divisor`, rounding up to at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
    debug_assert!(divisor > 0);
    let cores = get_total_cpu_cores();
    debug_assert!(cores > 0);

    cores.div_ceil(divisor)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_deserialize_config() {
        let s = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MitoConfig = toml::from_str(s).unwrap();
        let MemtableConfig::PartitionTree(config) = &config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, config.data_freeze_threshold);
    }
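
    // The tests below are illustrative sketches that exercise the sanitize and
    // sizing logic defined above; the data home path and the 16GiB memory figure
    // are arbitrary example values, not recommendations.
    #[test]
    fn test_sanitize_invalid_values() {
        let mut config = MitoConfig {
            worker_channel_size: 0,
            parallel_scan_channel_size: 0,
            index_cache_percent: 100,
            ..Default::default()
        };
        // `/tmp/greptimedb-data` is a placeholder data home for the example.
        config.sanitize("/tmp/greptimedb-data").unwrap();
        assert_eq!(1, config.worker_channel_size);
        assert_eq!(DEFAULT_SCAN_CHANNEL_SIZE, config.parallel_scan_channel_size);
        assert_eq!(DEFAULT_INDEX_CACHE_PERCENT, config.index_cache_percent);
        // An empty write cache path falls back to the data home.
        assert_eq!("/tmp/greptimedb-data", config.write_cache_path);
    }

    #[test]
    fn test_adjust_buffer_and_cache_size() {
        let mut config = MitoConfig::default();
        // Assume a hypothetical host with 16GiB of memory.
        config.adjust_buffer_and_cache_size(ReadableSize::gb(16));
        // 1/8 of memory, capped at 1GiB.
        assert_eq!(ReadableSize::gb(1), config.global_write_buffer_size);
        assert_eq!(ReadableSize::gb(2), config.global_write_buffer_reject_size);
        // 1/32 of memory, capped at 128MiB.
        assert_eq!(ReadableSize::mb(128), config.sst_meta_cache_size);
        // 1/16 of memory, capped at 512MiB.
        assert_eq!(ReadableSize::mb(512), config.vector_cache_size);
        assert_eq!(ReadableSize::mb(512), config.selector_result_cache_size);
        // 1/8 of memory, uncapped.
        assert_eq!(ReadableSize::gb(2), config.page_cache_size);
    }

    #[test]
    fn test_fulltext_mem_threshold_on_create() {
        let config = FulltextIndexConfig {
            mem_threshold_on_create: MemoryThreshold::Unlimited,
            ..Default::default()
        };
        assert_eq!(usize::MAX, config.mem_threshold_on_create());

        let config = FulltextIndexConfig {
            mem_threshold_on_create: MemoryThreshold::Size(ReadableSize::mb(64)),
            ..Default::default()
        };
        assert_eq!(64 * 1024 * 1024, config.mem_threshold_on_create());
    }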
}