1use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_memory_manager::OnExhaustedPolicy;
24use common_stat::{get_total_cpu_cores, get_total_memory_readable};
25use common_telemetry::warn;
26use serde::{Deserialize, Serialize};
27use serde_with::serde_as;
28
29use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
30use crate::error::Result;
31use crate::gc::GcConfig;
32use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
33
34const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
35pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;
37
38const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
40const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
42const MEM_CACHE_SIZE_FACTOR: u64 = 16;
44const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
46const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
48
49pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
51
52#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
55#[serde(default)]
56pub struct MitoConfig {
57 pub num_workers: usize,
61 pub worker_channel_size: usize,
63 pub worker_request_batch_size: usize,
65
66 pub manifest_checkpoint_distance: u64,
70 pub experimental_manifest_keep_removed_file_count: usize,
75 #[serde(with = "humantime_serde")]
80 pub experimental_manifest_keep_removed_file_ttl: Duration,
81 pub compress_manifest: bool,
83
84 pub max_background_index_builds: usize,
87 pub max_background_flushes: usize,
89 pub max_background_compactions: usize,
91 pub max_background_purges: usize,
93 pub experimental_compaction_memory_limit: MemoryLimit,
97 pub experimental_compaction_on_exhausted: OnExhaustedPolicy,
99
100 #[serde(with = "humantime_serde")]
103 pub auto_flush_interval: Duration,
104 pub global_write_buffer_size: ReadableSize,
106 pub global_write_buffer_reject_size: ReadableSize,
108
109 pub sst_meta_cache_size: ReadableSize,
112 pub vector_cache_size: ReadableSize,
114 pub page_cache_size: ReadableSize,
116 pub selector_result_cache_size: ReadableSize,
118 pub range_result_cache_size: ReadableSize,
120 pub prefilter_result_cache_size: ReadableSize,
122 pub enable_write_cache: bool,
124 pub write_cache_path: String,
126 pub write_cache_size: ReadableSize,
128 #[serde(with = "humantime_serde")]
130 pub write_cache_ttl: Option<Duration>,
131 pub preload_index_cache: bool,
133 pub index_cache_percent: u8,
137 pub enable_refill_cache_on_read: bool,
140 pub manifest_cache_size: ReadableSize,
142
143 pub sst_write_buffer_size: ReadableSize,
146 pub max_concurrent_scan_files: usize,
148 pub allow_stale_entries: bool,
150 pub scan_memory_limit: MemoryLimit,
154 pub scan_memory_on_exhausted: OnExhaustedPolicy,
159
160 pub index: IndexConfig,
162 pub inverted_index: InvertedIndexConfig,
164 pub fulltext_index: FulltextIndexConfig,
166 pub bloom_filter_index: BloomFilterConfig,
168 #[cfg(feature = "vector_index")]
170 pub vector_index: VectorIndexConfig,
171
172 #[serde(with = "humantime_serde")]
175 pub min_compaction_interval: Duration,
176 pub schedule_compaction_after_edit: bool,
178
179 pub default_flat_format: bool,
182
183 pub gc: GcConfig,
184}
185
186impl Default for MitoConfig {
187 fn default() -> Self {
188 let mut mito_config = MitoConfig {
189 num_workers: divide_num_cpus(2),
190 worker_channel_size: 128,
191 worker_request_batch_size: 64,
192 manifest_checkpoint_distance: 10,
193 experimental_manifest_keep_removed_file_count: 256,
194 experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
195 compress_manifest: false,
196 max_background_index_builds: divide_num_cpus(8),
197 max_background_flushes: divide_num_cpus(2),
198 max_background_compactions: divide_num_cpus(4),
199 max_background_purges: get_total_cpu_cores(),
200 experimental_compaction_memory_limit: MemoryLimit::Unlimited,
201 experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
202 auto_flush_interval: Duration::from_secs(30 * 60),
203 global_write_buffer_size: ReadableSize::gb(1),
204 global_write_buffer_reject_size: ReadableSize::gb(2),
205 sst_meta_cache_size: ReadableSize::mb(128),
206 vector_cache_size: ReadableSize::mb(512),
207 page_cache_size: ReadableSize::mb(512),
208 selector_result_cache_size: ReadableSize::mb(512),
209 range_result_cache_size: ReadableSize::mb(512),
210 prefilter_result_cache_size: ReadableSize::mb(128),
211 enable_write_cache: false,
212 write_cache_path: String::new(),
213 write_cache_size: ReadableSize::gb(5),
214 write_cache_ttl: None,
215 preload_index_cache: true,
216 index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
217 enable_refill_cache_on_read: true,
218 manifest_cache_size: ReadableSize::mb(256),
219 sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
220 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
221 allow_stale_entries: false,
222 scan_memory_limit: MemoryLimit::default(),
223 scan_memory_on_exhausted: OnExhaustedPolicy::Fail,
224 index: IndexConfig::default(),
225 inverted_index: InvertedIndexConfig::default(),
226 fulltext_index: FulltextIndexConfig::default(),
227 bloom_filter_index: BloomFilterConfig::default(),
228 #[cfg(feature = "vector_index")]
229 vector_index: VectorIndexConfig::default(),
230 min_compaction_interval: Duration::from_secs(0),
231 schedule_compaction_after_edit: true,
232 default_flat_format: true,
233 gc: GcConfig::default(),
234 };
235
236 if let Some(sys_memory) = get_total_memory_readable() {
238 mito_config.adjust_buffer_and_cache_size(sys_memory);
239 }
240
241 mito_config
242 }
243}
244
245impl MitoConfig {
246 pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
250 if self.num_workers == 0 {
252 self.num_workers = divide_num_cpus(2);
253 }
254
255 if self.worker_channel_size == 0 {
257 warn!("Sanitize channel size 0 to 1");
258 self.worker_channel_size = 1;
259 }
260
261 if self.max_background_flushes == 0 {
262 warn!(
263 "Sanitize max background flushes 0 to {}",
264 divide_num_cpus(2)
265 );
266 self.max_background_flushes = divide_num_cpus(2);
267 }
268 if self.max_background_compactions == 0 {
269 warn!(
270 "Sanitize max background compactions 0 to {}",
271 divide_num_cpus(4)
272 );
273 self.max_background_compactions = divide_num_cpus(4);
274 }
275 if self.max_background_purges == 0 {
276 let cpu_cores = get_total_cpu_cores();
277 warn!("Sanitize max background purges 0 to {}", cpu_cores);
278 self.max_background_purges = cpu_cores;
279 }
280
281 if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
282 self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
283 warn!(
284 "Sanitize global write buffer reject size to {}",
285 self.global_write_buffer_reject_size
286 );
287 }
288
289 if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
290 self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
291 warn!(
292 "Sanitize sst write buffer size to {}",
293 self.sst_write_buffer_size
294 );
295 }
296
297 if self.write_cache_path.trim().is_empty() {
299 self.write_cache_path = data_home.to_string();
300 }
301
302 if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
304 warn!(
305 "Invalid index_cache_percent {}, resetting to default {}",
306 self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
307 );
308 self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
309 }
310
311 self.index.sanitize(data_home, &self.inverted_index)?;
312
313 Ok(())
314 }
315
316 fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
317 let global_write_buffer_size = cmp::min(
319 sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
320 ReadableSize::gb(1),
321 );
322 let global_write_buffer_reject_size = global_write_buffer_size * 2;
324 let sst_meta_cache_size = cmp::min(
326 sys_memory / SST_META_CACHE_SIZE_FACTOR,
327 ReadableSize::mb(128),
328 );
329 let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
331 let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
332
333 self.global_write_buffer_size = global_write_buffer_size;
334 self.global_write_buffer_reject_size = global_write_buffer_reject_size;
335 self.sst_meta_cache_size = sst_meta_cache_size;
336 self.vector_cache_size = mem_cache_size;
337 self.page_cache_size = page_cache_size;
338 self.selector_result_cache_size = mem_cache_size;
339 self.range_result_cache_size = mem_cache_size;
340 self.prefilter_result_cache_size = sst_meta_cache_size;
342
343 self.index.adjust_buffer_and_cache_size(sys_memory);
344 }
345
346 #[cfg(test)]
348 pub fn enable_write_cache(
349 mut self,
350 path: String,
351 size: ReadableSize,
352 ttl: Option<Duration>,
353 ) -> Self {
354 self.enable_write_cache = true;
355 self.write_cache_path = path;
356 self.write_cache_size = size;
357 self.write_cache_ttl = ttl;
358 self
359 }
360}
361
362#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
364#[serde(rename_all = "snake_case")]
365pub enum IndexBuildMode {
366 #[default]
368 Sync,
369 Async,
371}
372
373#[serde_as]
374#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
375#[serde(default)]
376pub struct IndexConfig {
377 pub aux_path: String,
387
388 pub staging_size: ReadableSize,
390 #[serde(with = "humantime_serde")]
394 pub staging_ttl: Option<Duration>,
395
396 pub build_mode: IndexBuildMode,
398
399 pub write_buffer_size: ReadableSize,
401
402 pub metadata_cache_size: ReadableSize,
404 pub content_cache_size: ReadableSize,
406 pub content_cache_page_size: ReadableSize,
408 pub result_cache_size: ReadableSize,
410}
411
412impl Default for IndexConfig {
413 fn default() -> Self {
414 Self {
415 aux_path: String::new(),
416 staging_size: ReadableSize::gb(2),
417 staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
418 build_mode: IndexBuildMode::default(),
419 write_buffer_size: ReadableSize::mb(8),
420 metadata_cache_size: ReadableSize::mb(64),
421 content_cache_size: ReadableSize::mb(128),
422 content_cache_page_size: ReadableSize::kb(64),
423 result_cache_size: ReadableSize::mb(128),
424 }
425 }
426}
427
428impl IndexConfig {
429 pub fn sanitize(
430 &mut self,
431 data_home: &str,
432 inverted_index: &InvertedIndexConfig,
433 ) -> Result<()> {
434 #[allow(deprecated)]
435 if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
436 self.aux_path.clone_from(&inverted_index.intermediate_path);
437 warn!(
438 "`inverted_index.intermediate_path` is deprecated, use
439 `index.aux_path` instead. Set `index.aux_path` to {}",
440 &inverted_index.intermediate_path
441 )
442 }
443 if self.aux_path.is_empty() {
444 let path = Path::new(data_home).join("index_intermediate");
445 self.aux_path = path.as_os_str().to_string_lossy().to_string();
446 }
447
448 if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
449 self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
450 warn!(
451 "Sanitize index write buffer size to {}",
452 self.write_buffer_size
453 );
454 }
455
456 if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
457 self.staging_ttl = None;
458 }
459
460 Ok(())
461 }
462
463 pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
464 let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
465 self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
466 self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
467
468 let metadata_cache_size = cmp::min(
469 sys_memory / SST_META_CACHE_SIZE_FACTOR,
470 ReadableSize::mb(64),
471 );
472 self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
473 }
474}
475
476#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
478#[serde(rename_all = "snake_case")]
479pub enum Mode {
480 #[default]
482 Auto,
483 Disable,
485}
486
487impl Mode {
488 pub fn disabled(&self) -> bool {
490 matches!(self, Mode::Disable)
491 }
492
493 pub fn auto(&self) -> bool {
495 matches!(self, Mode::Auto)
496 }
497}
498
499#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
501#[serde(rename_all = "snake_case")]
502pub enum MemoryThreshold {
503 #[default]
505 Auto,
506 Unlimited,
508 #[serde(untagged)]
510 Size(ReadableSize),
511}
512
513#[serde_as]
515#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
516#[serde(default)]
517pub struct InvertedIndexConfig {
518 pub create_on_flush: Mode,
520 pub create_on_compaction: Mode,
522 pub apply_on_query: Mode,
524
525 pub mem_threshold_on_create: MemoryThreshold,
527
528 #[deprecated = "use [IndexConfig::aux_path] instead"]
529 #[serde(skip_serializing)]
530 pub intermediate_path: String,
531
532 #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
533 #[serde(skip_serializing)]
534 pub write_buffer_size: ReadableSize,
535}
536
537impl Default for InvertedIndexConfig {
538 #[allow(deprecated)]
539 fn default() -> Self {
540 Self {
541 create_on_flush: Mode::Auto,
542 create_on_compaction: Mode::Auto,
543 apply_on_query: Mode::Auto,
544 mem_threshold_on_create: MemoryThreshold::Auto,
545 write_buffer_size: ReadableSize::mb(8),
546 intermediate_path: String::new(),
547 }
548 }
549}
550
551impl InvertedIndexConfig {
552 pub fn mem_threshold_on_create(&self) -> Option<usize> {
553 match self.mem_threshold_on_create {
554 MemoryThreshold::Auto => {
555 if let Some(sys_memory) = get_total_memory_readable() {
556 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
557 } else {
558 Some(ReadableSize::mb(64).as_bytes() as usize)
559 }
560 }
561 MemoryThreshold::Unlimited => None,
562 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
563 }
564 }
565}
566
567#[serde_as]
569#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
570#[serde(default)]
571pub struct FulltextIndexConfig {
572 pub create_on_flush: Mode,
574 pub create_on_compaction: Mode,
576 pub apply_on_query: Mode,
578 pub mem_threshold_on_create: MemoryThreshold,
580 pub compress: bool,
582}
583
584impl Default for FulltextIndexConfig {
585 fn default() -> Self {
586 Self {
587 create_on_flush: Mode::Auto,
588 create_on_compaction: Mode::Auto,
589 apply_on_query: Mode::Auto,
590 mem_threshold_on_create: MemoryThreshold::Auto,
591 compress: true,
592 }
593 }
594}
595
596impl FulltextIndexConfig {
597 pub fn mem_threshold_on_create(&self) -> usize {
598 match self.mem_threshold_on_create {
599 MemoryThreshold::Auto => {
600 if let Some(sys_memory) = get_total_memory_readable() {
601 (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
602 } else {
603 ReadableSize::mb(64).as_bytes() as _
604 }
605 }
606 MemoryThreshold::Unlimited => usize::MAX,
607 MemoryThreshold::Size(size) => size.as_bytes() as _,
608 }
609 }
610}
611
612#[serde_as]
614#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
615#[serde(default)]
616pub struct BloomFilterConfig {
617 pub create_on_flush: Mode,
619 pub create_on_compaction: Mode,
621 pub apply_on_query: Mode,
623 pub mem_threshold_on_create: MemoryThreshold,
625}
626
627impl Default for BloomFilterConfig {
628 fn default() -> Self {
629 Self {
630 create_on_flush: Mode::Auto,
631 create_on_compaction: Mode::Auto,
632 apply_on_query: Mode::Auto,
633 mem_threshold_on_create: MemoryThreshold::Auto,
634 }
635 }
636}
637
638impl BloomFilterConfig {
639 pub fn mem_threshold_on_create(&self) -> Option<usize> {
640 match self.mem_threshold_on_create {
641 MemoryThreshold::Auto => {
642 if let Some(sys_memory) = get_total_memory_readable() {
643 Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
644 } else {
645 Some(ReadableSize::mb(64).as_bytes() as usize)
646 }
647 }
648 MemoryThreshold::Unlimited => None,
649 MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
650 }
651 }
652}
653
/// Vector index options; only compiled with the `vector_index` feature.
///
/// Missing fields take the values from the `Default` impl (`#[serde(default)]`).
#[cfg(feature = "vector_index")]
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct VectorIndexConfig {
    /// Mode for creating the index on flush (default `Mode::Auto`).
    pub create_on_flush: Mode,
    /// Mode for creating the index on compaction (default `Mode::Auto`).
    pub create_on_compaction: Mode,
    /// Mode for applying the index on query (default `Mode::Auto`).
    pub apply_on_query: Mode,
    /// Memory threshold used when creating the index (default `MemoryThreshold::Auto`).
    /// Use the `mem_threshold_on_create()` method to get the resolved byte count.
    pub mem_threshold_on_create: MemoryThreshold,
}
669
#[cfg(feature = "vector_index")]
impl Default for VectorIndexConfig {
    /// Returns the defaults: every mode and the memory threshold are `Auto`.
    fn default() -> Self {
        VectorIndexConfig {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
        }
    }
}
681
#[cfg(feature = "vector_index")]
impl VectorIndexConfig {
    /// Resolves the configured memory threshold to a byte count.
    ///
    /// Returns `None` for `MemoryThreshold::Unlimited`. `MemoryThreshold::Auto` resolves to
    /// the total system memory divided by `INDEX_CREATE_MEM_THRESHOLD_FACTOR`, or to 64MB
    /// when the total memory is unavailable.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        let auto_budget = || {
            get_total_memory_readable()
                .map(|total| total / INDEX_CREATE_MEM_THRESHOLD_FACTOR)
                .unwrap_or_else(|| ReadableSize::mb(64))
        };

        match self.mem_threshold_on_create {
            MemoryThreshold::Auto => Some(auto_budget().as_bytes() as usize),
            MemoryThreshold::Unlimited => None,
            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
        }
    }
}
698
699fn divide_num_cpus(divisor: usize) -> usize {
701 debug_assert!(divisor > 0);
702 let cores = get_total_cpu_cores();
703 debug_assert!(cores > 0);
704
705 cores.div_ceil(divisor)
706}