1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24use serde_with::serde_as;
25
26use crate::error::Result;
27use crate::memtable::MemtableConfig;
28use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
29
/// Lower bound that the `sanitize` methods in this file enforce on the SST
/// and index write buffer sizes.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default value of `MitoConfig::parallel_scan_channel_size`.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default value of `MitoConfig::max_concurrent_scan_files`.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

/// The default global write buffer size is `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR`
/// of the system memory (capped at 1GB).
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// The default SST meta cache size is `1/SST_META_CACHE_SIZE_FACTOR` of the
/// system memory (capped at 128MB). The same factor bounds the index metadata
/// cache (capped at 64MB).
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// The default vector cache and selector result cache sizes are
/// `1/MEM_CACHE_SIZE_FACTOR` of the system memory (capped at 512MB). The same
/// factor bounds the index result and content caches (capped at 128MB).
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// The default page cache size is `1/PAGE_CACHE_SIZE_FACTOR` of the system
/// memory (no upper cap is applied).
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// With `MemoryThreshold::Auto`, the memory threshold for creating an index is
/// `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of the system memory.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Three-second timeout; not referenced in this file.
/// NOTE(review): presumably bounds how long fetching an option may take;
/// confirm at the use sites.
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
49
/// Configuration of the mito engine.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized
/// input takes its value from `MitoConfig::default()`. Call
/// `MitoConfig::sanitize` after loading to normalize invalid values.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    // Worker configs:
    /// Number of workers. Defaults to half of the CPU cores (rounded up).
    /// `sanitize` replaces 0 with the default.
    pub num_workers: usize,
    /// Worker channel size (default 128). `sanitize` raises 0 to 1.
    pub worker_channel_size: usize,
    /// Default 64.
    /// NOTE(review): presumably the max number of requests a worker handles
    /// per batch; confirm at the use site.
    pub worker_request_batch_size: usize,

    // Manifest configs:
    /// Default 10.
    /// NOTE(review): presumably the number of manifest actions between
    /// checkpoints; confirm at the use site.
    pub manifest_checkpoint_distance: u64,
    /// Experimental. Default 256.
    pub experimental_manifest_keep_removed_file_count: usize,
    /// Experimental. Default 1 hour. (De)serialized as a human-readable
    /// duration through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Default `false`.
    pub compress_manifest: bool,

    // Background job configs:
    /// Defaults to half of the CPU cores (rounded up). `sanitize` replaces 0
    /// with the default.
    pub max_background_flushes: usize,
    /// Defaults to a quarter of the CPU cores (rounded up). `sanitize`
    /// replaces 0 with the default.
    pub max_background_compactions: usize,
    /// Defaults to the number of CPU cores. `sanitize` replaces 0 with the
    /// default.
    pub max_background_purges: usize,

    // Flush configs:
    /// Default 30 minutes. (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Defaults to 1/8 of the system memory, capped at 1GB (1GB when the
    /// system memory is unknown).
    pub global_write_buffer_size: ReadableSize,
    /// Defaults to twice `global_write_buffer_size`. `sanitize` resets it to
    /// twice `global_write_buffer_size` when it is not strictly greater than
    /// `global_write_buffer_size`.
    pub global_write_buffer_reject_size: ReadableSize,

    // Cache configs:
    /// Defaults to 1/32 of the system memory, capped at 128MB.
    pub sst_meta_cache_size: ReadableSize,
    /// Defaults to 1/16 of the system memory, capped at 512MB.
    pub vector_cache_size: ReadableSize,
    /// Defaults to 1/8 of the system memory (512MB when the system memory is
    /// unknown).
    pub page_cache_size: ReadableSize,
    /// Defaults to 1/16 of the system memory, capped at 512MB.
    pub selector_result_cache_size: ReadableSize,
    /// Default `false`.
    pub enable_write_cache: bool,
    /// Defaults to an empty string. `sanitize` replaces a blank path with the
    /// `data_home` it is given.
    pub write_cache_path: String,
    /// Default 5GB.
    pub write_cache_size: ReadableSize,
    /// Default `None`. (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,

    // Other configs:
    /// Defaults to `DEFAULT_WRITE_BUFFER_SIZE`. `sanitize` raises it to at
    /// least `MULTIPART_UPLOAD_MINIMUM_SIZE` (5MB).
    pub sst_write_buffer_size: ReadableSize,
    /// Defaults to `DEFAULT_SCAN_CHANNEL_SIZE`. `sanitize` resets 0 to the
    /// default.
    pub parallel_scan_channel_size: usize,
    /// Defaults to `DEFAULT_MAX_CONCURRENT_SCAN_FILES`.
    pub max_concurrent_scan_files: usize,
    /// Default `false`.
    pub allow_stale_entries: bool,

    /// Options shared by all index types.
    pub index: IndexConfig,
    /// Inverted index options.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index options.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index options.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable options.
    pub memtable: MemtableConfig,

    /// Default zero. (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,
}
145
146impl Default for MitoConfig {
147 fn default() -> Self {
148 let mut mito_config = MitoConfig {
149 num_workers: divide_num_cpus(2),
150 worker_channel_size: 128,
151 worker_request_batch_size: 64,
152 manifest_checkpoint_distance: 10,
153 experimental_manifest_keep_removed_file_count: 256,
154 experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
155 compress_manifest: false,
156 max_background_flushes: divide_num_cpus(2),
157 max_background_compactions: divide_num_cpus(4),
158 max_background_purges: common_config::utils::get_cpus(),
159 auto_flush_interval: Duration::from_secs(30 * 60),
160 global_write_buffer_size: ReadableSize::gb(1),
161 global_write_buffer_reject_size: ReadableSize::gb(2),
162 sst_meta_cache_size: ReadableSize::mb(128),
163 vector_cache_size: ReadableSize::mb(512),
164 page_cache_size: ReadableSize::mb(512),
165 selector_result_cache_size: ReadableSize::mb(512),
166 enable_write_cache: false,
167 write_cache_path: String::new(),
168 write_cache_size: ReadableSize::gb(5),
169 write_cache_ttl: None,
170 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
171 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
172 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
173 allow_stale_entries: false,
174 index: IndexConfig::default(),
175 inverted_index: InvertedIndexConfig::default(),
176 fulltext_index: FulltextIndexConfig::default(),
177 bloom_filter_index: BloomFilterConfig::default(),
178 memtable: MemtableConfig::default(),
179 min_compaction_interval: Duration::from_secs(0),
180 };
181
182 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
184 mito_config.adjust_buffer_and_cache_size(sys_memory);
185 }
186
187 mito_config
188 }
189}
190
191impl MitoConfig {
192 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
196 if self.num_workers == 0 {
198 self.num_workers = divide_num_cpus(2);
199 }
200
201 if self.worker_channel_size == 0 {
203 warn!("Sanitize channel size 0 to 1");
204 self.worker_channel_size = 1;
205 }
206
207 if self.max_background_flushes == 0 {
208 warn!(
209 "Sanitize max background flushes 0 to {}",
210 divide_num_cpus(2)
211 );
212 self.max_background_flushes = divide_num_cpus(2);
213 }
214 if self.max_background_compactions == 0 {
215 warn!(
216 "Sanitize max background compactions 0 to {}",
217 divide_num_cpus(4)
218 );
219 self.max_background_compactions = divide_num_cpus(4);
220 }
221 if self.max_background_purges == 0 {
222 warn!(
223 "Sanitize max background purges 0 to {}",
224 common_config::utils::get_cpus()
225 );
226 self.max_background_purges = common_config::utils::get_cpus();
227 }
228
229 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
230 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
231 warn!(
232 "Sanitize global write buffer reject size to {}",
233 self.global_write_buffer_reject_size
234 );
235 }
236
237 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
238 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
239 warn!(
240 "Sanitize sst write buffer size to {}",
241 self.sst_write_buffer_size
242 );
243 }
244
245 if self.parallel_scan_channel_size < 1 {
246 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
247 warn!(
248 "Sanitize scan channel size to {}",
249 self.parallel_scan_channel_size
250 );
251 }
252
253 if self.write_cache_path.trim().is_empty() {
255 self.write_cache_path = data_home.to_string();
256 }
257
258 self.index.sanitize(data_home, &self.inverted_index)?;
259
260 Ok(())
261 }
262
263 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
264 let global_write_buffer_size = cmp::min(
266 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
267 ReadableSize::gb(1),
268 );
269 let global_write_buffer_reject_size = global_write_buffer_size * 2;
271 let sst_meta_cache_size = cmp::min(
273 sys_memory / SST_META_CACHE_SIZE_FACTOR,
274 ReadableSize::mb(128),
275 );
276 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
278 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
279
280 self.global_write_buffer_size = global_write_buffer_size;
281 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
282 self.sst_meta_cache_size = sst_meta_cache_size;
283 self.vector_cache_size = mem_cache_size;
284 self.page_cache_size = page_cache_size;
285 self.selector_result_cache_size = mem_cache_size;
286
287 self.index.adjust_buffer_and_cache_size(sys_memory);
288 }
289
290 #[cfg(test)]
292 pub fn enable_write_cache(
293 mut self,
294 path: String,
295 size: ReadableSize,
296 ttl: Option<Duration>,
297 ) -> Self {
298 self.enable_write_cache = true;
299 self.write_cache_path = path;
300 self.write_cache_size = size;
301 self.write_cache_ttl = ttl;
302 self
303 }
304}
305
/// Options shared by all index types.
///
/// Because of `#[serde(default)]`, missing fields take their value from
/// `IndexConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path. Defaults to an empty string. When empty,
    /// `sanitize` falls back to the deprecated
    /// `inverted_index.intermediate_path`, and if that is empty too, to
    /// `<data_home>/index_intermediate`.
    pub aux_path: String,

    /// Default 2GB.
    pub staging_size: ReadableSize,
    /// Default 7 days. `sanitize` turns a zero TTL into `None`.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Default 8MB. `sanitize` raises it to at least
    /// `MULTIPART_UPLOAD_MINIMUM_SIZE` (5MB).
    pub write_buffer_size: ReadableSize,

    /// Default 64MB. `adjust_buffer_and_cache_size` may lower it to 1/32 of
    /// the system memory.
    pub metadata_cache_size: ReadableSize,
    /// Default 128MB. `adjust_buffer_and_cache_size` may lower it to 1/16 of
    /// the system memory.
    pub content_cache_size: ReadableSize,
    /// Default 64KB.
    pub content_cache_page_size: ReadableSize,
    /// Default 128MB. `adjust_buffer_and_cache_size` may lower it to 1/16 of
    /// the system memory.
    pub result_cache_size: ReadableSize,
}
341
342impl Default for IndexConfig {
343 fn default() -> Self {
344 Self {
345 aux_path: String::new(),
346 staging_size: ReadableSize::gb(2),
347 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
348 write_buffer_size: ReadableSize::mb(8),
349 metadata_cache_size: ReadableSize::mb(64),
350 content_cache_size: ReadableSize::mb(128),
351 content_cache_page_size: ReadableSize::kb(64),
352 result_cache_size: ReadableSize::mb(128),
353 }
354 }
355}
356
357impl IndexConfig {
358 pub fn sanitize(
359 &mut self,
360 data_home: &str,
361 inverted_index: &InvertedIndexConfig,
362 ) -> Result<()> {
363 #[allow(deprecated)]
364 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
365 self.aux_path.clone_from(&inverted_index.intermediate_path);
366 warn!(
367 "`inverted_index.intermediate_path` is deprecated, use
368 `index.aux_path` instead. Set `index.aux_path` to {}",
369 &inverted_index.intermediate_path
370 )
371 }
372 if self.aux_path.is_empty() {
373 let path = Path::new(data_home).join("index_intermediate");
374 self.aux_path = path.as_os_str().to_string_lossy().to_string();
375 }
376
377 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
378 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
379 warn!(
380 "Sanitize index write buffer size to {}",
381 self.write_buffer_size
382 );
383 }
384
385 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
386 self.staging_ttl = None;
387 }
388
389 Ok(())
390 }
391
392 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
393 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
394 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
395 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
396
397 let metadata_cache_size = cmp::min(
398 sys_memory / SST_META_CACHE_SIZE_FACTOR,
399 ReadableSize::mb(64),
400 );
401 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
402 }
403}
404
/// Operational mode of an index feature.
///
/// Variants are (de)serialized in snake case (`auto`, `disable`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The default mode.
    /// NOTE(review): presumably lets the engine decide whether to run the
    /// operation; confirm at the use sites.
    #[default]
    Auto,
    /// The operation is turned off.
    Disable,
}
415
416impl Mode {
417 pub fn disabled(&self) -> bool {
419 matches!(self, Mode::Disable)
420 }
421
422 pub fn auto(&self) -> bool {
424 matches!(self, Mode::Auto)
425 }
426}
427
/// Memory threshold used when creating an index.
///
/// The unit variants are (de)serialized in snake case (`auto`, `unlimited`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// The default. The `mem_threshold_on_create` methods in this file resolve
    /// it to `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of the system memory, or to
    /// 64MB when the system memory is unknown.
    #[default]
    Auto,
    /// No threshold.
    Unlimited,
    /// A fixed threshold. The variant is untagged, so it is (de)serialized as
    /// the bare `ReadableSize` value.
    #[serde(untagged)]
    Size(ReadableSize),
}
441
/// Inverted index options.
///
/// Because of `#[serde(default)]`, missing fields take their value from
/// `InvertedIndexConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Mode for creating the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,

    /// Memory threshold for index creation (default `MemoryThreshold::Auto`).
    /// Use the `mem_threshold_on_create()` method to resolve it to bytes.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated. Still deserialized (and used by `IndexConfig::sanitize` as
    /// a fallback for `aux_path`) but never serialized.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated. Still deserialized but never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
