1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_stat::{get_total_cpu_cores, get_total_memory_readable};
23use common_telemetry::warn;
24use serde::{Deserialize, Serialize};
25use serde_with::serde_as;
26
27use crate::error::Result;
28use crate::memtable::MemtableConfig;
29use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
30
/// Minimum buffer size usable for object-store multipart uploads; buffers
/// below this are bumped up by `sanitize` (e.g. S3 requires >= 5 MiB parts
/// — NOTE(review): confirm against the object store backend in use).
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel capacity for parallel scan tasks.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default maximum number of SST files scanned concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

// Divisors applied to total system memory when auto-sizing buffers/caches:
// size = sys_memory / FACTOR, usually capped — see
// `MitoConfig::adjust_buffer_and_cache_size` and friends.
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Timeout for fetch-option operations (used at call sites outside this file).
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
50
/// Configuration for the mito storage engine.
///
/// Missing fields fall back to [`Default`] values via `#[serde(default)]`;
/// invalid values are corrected by [`MitoConfig::sanitize`].
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of region workers.
    pub num_workers: usize,
    /// Request channel capacity of each worker.
    pub worker_channel_size: usize,
    /// Max number of requests a worker handles in one batch.
    pub worker_request_batch_size: usize,

    /// Number of manifest actions between two checkpoints.
    pub manifest_checkpoint_distance: u64,
    /// Max count of removed files kept in the manifest (experimental).
    pub experimental_manifest_keep_removed_file_count: usize,
    /// TTL of removed files kept in the manifest (experimental).
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress manifest files.
    pub compress_manifest: bool,

    /// Max number of concurrent background index build jobs.
    pub max_background_index_builds: usize,
    /// Max number of concurrent background flush jobs.
    pub max_background_flushes: usize,
    /// Max number of concurrent background compaction jobs.
    pub max_background_compactions: usize,
    /// Max number of concurrent background purge jobs.
    pub max_background_purges: usize,

    /// Interval for automatic flushes.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size shared by all regions.
    pub global_write_buffer_size: ReadableSize,
    /// Threshold above which writes are rejected; kept strictly greater than
    /// `global_write_buffer_size` by `sanitize`.
    pub global_write_buffer_reject_size: ReadableSize,

    /// Capacity of the SST metadata cache.
    pub sst_meta_cache_size: ReadableSize,
    /// Capacity of the vector cache.
    pub vector_cache_size: ReadableSize,
    /// Capacity of the page cache.
    pub page_cache_size: ReadableSize,
    /// Capacity of the selector result cache.
    pub selector_result_cache_size: ReadableSize,
    /// Whether the local write cache is enabled.
    pub enable_write_cache: bool,
    /// Root path of the write cache; falls back to `data_home` when empty
    /// (see `sanitize`).
    pub write_cache_path: String,
    /// Capacity of the write cache.
    pub write_cache_size: ReadableSize,
    /// Optional TTL for write cache entries; `None` means no expiration.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,

    /// Buffer size used when writing SST files; raised to the multipart
    /// upload minimum by `sanitize`.
    pub sst_write_buffer_size: ReadableSize,
    /// Channel capacity for parallel scans.
    pub parallel_scan_channel_size: usize,
    /// Max number of SST files scanned concurrently.
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries (semantics defined at the call sites of
    /// this flag — TODO confirm).
    pub allow_stale_entries: bool,

    /// Common index settings.
    pub index: IndexConfig,
    /// Inverted index settings.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index settings.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index settings.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable settings.
    pub memtable: MemtableConfig,

    /// Minimum interval between compactions (0 disables the throttle —
    /// NOTE(review): confirm with the compaction scheduler).
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether to use the experimental flat format by default.
    pub default_experimental_flat_format: bool,
}
152
impl Default for MitoConfig {
    fn default() -> Self {
        // Static defaults; several worker/job counts are derived from the
        // number of CPU cores via `divide_num_cpus`.
        let mut mito_config = MitoConfig {
            num_workers: divide_num_cpus(2),
            worker_channel_size: 128,
            worker_request_batch_size: 64,
            manifest_checkpoint_distance: 10,
            experimental_manifest_keep_removed_file_count: 256,
            experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
            compress_manifest: false,
            max_background_index_builds: divide_num_cpus(8),
            max_background_flushes: divide_num_cpus(2),
            max_background_compactions: divide_num_cpus(4),
            max_background_purges: get_total_cpu_cores(),
            auto_flush_interval: Duration::from_secs(30 * 60),
            global_write_buffer_size: ReadableSize::gb(1),
            global_write_buffer_reject_size: ReadableSize::gb(2),
            sst_meta_cache_size: ReadableSize::mb(128),
            vector_cache_size: ReadableSize::mb(512),
            page_cache_size: ReadableSize::mb(512),
            selector_result_cache_size: ReadableSize::mb(512),
            enable_write_cache: false,
            write_cache_path: String::new(),
            write_cache_size: ReadableSize::gb(5),
            write_cache_ttl: None,
            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            allow_stale_entries: false,
            index: IndexConfig::default(),
            inverted_index: InvertedIndexConfig::default(),
            fulltext_index: FulltextIndexConfig::default(),
            bloom_filter_index: BloomFilterConfig::default(),
            memtable: MemtableConfig::default(),
            min_compaction_interval: Duration::from_secs(0),
            default_experimental_flat_format: false,
        };

        // Scale buffer/cache sizes down on machines with limited memory.
        if let Some(sys_memory) = get_total_memory_readable() {
            mito_config.adjust_buffer_and_cache_size(sys_memory);
        }

        mito_config
    }
}
199
200impl MitoConfig {
201 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
205 if self.num_workers == 0 {
207 self.num_workers = divide_num_cpus(2);
208 }
209
210 if self.worker_channel_size == 0 {
212 warn!("Sanitize channel size 0 to 1");
213 self.worker_channel_size = 1;
214 }
215
216 if self.max_background_flushes == 0 {
217 warn!(
218 "Sanitize max background flushes 0 to {}",
219 divide_num_cpus(2)
220 );
221 self.max_background_flushes = divide_num_cpus(2);
222 }
223 if self.max_background_compactions == 0 {
224 warn!(
225 "Sanitize max background compactions 0 to {}",
226 divide_num_cpus(4)
227 );
228 self.max_background_compactions = divide_num_cpus(4);
229 }
230 if self.max_background_purges == 0 {
231 let cpu_cores = get_total_cpu_cores();
232 warn!("Sanitize max background purges 0 to {}", cpu_cores);
233 self.max_background_purges = cpu_cores;
234 }
235
236 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
237 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
238 warn!(
239 "Sanitize global write buffer reject size to {}",
240 self.global_write_buffer_reject_size
241 );
242 }
243
244 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
245 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
246 warn!(
247 "Sanitize sst write buffer size to {}",
248 self.sst_write_buffer_size
249 );
250 }
251
252 if self.parallel_scan_channel_size < 1 {
253 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
254 warn!(
255 "Sanitize scan channel size to {}",
256 self.parallel_scan_channel_size
257 );
258 }
259
260 if self.write_cache_path.trim().is_empty() {
262 self.write_cache_path = data_home.to_string();
263 }
264
265 self.index.sanitize(data_home, &self.inverted_index)?;
266
267 Ok(())
268 }
269
270 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
271 let global_write_buffer_size = cmp::min(
273 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
274 ReadableSize::gb(1),
275 );
276 let global_write_buffer_reject_size = global_write_buffer_size * 2;
278 let sst_meta_cache_size = cmp::min(
280 sys_memory / SST_META_CACHE_SIZE_FACTOR,
281 ReadableSize::mb(128),
282 );
283 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
285 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
286
287 self.global_write_buffer_size = global_write_buffer_size;
288 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
289 self.sst_meta_cache_size = sst_meta_cache_size;
290 self.vector_cache_size = mem_cache_size;
291 self.page_cache_size = page_cache_size;
292 self.selector_result_cache_size = mem_cache_size;
293
294 self.index.adjust_buffer_and_cache_size(sys_memory);
295 }
296
297 #[cfg(test)]
299 pub fn enable_write_cache(
300 mut self,
301 path: String,
302 size: ReadableSize,
303 ttl: Option<Duration>,
304 ) -> Self {
305 self.enable_write_cache = true;
306 self.write_cache_path = path;
307 self.write_cache_size = size;
308 self.write_cache_ttl = ttl;
309 self
310 }
311}
312
/// How index builds are scheduled.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// Build the index synchronously (default).
    #[default]
    Sync,
    /// Build the index asynchronously.
    Async,
}
323
/// Common configuration shared by all index types.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory for intermediate index files; defaults to
    /// `{data_home}/index_intermediate` when empty (see `sanitize`).
    pub aux_path: String,

    /// Capacity of the index staging area.
    pub staging_size: ReadableSize,
    /// TTL of staged files; `None` (or a configured zero, normalized by
    /// `sanitize`) means no expiration.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Sync or async index build scheduling.
    pub build_mode: IndexBuildMode,

    /// Write buffer size used when creating index files; raised to the
    /// multipart upload minimum by `sanitize`.
    pub write_buffer_size: ReadableSize,

    /// Cache size for index metadata.
    pub metadata_cache_size: ReadableSize,
    /// Cache size for index content.
    pub content_cache_size: ReadableSize,
    /// Page size of the index content cache.
    pub content_cache_page_size: ReadableSize,
    /// Cache size for index query results.
    pub result_cache_size: ReadableSize,
}
362
363impl Default for IndexConfig {
364 fn default() -> Self {
365 Self {
366 aux_path: String::new(),
367 staging_size: ReadableSize::gb(2),
368 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
369 build_mode: IndexBuildMode::default(),
370 write_buffer_size: ReadableSize::mb(8),
371 metadata_cache_size: ReadableSize::mb(64),
372 content_cache_size: ReadableSize::mb(128),
373 content_cache_page_size: ReadableSize::kb(64),
374 result_cache_size: ReadableSize::mb(128),
375 }
376 }
377}
378
379impl IndexConfig {
380 pub fn sanitize(
381 &mut self,
382 data_home: &str,
383 inverted_index: &InvertedIndexConfig,
384 ) -> Result<()> {
385 #[allow(deprecated)]
386 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
387 self.aux_path.clone_from(&inverted_index.intermediate_path);
388 warn!(
389 "`inverted_index.intermediate_path` is deprecated, use
390 `index.aux_path` instead. Set `index.aux_path` to {}",
391 &inverted_index.intermediate_path
392 )
393 }
394 if self.aux_path.is_empty() {
395 let path = Path::new(data_home).join("index_intermediate");
396 self.aux_path = path.as_os_str().to_string_lossy().to_string();
397 }
398
399 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
400 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
401 warn!(
402 "Sanitize index write buffer size to {}",
403 self.write_buffer_size
404 );
405 }
406
407 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
408 self.staging_ttl = None;
409 }
410
411 Ok(())
412 }
413
414 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
415 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
416 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
417 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
418
419 let metadata_cache_size = cmp::min(
420 sys_memory / SST_META_CACHE_SIZE_FACTOR,
421 ReadableSize::mb(64),
422 );
423 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
424 }
425}
426
/// On/off switch for an index feature, with an automatic default.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Let the engine decide (default).
    #[default]
    Auto,
    /// Feature is turned off.
    Disable,
}
437
438impl Mode {
439 pub fn disabled(&self) -> bool {
441 matches!(self, Mode::Disable)
442 }
443
444 pub fn auto(&self) -> bool {
446 matches!(self, Mode::Auto)
447 }
448}
449
/// Memory budget for index creation.
///
/// Accepted serialized forms: `"auto"`, `"unlimited"`, or (untagged) a size
/// string such as `"64MB"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derive the budget from total system memory (default).
    #[default]
    Auto,
    /// No memory limit.
    Unlimited,
    /// Explicit size limit.
    #[serde(untagged)]
    Size(ReadableSize),
}
463
/// Configuration for the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,

    /// Memory budget for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated; still deserialized so old configs load, then migrated into
    /// `IndexConfig::aux_path` by `IndexConfig::sanitize`.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated; superseded by `IndexConfig::write_buffer_size`.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
