1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24use serde_with::serde_as;
25
26use crate::error::Result;
27use crate::memtable::MemtableConfig;
28use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
29
/// Lower bound (5MB) that the `sanitize` methods in this file enforce on
/// `MitoConfig::sst_write_buffer_size` and `IndexConfig::write_buffer_size`.
// NOTE(review): the name suggests this is the minimum part size of a multipart upload — confirm.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default value of `MitoConfig::parallel_scan_channel_size`; also the value
/// `MitoConfig::sanitize` falls back to when the configured size is 0.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default value of `MitoConfig::max_concurrent_scan_files`.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

/// Divisor applied to the total system memory to derive the default global
/// write buffer size (the result is capped at 1GB).
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Divisor applied to the total system memory to derive the default SST meta
/// cache size (capped at 128MB) and the index metadata cache size (capped at 64MB).
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Divisor applied to the total system memory to derive the default vector and
/// selector result cache sizes (capped at 512MB) and the index result/content
/// cache sizes (capped at 128MB).
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Divisor applied to the total system memory to derive the default page cache
/// size (no upper cap is applied).
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Divisor applied to the total system memory to derive the index creation
/// memory threshold when it is configured as `MemoryThreshold::Auto`.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// A fixed 3-second timeout.
// NOTE(review): not used in this part of the file; presumably the timeout for fetching
// options from elsewhere in the crate — confirm against its callers.
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
49
/// Mito configuration.
///
/// Because of `#[serde(default)]`, every field missing from the deserialized
/// input takes its value from `MitoConfig::default()`. Call `sanitize` after
/// loading to repair values that are unusable (zero sizes, empty paths, ...).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of workers. Defaults to half of the CPUs (rounded up); `sanitize`
    /// replaces 0 with that default.
    pub num_workers: usize,
    /// Channel size of each worker (default: 128). `sanitize` raises 0 to 1.
    pub worker_channel_size: usize,
    /// Request batch size of a worker (default: 64).
    // NOTE(review): presumably the max number of requests handled per batch — confirm.
    pub worker_request_batch_size: usize,

    /// Manifest checkpoint distance (default: 10).
    // NOTE(review): presumably the number of manifest actions between checkpoints — confirm.
    pub manifest_checkpoint_distance: u64,
    /// Experimental: number of removed files kept in the manifest (default: 256).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Experimental: how long removed files are kept in the manifest
    /// (default: 1 hour). (De)serialized in humantime format.
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress the manifest (default: false).
    pub compress_manifest: bool,

    /// Max number of background index builds. Defaults to 1/8 of the CPUs
    /// (rounded up).
    pub max_background_index_builds: usize,
    /// Max number of background flushes. Defaults to half of the CPUs
    /// (rounded up); `sanitize` replaces 0 with that default.
    pub max_background_flushes: usize,
    /// Max number of background compactions. Defaults to 1/4 of the CPUs
    /// (rounded up); `sanitize` replaces 0 with that default.
    pub max_background_compactions: usize,
    /// Max number of background purges. Defaults to the number of CPUs;
    /// `sanitize` replaces 0 with that default.
    pub max_background_purges: usize,

    /// Auto flush interval (default: 30 minutes). (De)serialized in humantime format.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size. Defaults to 1/8 of the total system memory,
    /// capped at 1GB (1GB when the total memory is unknown).
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer size at which writes are rejected. Defaults to twice
    /// `global_write_buffer_size`; `sanitize` resets it to that value whenever
    /// it is not strictly greater than `global_write_buffer_size`.
    pub global_write_buffer_reject_size: ReadableSize,

    /// SST meta cache size. Defaults to 1/32 of the total system memory,
    /// capped at 128MB (128MB when the total memory is unknown).
    pub sst_meta_cache_size: ReadableSize,
    /// Vector cache size. Defaults to 1/16 of the total system memory, capped
    /// at 512MB (512MB when the total memory is unknown).
    pub vector_cache_size: ReadableSize,
    /// Page cache size. Defaults to 1/8 of the total system memory (512MB when
    /// the total memory is unknown).
    pub page_cache_size: ReadableSize,
    /// Selector result cache size. Defaults to 1/16 of the total system
    /// memory, capped at 512MB (512MB when the total memory is unknown).
    pub selector_result_cache_size: ReadableSize,
    /// Whether the write cache is enabled (default: false).
    pub enable_write_cache: bool,
    /// Path of the write cache (default: empty). `sanitize` sets it to the
    /// data home when it is empty or only whitespace.
    pub write_cache_path: String,
    /// Capacity of the write cache (default: 5GB).
    pub write_cache_size: ReadableSize,
    /// TTL of the write cache (default: `None`). (De)serialized in humantime format.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,

    /// Buffer size for SST writing (default: `DEFAULT_WRITE_BUFFER_SIZE`).
    /// `sanitize` raises it to at least 5MB.
    pub sst_write_buffer_size: ReadableSize,
    /// Channel size used by parallel scans (default: 32). `sanitize` resets 0
    /// to the default.
    pub parallel_scan_channel_size: usize,
    /// Max number of files scanned concurrently (default: 384).
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries (default: false).
    // NOTE(review): presumably refers to stale WAL entries read during replay — confirm.
    pub allow_stale_entries: bool,

    /// Options shared by all index types.
    pub index: IndexConfig,
    /// Inverted index options.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index options.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index options.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable options.
    pub memtable: MemtableConfig,

    /// Minimum interval between compactions (default: 0s). (De)serialized in
    /// humantime format.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether the experimental flat format is used by default (default: false).
    pub default_experimental_flat_format: bool,
}
151
152impl Default for MitoConfig {
153 fn default() -> Self {
154 let mut mito_config = MitoConfig {
155 num_workers: divide_num_cpus(2),
156 worker_channel_size: 128,
157 worker_request_batch_size: 64,
158 manifest_checkpoint_distance: 10,
159 experimental_manifest_keep_removed_file_count: 256,
160 experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
161 compress_manifest: false,
162 max_background_index_builds: divide_num_cpus(8),
163 max_background_flushes: divide_num_cpus(2),
164 max_background_compactions: divide_num_cpus(4),
165 max_background_purges: common_config::utils::get_cpus(),
166 auto_flush_interval: Duration::from_secs(30 * 60),
167 global_write_buffer_size: ReadableSize::gb(1),
168 global_write_buffer_reject_size: ReadableSize::gb(2),
169 sst_meta_cache_size: ReadableSize::mb(128),
170 vector_cache_size: ReadableSize::mb(512),
171 page_cache_size: ReadableSize::mb(512),
172 selector_result_cache_size: ReadableSize::mb(512),
173 enable_write_cache: false,
174 write_cache_path: String::new(),
175 write_cache_size: ReadableSize::gb(5),
176 write_cache_ttl: None,
177 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
178 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
179 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
180 allow_stale_entries: false,
181 index: IndexConfig::default(),
182 inverted_index: InvertedIndexConfig::default(),
183 fulltext_index: FulltextIndexConfig::default(),
184 bloom_filter_index: BloomFilterConfig::default(),
185 memtable: MemtableConfig::default(),
186 min_compaction_interval: Duration::from_secs(0),
187 default_experimental_flat_format: false,
188 };
189
190 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
192 mito_config.adjust_buffer_and_cache_size(sys_memory);
193 }
194
195 mito_config
196 }
197}
198
199impl MitoConfig {
200 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
204 if self.num_workers == 0 {
206 self.num_workers = divide_num_cpus(2);
207 }
208
209 if self.worker_channel_size == 0 {
211 warn!("Sanitize channel size 0 to 1");
212 self.worker_channel_size = 1;
213 }
214
215 if self.max_background_flushes == 0 {
216 warn!(
217 "Sanitize max background flushes 0 to {}",
218 divide_num_cpus(2)
219 );
220 self.max_background_flushes = divide_num_cpus(2);
221 }
222 if self.max_background_compactions == 0 {
223 warn!(
224 "Sanitize max background compactions 0 to {}",
225 divide_num_cpus(4)
226 );
227 self.max_background_compactions = divide_num_cpus(4);
228 }
229 if self.max_background_purges == 0 {
230 warn!(
231 "Sanitize max background purges 0 to {}",
232 common_config::utils::get_cpus()
233 );
234 self.max_background_purges = common_config::utils::get_cpus();
235 }
236
237 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
238 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
239 warn!(
240 "Sanitize global write buffer reject size to {}",
241 self.global_write_buffer_reject_size
242 );
243 }
244
245 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
246 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
247 warn!(
248 "Sanitize sst write buffer size to {}",
249 self.sst_write_buffer_size
250 );
251 }
252
253 if self.parallel_scan_channel_size < 1 {
254 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
255 warn!(
256 "Sanitize scan channel size to {}",
257 self.parallel_scan_channel_size
258 );
259 }
260
261 if self.write_cache_path.trim().is_empty() {
263 self.write_cache_path = data_home.to_string();
264 }
265
266 self.index.sanitize(data_home, &self.inverted_index)?;
267
268 Ok(())
269 }
270
271 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
272 let global_write_buffer_size = cmp::min(
274 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
275 ReadableSize::gb(1),
276 );
277 let global_write_buffer_reject_size = global_write_buffer_size * 2;
279 let sst_meta_cache_size = cmp::min(
281 sys_memory / SST_META_CACHE_SIZE_FACTOR,
282 ReadableSize::mb(128),
283 );
284 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
286 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
287
288 self.global_write_buffer_size = global_write_buffer_size;
289 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
290 self.sst_meta_cache_size = sst_meta_cache_size;
291 self.vector_cache_size = mem_cache_size;
292 self.page_cache_size = page_cache_size;
293 self.selector_result_cache_size = mem_cache_size;
294
295 self.index.adjust_buffer_and_cache_size(sys_memory);
296 }
297
298 #[cfg(test)]
300 pub fn enable_write_cache(
301 mut self,
302 path: String,
303 size: ReadableSize,
304 ttl: Option<Duration>,
305 ) -> Self {
306 self.enable_write_cache = true;
307 self.write_cache_path = path;
308 self.write_cache_size = size;
309 self.write_cache_ttl = ttl;
310 self
311 }
312}
313
/// Index build mode.
///
/// (De)serialized in snake case, i.e. as `sync` or `async`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// Build the index synchronously. This is the default.
    #[default]
    Sync,
    /// Build the index asynchronously.
    // NOTE(review): presumably scheduled as a background job — confirm against the index builder.
    Async,
}
324
/// Options shared by all index types.
///
/// Because of `#[serde(default)]`, missing fields are taken from
/// `IndexConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path of the index (default: empty).
    ///
    /// When empty, `sanitize` fills it in from the deprecated
    /// `inverted_index.intermediate_path` if that one is set, and otherwise
    /// with `{data_home}/index_intermediate`.
    pub aux_path: String,

    /// Max capacity of the staging directory (default: 2GB).
    pub staging_size: ReadableSize,
    /// TTL of the staging directory (default: 7 days). (De)serialized in
    /// humantime format. `sanitize` converts a zero TTL to `None`.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Index build mode (default: `IndexBuildMode::Sync`).
    pub build_mode: IndexBuildMode,

    /// Write buffer size used when creating the index (default: 8MB).
    /// `sanitize` raises it to at least 5MB.
    pub write_buffer_size: ReadableSize,

    /// Size of the index metadata cache (default: 64MB).
    /// `adjust_buffer_and_cache_size` may lower it to 1/32 of the system memory.
    pub metadata_cache_size: ReadableSize,
    /// Size of the index content cache (default: 128MB).
    /// `adjust_buffer_and_cache_size` may lower it to 1/16 of the system memory.
    pub content_cache_size: ReadableSize,
    /// Page size of the index content cache (default: 64KB).
    pub content_cache_page_size: ReadableSize,
    /// Size of the index result cache (default: 128MB).
    /// `adjust_buffer_and_cache_size` may lower it to 1/16 of the system memory.
    pub result_cache_size: ReadableSize,
}
363
364impl Default for IndexConfig {
365 fn default() -> Self {
366 Self {
367 aux_path: String::new(),
368 staging_size: ReadableSize::gb(2),
369 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
370 build_mode: IndexBuildMode::default(),
371 write_buffer_size: ReadableSize::mb(8),
372 metadata_cache_size: ReadableSize::mb(64),
373 content_cache_size: ReadableSize::mb(128),
374 content_cache_page_size: ReadableSize::kb(64),
375 result_cache_size: ReadableSize::mb(128),
376 }
377 }
378}
379
380impl IndexConfig {
381 pub fn sanitize(
382 &mut self,
383 data_home: &str,
384 inverted_index: &InvertedIndexConfig,
385 ) -> Result<()> {
386 #[allow(deprecated)]
387 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
388 self.aux_path.clone_from(&inverted_index.intermediate_path);
389 warn!(
390 "`inverted_index.intermediate_path` is deprecated, use
391 `index.aux_path` instead. Set `index.aux_path` to {}",
392 &inverted_index.intermediate_path
393 )
394 }
395 if self.aux_path.is_empty() {
396 let path = Path::new(data_home).join("index_intermediate");
397 self.aux_path = path.as_os_str().to_string_lossy().to_string();
398 }
399
400 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
401 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
402 warn!(
403 "Sanitize index write buffer size to {}",
404 self.write_buffer_size
405 );
406 }
407
408 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
409 self.staging_ttl = None;
410 }
411
412 Ok(())
413 }
414
415 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
416 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
417 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
418 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
419
420 let metadata_cache_size = cmp::min(
421 sys_memory / SST_META_CACHE_SIZE_FACTOR,
422 ReadableSize::mb(64),
423 );
424 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
425 }
426}
427
/// Operational mode of an index operation (create on flush, create on
/// compaction, apply on query).
///
/// (De)serialized in snake case, i.e. as `auto` or `disable`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The operation is decided automatically. This is the default.
    #[default]
    Auto,
    /// The operation is disabled.
    Disable,
}
438
439impl Mode {
440 pub fn disabled(&self) -> bool {
442 matches!(self, Mode::Disable)
443 }
444
445 pub fn auto(&self) -> bool {
447 matches!(self, Mode::Auto)
448 }
449}
450
/// Memory threshold used when creating an index.
///
/// The unit variants are (de)serialized in snake case (`auto`, `unlimited`);
/// the untagged `Size` variant is (de)serialized as a bare `ReadableSize` value.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derive the threshold from the total system memory (1/16 of it, or 64MB
    /// when the total memory is unknown). This is the default.
    #[default]
    Auto,
    /// No memory limit.
    Unlimited,
    /// A fixed memory limit.
    #[serde(untagged)]
    Size(ReadableSize),
}
464
/// Options of the inverted index.
///
/// Because of `#[serde(default)]`, missing fields are taken from
/// `InvertedIndexConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,

    /// Memory threshold for creating the index (default: `MemoryThreshold::Auto`).
    /// Use the `mem_threshold_on_create()` method to resolve it to bytes.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated location of intermediate files. It is still accepted on
    /// input but never serialized; `IndexConfig::sanitize` copies it into
    /// `index.aux_path` when the latter is empty.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated write buffer size (default: 8MB). It is still accepted on
    /// input but never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