465
466impl Default for InvertedIndexConfig {
467 #[allow(deprecated)]
468 fn default() -> Self {
469 Self {
470 create_on_flush: Mode::Auto,
471 create_on_compaction: Mode::Auto,
472 apply_on_query: Mode::Auto,
473 mem_threshold_on_create: MemoryThreshold::Auto,
474 write_buffer_size: ReadableSize::mb(8),
475 intermediate_path: String::new(),
476 }
477 }
478}
479
480impl InvertedIndexConfig {
481 pub fn mem_threshold_on_create(&self) -> Option<usize> {
482 match self.mem_threshold_on_create {
483 MemoryThreshold::Auto => {
484 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
485 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
486 } else {
487 Some(ReadableSize::mb(64).as_bytes() as usize)
488 }
489 }
490 MemoryThreshold::Unlimited => None,
491 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
492 }
493 }
494}
495
/// Full-text index options.
///
/// Because of `#[serde(default)]`, missing fields take their value from
/// `FulltextIndexConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Mode for creating the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default `MemoryThreshold::Auto`).
    /// Use the `mem_threshold_on_create()` method to resolve it to bytes.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Default `true`.
    /// NOTE(review): presumably toggles compression of the index data;
    /// confirm at the use site.
    pub compress: bool,
}
512
513impl Default for FulltextIndexConfig {
514 fn default() -> Self {
515 Self {
516 create_on_flush: Mode::Auto,
517 create_on_compaction: Mode::Auto,
518 apply_on_query: Mode::Auto,
519 mem_threshold_on_create: MemoryThreshold::Auto,
520 compress: true,
521 }
522 }
523}
524
525impl FulltextIndexConfig {
526 pub fn mem_threshold_on_create(&self) -> usize {
527 match self.mem_threshold_on_create {
528 MemoryThreshold::Auto => {
529 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
530 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
531 } else {
532 ReadableSize::mb(64).as_bytes() as _
533 }
534 }
535 MemoryThreshold::Unlimited => usize::MAX,
536 MemoryThreshold::Size(size) => size.as_bytes() as _,
537 }
538 }
539}
540
/// Bloom filter index options.
///
/// Because of `#[serde(default)]`, missing fields take their value from
/// `BloomFilterConfig::default()`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Mode for creating the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default `MemoryThreshold::Auto`).
    /// Use the `mem_threshold_on_create()` method to resolve it to bytes.
    pub mem_threshold_on_create: MemoryThreshold,
}
555
556impl Default for BloomFilterConfig {
557 fn default() -> Self {
558 Self {
559 create_on_flush: Mode::Auto,
560 create_on_compaction: Mode::Auto,
561 apply_on_query: Mode::Auto,
562 mem_threshold_on_create: MemoryThreshold::Auto,
563 }
564 }
565}
566
567impl BloomFilterConfig {
568 pub fn mem_threshold_on_create(&self) -> Option<usize> {
569 match self.mem_threshold_on_create {
570 MemoryThreshold::Auto => {
571 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
572 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
573 } else {
574 Some(ReadableSize::mb(64).as_bytes() as usize)
575 }
576 }
577 MemoryThreshold::Unlimited => None,
578 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
579 }
580 }
581}
582
583fn divide_num_cpus(divisor: usize) -> usize {
585 debug_assert!(divisor > 0);
586 let cores = common_config::utils::get_cpus();
587 debug_assert!(cores > 0);
588
589 cores.div_ceil(divisor)
590}
591
#[cfg(test)]
mod tests {
    use super::*;

    /// A TOML document with only a `[memtable]` section deserializes into a
    /// `MitoConfig` whose memtable is the partition tree variant.
    #[test]
    fn test_deserialize_config() {
        let toml_text = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let mito_config = toml::from_str::<MitoConfig>(toml_text).unwrap();
        match &mito_config.memtable {
            MemtableConfig::PartitionTree(tree) => {
                assert_eq!(1024, tree.data_freeze_threshold);
            }
            _ => unreachable!(),
        }
    }
}