487
488impl Default for InvertedIndexConfig {
489 #[allow(deprecated)]
490 fn default() -> Self {
491 Self {
492 create_on_flush: Mode::Auto,
493 create_on_compaction: Mode::Auto,
494 apply_on_query: Mode::Auto,
495 mem_threshold_on_create: MemoryThreshold::Auto,
496 write_buffer_size: ReadableSize::mb(8),
497 intermediate_path: String::new(),
498 }
499 }
500}
501
502impl InvertedIndexConfig {
503 pub fn mem_threshold_on_create(&self) -> Option<usize> {
504 match self.mem_threshold_on_create {
505 MemoryThreshold::Auto => {
506 if let Some(sys_memory) = get_total_memory_readable() {
507 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
508 } else {
509 Some(ReadableSize::mb(64).as_bytes() as usize)
510 }
511 }
512 MemoryThreshold::Unlimited => None,
513 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
514 }
515 }
516}
517
/// Configuration for the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory budget for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data.
    pub compress: bool,
}
534
535impl Default for FulltextIndexConfig {
536 fn default() -> Self {
537 Self {
538 create_on_flush: Mode::Auto,
539 create_on_compaction: Mode::Auto,
540 apply_on_query: Mode::Auto,
541 mem_threshold_on_create: MemoryThreshold::Auto,
542 compress: true,
543 }
544 }
545}
546
547impl FulltextIndexConfig {
548 pub fn mem_threshold_on_create(&self) -> usize {
549 match self.mem_threshold_on_create {
550 MemoryThreshold::Auto => {
551 if let Some(sys_memory) = get_total_memory_readable() {
552 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
553 } else {
554 ReadableSize::mb(64).as_bytes() as _
555 }
556 }
557 MemoryThreshold::Unlimited => usize::MAX,
558 MemoryThreshold::Size(size) => size.as_bytes() as _,
559 }
560 }
561}
562
/// Configuration for the bloom filter index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query.
    pub apply_on_query: Mode,
    /// Memory budget for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}
