1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24use serde_with::serde_as;
25
26use crate::error::Result;
27use crate::memtable::MemtableConfig;
28use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
29
/// Lower bound for SST and index write buffer sizes; `sanitize` raises smaller values to it.
/// NOTE(review): named after the object-store multipart-upload minimum part size — confirm
/// against the object store in use.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default `parallel_scan_channel_size`; also the value a configured 0 is sanitized to.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;

/// The memory-derived global write buffer size is `sys_memory / 8` (capped at 1 GB).
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// The memory-derived SST metadata cache size is `sys_memory / 32` (capped at 128 MB).
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// The memory-derived vector and selector-result cache sizes are `sys_memory / 16`
/// (capped at 512 MB).
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// The memory-derived page cache size is `sys_memory / 8` (no cap).
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// With `MemoryThreshold::Auto`, index creation memory threshold is `sys_memory / 16`.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Three-second timeout. It is not used in this file; presumably it bounds fetching of
/// options elsewhere in the crate — confirm at the call sites.
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
47
/// Configuration of the mito engine.
///
/// Missing fields in a serialized config take their values from [`MitoConfig::default`]
/// (`#[serde(default)]`). Call [`MitoConfig::sanitize`] before use to fix out-of-range values.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    /// Number of workers. Defaults to the CPU count divided by 2 (rounded up);
    /// `sanitize` replaces 0 with that value.
    pub num_workers: usize,
    /// Per-worker channel size (default 128); `sanitize` raises 0 to 1.
    pub worker_channel_size: usize,
    /// Per-worker request batch size (default 64).
    pub worker_request_batch_size: usize,

    /// Manifest checkpoint distance (default 10). Presumably the number of manifest
    /// changes between checkpoints — confirm in the manifest manager.
    pub manifest_checkpoint_distance: u64,
    /// Whether to compress manifest files (default `false`).
    pub compress_manifest: bool,

    /// Max concurrent background flush jobs (default: CPU count / 2, rounded up).
    pub max_background_flushes: usize,
    /// Max concurrent background compaction jobs (default: CPU count / 4, rounded up).
    pub max_background_compactions: usize,
    /// Max concurrent background purge jobs (default: CPU count).
    pub max_background_purges: usize,

    /// Interval of automatic flushes, in humantime format (default 30 minutes).
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size. Default: `min(sys_memory / 8, 1 GB)`, or 1 GB when system
    /// memory is unknown.
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer size at which writes are rejected. Default: twice
    /// `global_write_buffer_size`. `sanitize` doubles `global_write_buffer_size` into this
    /// field if it is not strictly larger.
    pub global_write_buffer_reject_size: ReadableSize,

    /// SST metadata cache size. Default: `min(sys_memory / 32, 128 MB)`.
    pub sst_meta_cache_size: ReadableSize,
    /// Vector cache size. Default: `min(sys_memory / 16, 512 MB)`.
    pub vector_cache_size: ReadableSize,
    /// Page cache size. Default: `sys_memory / 8` (uncapped), or 512 MB when memory is unknown.
    pub page_cache_size: ReadableSize,
    /// Selector result cache size. Default: `min(sys_memory / 16, 512 MB)`.
    pub selector_result_cache_size: ReadableSize,
    /// Whether the write cache is enabled (default `false`).
    pub enable_write_cache: bool,
    /// Write cache directory; `sanitize` falls back to `data_home` when empty or blank.
    pub write_cache_path: String,
    /// Write cache capacity (default 5 GB).
    pub write_cache_size: ReadableSize,
    /// Optional TTL of write cache entries, in humantime format (default `None`).
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,

    /// Buffer size for SST writing; `sanitize` raises it to at least 5 MB.
    pub sst_write_buffer_size: ReadableSize,
    /// Channel size for parallel scans (default 32); `sanitize` replaces 0 with 32.
    pub parallel_scan_channel_size: usize,
    /// Whether stale entries are allowed (default `false`).
    pub allow_stale_entries: bool,

    /// Options shared by all index types.
    pub index: IndexConfig,
    /// Inverted index options.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index options.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index options.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable options.
    pub memtable: MemtableConfig,

    /// Minimum interval between compactions, in humantime format (default 0).
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,
}
130
131impl Default for MitoConfig {
132 fn default() -> Self {
133 let mut mito_config = MitoConfig {
134 num_workers: divide_num_cpus(2),
135 worker_channel_size: 128,
136 worker_request_batch_size: 64,
137 manifest_checkpoint_distance: 10,
138 compress_manifest: false,
139 max_background_flushes: divide_num_cpus(2),
140 max_background_compactions: divide_num_cpus(4),
141 max_background_purges: common_config::utils::get_cpus(),
142 auto_flush_interval: Duration::from_secs(30 * 60),
143 global_write_buffer_size: ReadableSize::gb(1),
144 global_write_buffer_reject_size: ReadableSize::gb(2),
145 sst_meta_cache_size: ReadableSize::mb(128),
146 vector_cache_size: ReadableSize::mb(512),
147 page_cache_size: ReadableSize::mb(512),
148 selector_result_cache_size: ReadableSize::mb(512),
149 enable_write_cache: false,
150 write_cache_path: String::new(),
151 write_cache_size: ReadableSize::gb(5),
152 write_cache_ttl: None,
153 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
154 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
155 allow_stale_entries: false,
156 index: IndexConfig::default(),
157 inverted_index: InvertedIndexConfig::default(),
158 fulltext_index: FulltextIndexConfig::default(),
159 bloom_filter_index: BloomFilterConfig::default(),
160 memtable: MemtableConfig::default(),
161 min_compaction_interval: Duration::from_secs(0),
162 };
163
164 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
166 mito_config.adjust_buffer_and_cache_size(sys_memory);
167 }
168
169 mito_config
170 }
171}
172
173impl MitoConfig {
174 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
178 if self.num_workers == 0 {
180 self.num_workers = divide_num_cpus(2);
181 }
182
183 if self.worker_channel_size == 0 {
185 warn!("Sanitize channel size 0 to 1");
186 self.worker_channel_size = 1;
187 }
188
189 if self.max_background_flushes == 0 {
190 warn!(
191 "Sanitize max background flushes 0 to {}",
192 divide_num_cpus(2)
193 );
194 self.max_background_flushes = divide_num_cpus(2);
195 }
196 if self.max_background_compactions == 0 {
197 warn!(
198 "Sanitize max background compactions 0 to {}",
199 divide_num_cpus(4)
200 );
201 self.max_background_compactions = divide_num_cpus(4);
202 }
203 if self.max_background_purges == 0 {
204 warn!(
205 "Sanitize max background purges 0 to {}",
206 common_config::utils::get_cpus()
207 );
208 self.max_background_purges = common_config::utils::get_cpus();
209 }
210
211 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
212 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
213 warn!(
214 "Sanitize global write buffer reject size to {}",
215 self.global_write_buffer_reject_size
216 );
217 }
218
219 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
220 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
221 warn!(
222 "Sanitize sst write buffer size to {}",
223 self.sst_write_buffer_size
224 );
225 }
226
227 if self.parallel_scan_channel_size < 1 {
228 self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
229 warn!(
230 "Sanitize scan channel size to {}",
231 self.parallel_scan_channel_size
232 );
233 }
234
235 if self.write_cache_path.trim().is_empty() {
237 self.write_cache_path = data_home.to_string();
238 }
239
240 self.index.sanitize(data_home, &self.inverted_index)?;
241
242 Ok(())
243 }
244
245 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
246 let global_write_buffer_size = cmp::min(
248 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
249 ReadableSize::gb(1),
250 );
251 let global_write_buffer_reject_size = global_write_buffer_size * 2;
253 let sst_meta_cache_size = cmp::min(
255 sys_memory / SST_META_CACHE_SIZE_FACTOR,
256 ReadableSize::mb(128),
257 );
258 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
260 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
261
262 self.global_write_buffer_size = global_write_buffer_size;
263 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
264 self.sst_meta_cache_size = sst_meta_cache_size;
265 self.vector_cache_size = mem_cache_size;
266 self.page_cache_size = page_cache_size;
267 self.selector_result_cache_size = mem_cache_size;
268 }
269
270 #[cfg(test)]
272 pub fn enable_write_cache(
273 mut self,
274 path: String,
275 size: ReadableSize,
276 ttl: Option<Duration>,
277 ) -> Self {
278 self.enable_write_cache = true;
279 self.write_cache_path = path;
280 self.write_cache_size = size;
281 self.write_cache_ttl = ttl;
282 self
283 }
284}
285
/// Options shared by all index types.
///
/// Missing fields take their values from [`IndexConfig::default`] (`#[serde(default)]`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory for index files. When empty, `sanitize` first takes the deprecated
    /// `inverted_index.intermediate_path`, then falls back to `{data_home}/index_intermediate`.
    pub aux_path: String,

    /// Staging area size (default 2 GB).
    pub staging_size: ReadableSize,
    /// Staging area TTL, in humantime format (default 7 days). `sanitize` turns a zero TTL
    /// into `None`.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Write buffer size for index files (default 8 MB); `sanitize` raises it to at least
    /// 5 MB.
    pub write_buffer_size: ReadableSize,

    /// Index metadata cache size (default 64 MB).
    pub metadata_cache_size: ReadableSize,
    /// Index content cache size (default 128 MB).
    pub content_cache_size: ReadableSize,
    /// Page size of the index content cache (default 64 KB).
    pub content_cache_page_size: ReadableSize,
}
319
320impl Default for IndexConfig {
321 fn default() -> Self {
322 Self {
323 aux_path: String::new(),
324 staging_size: ReadableSize::gb(2),
325 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
326 write_buffer_size: ReadableSize::mb(8),
327 metadata_cache_size: ReadableSize::mb(64),
328 content_cache_size: ReadableSize::mb(128),
329 content_cache_page_size: ReadableSize::kb(64),
330 }
331 }
332}
333
334impl IndexConfig {
335 pub fn sanitize(
336 &mut self,
337 data_home: &str,
338 inverted_index: &InvertedIndexConfig,
339 ) -> Result<()> {
340 #[allow(deprecated)]
341 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
342 self.aux_path.clone_from(&inverted_index.intermediate_path);
343 warn!(
344 "`inverted_index.intermediate_path` is deprecated, use
345 `index.aux_path` instead. Set `index.aux_path` to {}",
346 &inverted_index.intermediate_path
347 )
348 }
349 if self.aux_path.is_empty() {
350 let path = Path::new(data_home).join("index_intermediate");
351 self.aux_path = path.as_os_str().to_string_lossy().to_string();
352 }
353
354 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
355 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
356 warn!(
357 "Sanitize index write buffer size to {}",
358 self.write_buffer_size
359 );
360 }
361
362 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
363 self.staging_ttl = None;
364 }
365
366 Ok(())
367 }
368}
369
/// Whether an index operation (creation on flush/compaction, application on query) runs.
///
/// Serialized in snake_case: `"auto"` / `"disable"`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The operation is enabled (default).
    #[default]
    Auto,
    /// The operation is disabled.
    Disable,
}
380
381impl Mode {
382 pub fn disabled(&self) -> bool {
384 matches!(self, Mode::Disable)
385 }
386
387 pub fn auto(&self) -> bool {
389 matches!(self, Mode::Auto)
390 }
391}
392
/// Memory threshold applied while creating an index.
///
/// Unit variants serialize in snake_case (`"auto"`, `"unlimited"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Derived from system memory (`sys_memory / 16`), or 64 MB when memory is unknown
    /// (default).
    #[default]
    Auto,
    /// No threshold.
    Unlimited,
    /// An explicit size. `#[serde(untagged)]` lets a plain size value deserialize into this
    /// variant without a tag. Presumably it accepts strings like "64MB" — confirm
    /// `ReadableSize`'s format.
    #[serde(untagged)]
    Size(ReadableSize),
}
406
/// Inverted index options.
///
/// Missing fields take their values from [`InvertedIndexConfig::default`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush (default `Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default `Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default `Auto`).
    pub apply_on_query: Mode,

    /// Memory threshold for index creation (default `Auto`).
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated: still read from configs, never written back (`skip_serializing`).
    /// `IndexConfig::sanitize` migrates it into `IndexConfig::aux_path`.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated: still read from configs, never written back (`skip_serializing`).
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
430
431impl Default for InvertedIndexConfig {
432 #[allow(deprecated)]
433 fn default() -> Self {
434 Self {
435 create_on_flush: Mode::Auto,
436 create_on_compaction: Mode::Auto,
437 apply_on_query: Mode::Auto,
438 mem_threshold_on_create: MemoryThreshold::Auto,
439 write_buffer_size: ReadableSize::mb(8),
440 intermediate_path: String::new(),
441 }
442 }
443}
444
445impl InvertedIndexConfig {
446 pub fn mem_threshold_on_create(&self) -> Option<usize> {
447 match self.mem_threshold_on_create {
448 MemoryThreshold::Auto => {
449 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
450 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
451 } else {
452 Some(ReadableSize::mb(64).as_bytes() as usize)
453 }
454 }
455 MemoryThreshold::Unlimited => None,
456 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
457 }
458 }
459}
460
/// Full-text index options.
///
/// Missing fields take their values from [`FulltextIndexConfig::default`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush (default `Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default `Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default `Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default `Auto`).
    pub mem_threshold_on_create: MemoryThreshold,
    /// Compression flag (default `true`). Presumably controls compression of the full-text
    /// index data — confirm in the index writer.
    pub compress: bool,
}
477
478impl Default for FulltextIndexConfig {
479 fn default() -> Self {
480 Self {
481 create_on_flush: Mode::Auto,
482 create_on_compaction: Mode::Auto,
483 apply_on_query: Mode::Auto,
484 mem_threshold_on_create: MemoryThreshold::Auto,
485 compress: true,
486 }
487 }
488}
489
490impl FulltextIndexConfig {
491 pub fn mem_threshold_on_create(&self) -> usize {
492 match self.mem_threshold_on_create {
493 MemoryThreshold::Auto => {
494 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
495 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
496 } else {
497 ReadableSize::mb(64).as_bytes() as _
498 }
499 }
500 MemoryThreshold::Unlimited => usize::MAX,
501 MemoryThreshold::Size(size) => size.as_bytes() as _,
502 }
503 }
504}
505
/// Bloom filter index options.
///
/// Missing fields take their values from [`BloomFilterConfig::default`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush (default `Auto`).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction (default `Auto`).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query (default `Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold for index creation (default `Auto`).
    pub mem_threshold_on_create: MemoryThreshold,
}
520
521impl Default for BloomFilterConfig {
522 fn default() -> Self {
523 Self {
524 create_on_flush: Mode::Auto,
525 create_on_compaction: Mode::Auto,
526 apply_on_query: Mode::Auto,
527 mem_threshold_on_create: MemoryThreshold::Auto,
528 }
529 }
530}
531
532impl BloomFilterConfig {
533 pub fn mem_threshold_on_create(&self) -> Option<usize> {
534 match self.mem_threshold_on_create {
535 MemoryThreshold::Auto => {
536 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
537 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
538 } else {
539 Some(ReadableSize::mb(64).as_bytes() as usize)
540 }
541 }
542 MemoryThreshold::Unlimited => None,
543 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
544 }
545 }
546}
547
548fn divide_num_cpus(divisor: usize) -> usize {
550 debug_assert!(divisor > 0);
551 let cores = common_config::utils::get_cpus();
552 debug_assert!(cores > 0);
553
554 cores.div_ceil(divisor)
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// A `[memtable]` TOML section with `type = "partition_tree"` deserializes into the
    /// partition tree memtable config with its fields populated.
    #[test]
    fn test_deserialize_config() {
        let toml_input = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let parsed: MitoConfig = toml::from_str(toml_input).unwrap();
        let memtable = &parsed.memtable;
        let MemtableConfig::PartitionTree(partition_tree) = memtable else {
            unreachable!()
        };
        let expected_threshold = 1024;
        assert_eq!(expected_threshold, partition_tree.data_freeze_threshold);
    }
}