488
489impl Default for InvertedIndexConfig {
490 #[allow(deprecated)]
491 fn default() -> Self {
492 Self {
493 create_on_flush: Mode::Auto,
494 create_on_compaction: Mode::Auto,
495 apply_on_query: Mode::Auto,
496 mem_threshold_on_create: MemoryThreshold::Auto,
497 write_buffer_size: ReadableSize::mb(8),
498 intermediate_path: String::new(),
499 }
500 }
501}
502
503impl InvertedIndexConfig {
504 pub fn mem_threshold_on_create(&self) -> Option<usize> {
505 match self.mem_threshold_on_create {
506 MemoryThreshold::Auto => {
507 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
508 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
509 } else {
510 Some(ReadableSize::mb(64).as_bytes() as usize)
511 }
512 }
513 MemoryThreshold::Unlimited => None,
514 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
515 }
516 }
517}
518
/// Options of the full-text index.
///
/// Because of `#[serde(default)]`, missing fields are taken from
/// `FulltextIndexConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index (default: `MemoryThreshold::Auto`).
    /// Use the `mem_threshold_on_create()` method to resolve it to bytes.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default: true).
    pub compress: bool,
}
535
536impl Default for FulltextIndexConfig {
537 fn default() -> Self {
538 Self {
539 create_on_flush: Mode::Auto,
540 create_on_compaction: Mode::Auto,
541 apply_on_query: Mode::Auto,
542 mem_threshold_on_create: MemoryThreshold::Auto,
543 compress: true,
544 }
545 }
546}
547
548impl FulltextIndexConfig {
549 pub fn mem_threshold_on_create(&self) -> usize {
550 match self.mem_threshold_on_create {
551 MemoryThreshold::Auto => {
552 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
553 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
554 } else {
555 ReadableSize::mb(64).as_bytes() as _
556 }
557 }
558 MemoryThreshold::Unlimited => usize::MAX,
559 MemoryThreshold::Size(size) => size.as_bytes() as _,
560 }
561 }
562}
563
/// Options of the bloom filter index.
///
/// Because of `#[serde(default)]`, missing fields are taken from
/// `BloomFilterConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush (default: `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default: `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default: `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index (default: `MemoryThreshold::Auto`).
    /// Use the `mem_threshold_on_create()` method to resolve it to bytes.
    pub mem_threshold_on_create: MemoryThreshold,
}
578
579impl Default for BloomFilterConfig {
580 fn default() -> Self {
581 Self {
582 create_on_flush: Mode::Auto,
583 create_on_compaction: Mode::Auto,
584 apply_on_query: Mode::Auto,
585 mem_threshold_on_create: MemoryThreshold::Auto,
586 }
587 }
588}
589
590impl BloomFilterConfig {
591 pub fn mem_threshold_on_create(&self) -> Option<usize> {
592 match self.mem_threshold_on_create {
593 MemoryThreshold::Auto => {
594 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
595 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
596 } else {
597 Some(ReadableSize::mb(64).as_bytes() as usize)
598 }
599 }
600 MemoryThreshold::Unlimited => None,
601 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
602 }
603 }
604}
605
/// Divides the number of CPUs reported by `common_config::utils::get_cpus()`
/// by `divisor`, rounding the result up.
///
/// `divisor` must be greater than 0; this is only checked in debug builds.
fn divide_num_cpus(divisor: usize) -> usize {
    debug_assert!(divisor > 0);
    let cores = common_config::utils::get_cpus();
    debug_assert!(cores > 0);

    // Round up so that the result is at least 1 whenever `cores` is non-zero.
    cores.div_ceil(divisor)
}
614
#[cfg(test)]
mod tests {
    use super::*;

    /// A config that only contains a `[memtable]` table must deserialize: all
    /// other fields come from `MitoConfig::default()`, and the memtable section
    /// must be parsed as the partition tree variant with the given options.
    #[test]
    fn test_deserialize_config() {
        let s = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MitoConfig = toml::from_str(s).unwrap();
        // Any other memtable variant means the `type` tag was not honored.
        let MemtableConfig::PartitionTree(config) = &config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, config.data_freeze_threshold);
    }
}