577
578impl Default for BloomFilterConfig {
579 fn default() -> Self {
580 Self {
581 create_on_flush: Mode::Auto,
582 create_on_compaction: Mode::Auto,
583 apply_on_query: Mode::Auto,
584 mem_threshold_on_create: MemoryThreshold::Auto,
585 }
586 }
587}
588
589impl BloomFilterConfig {
590 pub fn mem_threshold_on_create(&self) -> Option<usize> {
591 match self.mem_threshold_on_create {
592 MemoryThreshold::Auto => {
593 if let Some(sys_memory) = get_total_memory_readable() {
594 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
595 } else {
596 Some(ReadableSize::mb(64).as_bytes() as usize)
597 }
598 }
599 MemoryThreshold::Unlimited => None,
600 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
601 }
602 }
603}
604
605fn divide_num_cpus(divisor: usize) -> usize {
607 debug_assert!(divisor > 0);
608 let cores = get_total_cpu_cores();
609 debug_assert!(cores > 0);
610
611 cores.div_ceil(divisor)
612}
613
#[cfg(test)]
mod tests {
    use super::*;

    /// A partial TOML document must deserialize, with all absent fields
    /// filled from `MitoConfig::default()` via `#[serde(default)]`.
    #[test]
    fn test_deserialize_config() {
        let s = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let config: MitoConfig = toml::from_str(s).unwrap();
        // The TOML above selects the partition-tree memtable variant.
        let MemtableConfig::PartitionTree(config) = &config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, config.data_freeze_threshold);
    }
}
634}