mito2/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configurations.
16
17use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_stat::{get_total_cpu_cores, get_total_memory_readable};
24use common_telemetry::warn;
25use serde::{Deserialize, Serialize};
26use serde_with::serde_as;
27
28use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
29use crate::error::Result;
30use crate::gc::GcConfig;
31use crate::memtable::MemtableConfig;
32use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
33
34const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
35/// Default channel size for parallel scan task.
36pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
37/// Default maximum number of SST files to scan concurrently.
38pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;
39
40// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
41const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
42/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory size as SST meta cache size in default mode
43const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
44/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode
45const MEM_CACHE_SIZE_FACTOR: u64 = 16;
46/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory size as page cache size in default mode
47const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
48/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold for creating index
49const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
50
51/// Fetch option timeout
52pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
53
54/// Configuration for [MitoEngine](crate::engine::MitoEngine).
55/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
56#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
57#[serde(default)]
58pub struct MitoConfig {
59    // Worker configs:
60    /// Number of region workers (default: 1/2 of cpu cores).
61    /// Sets to 0 to use the default value.
62    pub num_workers: usize,
63    /// Request channel size of each worker (default 128).
64    pub worker_channel_size: usize,
65    /// Max batch size for a worker to handle requests (default 64).
66    pub worker_request_batch_size: usize,
67
68    // Manifest configs:
69    /// Number of meta action updated to trigger a new checkpoint
70    /// for the manifest (default 10).
71    pub manifest_checkpoint_distance: u64,
72    /// Number of removed files to keep in manifest's `removed_files` field before also
73    /// remove them from `removed_files`. Mostly for debugging purpose.
74    /// If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files
75    /// from `removed_files` field.
76    pub experimental_manifest_keep_removed_file_count: usize,
77    /// How long to keep removed files in the `removed_files` field of manifest
78    /// after they are removed from manifest.
79    /// files will only be removed from `removed_files` field
80    /// if both `keep_removed_file_count` and `keep_removed_file_ttl` is reached.
81    #[serde(with = "humantime_serde")]
82    pub experimental_manifest_keep_removed_file_ttl: Duration,
83    /// Whether to compress manifest and checkpoint file by gzip (default false).
84    pub compress_manifest: bool,
85
86    // Background job configs:
87    /// Max number of running background index build jobs (default: 1/8 of cpu cores).
88    pub max_background_index_builds: usize,
89    /// Max number of running background flush jobs (default: 1/2 of cpu cores).
90    pub max_background_flushes: usize,
91    /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
92    pub max_background_compactions: usize,
93    /// Max number of running background purge jobs (default: number of cpu cores).
94    pub max_background_purges: usize,
95
96    // Flush configs:
97    /// Interval to auto flush a region if it has not flushed yet (default 30 min).
98    #[serde(with = "humantime_serde")]
99    pub auto_flush_interval: Duration,
100    /// Global write buffer size threshold to trigger flush.
101    pub global_write_buffer_size: ReadableSize,
102    /// Global write buffer size threshold to reject write requests.
103    pub global_write_buffer_reject_size: ReadableSize,
104
105    // Cache configs:
106    /// Cache size for SST metadata. Setting it to 0 to disable the cache.
107    pub sst_meta_cache_size: ReadableSize,
108    /// Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
109    pub vector_cache_size: ReadableSize,
110    /// Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
111    pub page_cache_size: ReadableSize,
112    /// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
113    pub selector_result_cache_size: ReadableSize,
114    /// Whether to enable the write cache.
115    pub enable_write_cache: bool,
116    /// File system path for write cache dir's root, defaults to `{data_home}`.
117    pub write_cache_path: String,
118    /// Capacity for write cache.
119    pub write_cache_size: ReadableSize,
120    /// TTL for write cache.
121    #[serde(with = "humantime_serde")]
122    pub write_cache_ttl: Option<Duration>,
123    /// Preload index (puffin) files into cache on region open (default: true).
124    pub preload_index_cache: bool,
125    /// Percentage of write cache capacity allocated for index (puffin) files (default: 20).
126    /// The remaining capacity is used for data (parquet) files.
127    /// Must be between 0 and 100 (exclusive).
128    pub index_cache_percent: u8,
129
130    // Other configs:
131    /// Buffer size for SST writing.
132    pub sst_write_buffer_size: ReadableSize,
133    /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
134    pub parallel_scan_channel_size: usize,
135    /// Maximum number of SST files to scan concurrently (default 384).
136    pub max_concurrent_scan_files: usize,
137    /// Whether to allow stale entries read during replay.
138    pub allow_stale_entries: bool,
139    /// Memory limit for table scans across all queries. Setting it to 0 disables the limit.
140    /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
141    pub scan_memory_limit: MemoryLimit,
142
143    /// Index configs.
144    pub index: IndexConfig,
145    /// Inverted index configs.
146    pub inverted_index: InvertedIndexConfig,
147    /// Full-text index configs.
148    pub fulltext_index: FulltextIndexConfig,
149    /// Bloom filter index configs.
150    pub bloom_filter_index: BloomFilterConfig,
151
152    /// Memtable config
153    pub memtable: MemtableConfig,
154
155    /// Minimum time interval between two compactions.
156    /// To align with the old behavior, the default value is 0 (no restrictions).
157    #[serde(with = "humantime_serde")]
158    pub min_compaction_interval: Duration,
159
160    /// Whether to enable experimental flat format as the default format.
161    /// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
162    pub default_experimental_flat_format: bool,
163
164    pub gc: GcConfig,
165}
166
167impl Default for MitoConfig {
168    fn default() -> Self {
169        let mut mito_config = MitoConfig {
170            num_workers: divide_num_cpus(2),
171            worker_channel_size: 128,
172            worker_request_batch_size: 64,
173            manifest_checkpoint_distance: 10,
174            experimental_manifest_keep_removed_file_count: 256,
175            experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
176            compress_manifest: false,
177            max_background_index_builds: divide_num_cpus(8),
178            max_background_flushes: divide_num_cpus(2),
179            max_background_compactions: divide_num_cpus(4),
180            max_background_purges: get_total_cpu_cores(),
181            auto_flush_interval: Duration::from_secs(30 * 60),
182            global_write_buffer_size: ReadableSize::gb(1),
183            global_write_buffer_reject_size: ReadableSize::gb(2),
184            sst_meta_cache_size: ReadableSize::mb(128),
185            vector_cache_size: ReadableSize::mb(512),
186            page_cache_size: ReadableSize::mb(512),
187            selector_result_cache_size: ReadableSize::mb(512),
188            enable_write_cache: false,
189            write_cache_path: String::new(),
190            write_cache_size: ReadableSize::gb(5),
191            write_cache_ttl: None,
192            preload_index_cache: true,
193            index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
194            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
195            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
196            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
197            allow_stale_entries: false,
198            scan_memory_limit: MemoryLimit::default(),
199            index: IndexConfig::default(),
200            inverted_index: InvertedIndexConfig::default(),
201            fulltext_index: FulltextIndexConfig::default(),
202            bloom_filter_index: BloomFilterConfig::default(),
203            memtable: MemtableConfig::default(),
204            min_compaction_interval: Duration::from_secs(0),
205            default_experimental_flat_format: false,
206            gc: GcConfig::default(),
207        };
208
209        // Adjust buffer and cache size according to system memory if we can.
210        if let Some(sys_memory) = get_total_memory_readable() {
211            mito_config.adjust_buffer_and_cache_size(sys_memory);
212        }
213
214        mito_config
215    }
216}
217
218impl MitoConfig {
219    /// Sanitize incorrect configurations.
220    ///
221    /// Returns an error if there is a configuration that unable to sanitize.
222    pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
223        // Use default value if `num_workers` is 0.
224        if self.num_workers == 0 {
225            self.num_workers = divide_num_cpus(2);
226        }
227
228        // Sanitize channel size.
229        if self.worker_channel_size == 0 {
230            warn!("Sanitize channel size 0 to 1");
231            self.worker_channel_size = 1;
232        }
233
234        if self.max_background_flushes == 0 {
235            warn!(
236                "Sanitize max background flushes 0 to {}",
237                divide_num_cpus(2)
238            );
239            self.max_background_flushes = divide_num_cpus(2);
240        }
241        if self.max_background_compactions == 0 {
242            warn!(
243                "Sanitize max background compactions 0 to {}",
244                divide_num_cpus(4)
245            );
246            self.max_background_compactions = divide_num_cpus(4);
247        }
248        if self.max_background_purges == 0 {
249            let cpu_cores = get_total_cpu_cores();
250            warn!("Sanitize max background purges 0 to {}", cpu_cores);
251            self.max_background_purges = cpu_cores;
252        }
253
254        if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
255            self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
256            warn!(
257                "Sanitize global write buffer reject size to {}",
258                self.global_write_buffer_reject_size
259            );
260        }
261
262        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
263            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
264            warn!(
265                "Sanitize sst write buffer size to {}",
266                self.sst_write_buffer_size
267            );
268        }
269
270        if self.parallel_scan_channel_size < 1 {
271            self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
272            warn!(
273                "Sanitize scan channel size to {}",
274                self.parallel_scan_channel_size
275            );
276        }
277
278        // Sets write cache path if it is empty.
279        if self.write_cache_path.trim().is_empty() {
280            self.write_cache_path = data_home.to_string();
281        }
282
283        // Validate index_cache_percent is within valid range (0, 100)
284        if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
285            warn!(
286                "Invalid index_cache_percent {}, resetting to default {}",
287                self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
288            );
289            self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
290        }
291
292        self.index.sanitize(data_home, &self.inverted_index)?;
293
294        Ok(())
295    }
296
297    fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
298        // shouldn't be greater than 1G in default mode.
299        let global_write_buffer_size = cmp::min(
300            sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
301            ReadableSize::gb(1),
302        );
303        // Use 2x of global write buffer size as global write buffer reject size.
304        let global_write_buffer_reject_size = global_write_buffer_size * 2;
305        // shouldn't be greater than 128MB in default mode.
306        let sst_meta_cache_size = cmp::min(
307            sys_memory / SST_META_CACHE_SIZE_FACTOR,
308            ReadableSize::mb(128),
309        );
310        // shouldn't be greater than 512MB in default mode.
311        let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
312        let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
313
314        self.global_write_buffer_size = global_write_buffer_size;
315        self.global_write_buffer_reject_size = global_write_buffer_reject_size;
316        self.sst_meta_cache_size = sst_meta_cache_size;
317        self.vector_cache_size = mem_cache_size;
318        self.page_cache_size = page_cache_size;
319        self.selector_result_cache_size = mem_cache_size;
320
321        self.index.adjust_buffer_and_cache_size(sys_memory);
322    }
323
324    /// Enable write cache.
325    #[cfg(test)]
326    pub fn enable_write_cache(
327        mut self,
328        path: String,
329        size: ReadableSize,
330        ttl: Option<Duration>,
331    ) -> Self {
332        self.enable_write_cache = true;
333        self.write_cache_path = path;
334        self.write_cache_size = size;
335        self.write_cache_ttl = ttl;
336        self
337    }
338}
339
340/// Index build mode.
341#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
342#[serde(rename_all = "snake_case")]
343pub enum IndexBuildMode {
344    /// Build index synchronously.
345    #[default]
346    Sync,
347    /// Build index asynchronously.
348    Async,
349}
350
351#[serde_as]
352#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
353#[serde(default)]
354pub struct IndexConfig {
355    /// Auxiliary directory path for the index in filesystem, used to
356    /// store intermediate files for creating the index and staging files
357    /// for searching the index, defaults to `{data_home}/index_intermediate`.
358    ///
359    /// This path contains two subdirectories:
360    /// - `__intm`: for storing intermediate files used during creating index.
361    /// - `staging`: for storing staging files used during searching index.
362    ///
363    /// The default name for this directory is `index_intermediate` for backward compatibility.
364    pub aux_path: String,
365
366    /// The max capacity of the staging directory.
367    pub staging_size: ReadableSize,
368    /// The TTL of the staging directory.
369    /// Defaults to 7 days.
370    /// Setting it to "0s" to disable TTL.
371    #[serde(with = "humantime_serde")]
372    pub staging_ttl: Option<Duration>,
373
374    /// Index Build Mode
375    pub build_mode: IndexBuildMode,
376
377    /// Write buffer size for creating the index.
378    pub write_buffer_size: ReadableSize,
379
380    /// Cache size for metadata of puffin files. Setting it to 0 to disable the cache.
381    pub metadata_cache_size: ReadableSize,
382    /// Cache size for inverted index content. Setting it to 0 to disable the cache.
383    pub content_cache_size: ReadableSize,
384    /// Page size for inverted index content.
385    pub content_cache_page_size: ReadableSize,
386    /// Cache size for index result. Setting it to 0 to disable the cache.
387    pub result_cache_size: ReadableSize,
388}
389
390impl Default for IndexConfig {
391    fn default() -> Self {
392        Self {
393            aux_path: String::new(),
394            staging_size: ReadableSize::gb(2),
395            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
396            build_mode: IndexBuildMode::default(),
397            write_buffer_size: ReadableSize::mb(8),
398            metadata_cache_size: ReadableSize::mb(64),
399            content_cache_size: ReadableSize::mb(128),
400            content_cache_page_size: ReadableSize::kb(64),
401            result_cache_size: ReadableSize::mb(128),
402        }
403    }
404}
405
406impl IndexConfig {
407    pub fn sanitize(
408        &mut self,
409        data_home: &str,
410        inverted_index: &InvertedIndexConfig,
411    ) -> Result<()> {
412        #[allow(deprecated)]
413        if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
414            self.aux_path.clone_from(&inverted_index.intermediate_path);
415            warn!(
416                "`inverted_index.intermediate_path` is deprecated, use
417                 `index.aux_path` instead. Set `index.aux_path` to {}",
418                &inverted_index.intermediate_path
419            )
420        }
421        if self.aux_path.is_empty() {
422            let path = Path::new(data_home).join("index_intermediate");
423            self.aux_path = path.as_os_str().to_string_lossy().to_string();
424        }
425
426        if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
427            self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
428            warn!(
429                "Sanitize index write buffer size to {}",
430                self.write_buffer_size
431            );
432        }
433
434        if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
435            self.staging_ttl = None;
436        }
437
438        Ok(())
439    }
440
441    pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
442        let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
443        self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
444        self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
445
446        let metadata_cache_size = cmp::min(
447            sys_memory / SST_META_CACHE_SIZE_FACTOR,
448            ReadableSize::mb(64),
449        );
450        self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
451    }
452}
453
454/// Operational mode for certain actions.
455#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
456#[serde(rename_all = "snake_case")]
457pub enum Mode {
458    /// The action is performed automatically based on internal criteria.
459    #[default]
460    Auto,
461    /// The action is explicitly disabled.
462    Disable,
463}
464
465impl Mode {
466    /// Whether the action is disabled.
467    pub fn disabled(&self) -> bool {
468        matches!(self, Mode::Disable)
469    }
470
471    /// Whether the action is automatic.
472    pub fn auto(&self) -> bool {
473        matches!(self, Mode::Auto)
474    }
475}
476
477/// Memory threshold for performing certain actions.
478#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
479#[serde(rename_all = "snake_case")]
480pub enum MemoryThreshold {
481    /// Automatically determine the threshold based on internal criteria.
482    #[default]
483    Auto,
484    /// Unlimited memory.
485    Unlimited,
486    /// Fixed memory threshold.
487    #[serde(untagged)]
488    Size(ReadableSize),
489}
490
491/// Configuration options for the inverted index.
492#[serde_as]
493#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
494#[serde(default)]
495pub struct InvertedIndexConfig {
496    /// Whether to create the index on flush: automatically or never.
497    pub create_on_flush: Mode,
498    /// Whether to create the index on compaction: automatically or never.
499    pub create_on_compaction: Mode,
500    /// Whether to apply the index on query: automatically or never.
501    pub apply_on_query: Mode,
502
503    /// Memory threshold for performing an external sort during index creation.
504    pub mem_threshold_on_create: MemoryThreshold,
505
506    #[deprecated = "use [IndexConfig::aux_path] instead"]
507    #[serde(skip_serializing)]
508    pub intermediate_path: String,
509
510    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
511    #[serde(skip_serializing)]
512    pub write_buffer_size: ReadableSize,
513}
514
515impl Default for InvertedIndexConfig {
516    #[allow(deprecated)]
517    fn default() -> Self {
518        Self {
519            create_on_flush: Mode::Auto,
520            create_on_compaction: Mode::Auto,
521            apply_on_query: Mode::Auto,
522            mem_threshold_on_create: MemoryThreshold::Auto,
523            write_buffer_size: ReadableSize::mb(8),
524            intermediate_path: String::new(),
525        }
526    }
527}
528
529impl InvertedIndexConfig {
530    pub fn mem_threshold_on_create(&self) -> Option<usize> {
531        match self.mem_threshold_on_create {
532            MemoryThreshold::Auto => {
533                if let Some(sys_memory) = get_total_memory_readable() {
534                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
535                } else {
536                    Some(ReadableSize::mb(64).as_bytes() as usize)
537                }
538            }
539            MemoryThreshold::Unlimited => None,
540            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
541        }
542    }
543}
544
545/// Configuration options for the full-text index.
546#[serde_as]
547#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
548#[serde(default)]
549pub struct FulltextIndexConfig {
550    /// Whether to create the index on flush: automatically or never.
551    pub create_on_flush: Mode,
552    /// Whether to create the index on compaction: automatically or never.
553    pub create_on_compaction: Mode,
554    /// Whether to apply the index on query: automatically or never.
555    pub apply_on_query: Mode,
556    /// Memory threshold for creating the index.
557    pub mem_threshold_on_create: MemoryThreshold,
558    /// Whether to compress the index data.
559    pub compress: bool,
560}
561
562impl Default for FulltextIndexConfig {
563    fn default() -> Self {
564        Self {
565            create_on_flush: Mode::Auto,
566            create_on_compaction: Mode::Auto,
567            apply_on_query: Mode::Auto,
568            mem_threshold_on_create: MemoryThreshold::Auto,
569            compress: true,
570        }
571    }
572}
573
574impl FulltextIndexConfig {
575    pub fn mem_threshold_on_create(&self) -> usize {
576        match self.mem_threshold_on_create {
577            MemoryThreshold::Auto => {
578                if let Some(sys_memory) = get_total_memory_readable() {
579                    (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
580                } else {
581                    ReadableSize::mb(64).as_bytes() as _
582                }
583            }
584            MemoryThreshold::Unlimited => usize::MAX,
585            MemoryThreshold::Size(size) => size.as_bytes() as _,
586        }
587    }
588}
589
590/// Configuration options for the bloom filter.
591#[serde_as]
592#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
593#[serde(default)]
594pub struct BloomFilterConfig {
595    /// Whether to create the index on flush: automatically or never.
596    pub create_on_flush: Mode,
597    /// Whether to create the index on compaction: automatically or never.
598    pub create_on_compaction: Mode,
599    /// Whether to apply the index on query: automatically or never.
600    pub apply_on_query: Mode,
601    /// Memory threshold for creating the index.
602    pub mem_threshold_on_create: MemoryThreshold,
603}
604
605impl Default for BloomFilterConfig {
606    fn default() -> Self {
607        Self {
608            create_on_flush: Mode::Auto,
609            create_on_compaction: Mode::Auto,
610            apply_on_query: Mode::Auto,
611            mem_threshold_on_create: MemoryThreshold::Auto,
612        }
613    }
614}
615
616impl BloomFilterConfig {
617    pub fn mem_threshold_on_create(&self) -> Option<usize> {
618        match self.mem_threshold_on_create {
619            MemoryThreshold::Auto => {
620                if let Some(sys_memory) = get_total_memory_readable() {
621                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
622                } else {
623                    Some(ReadableSize::mb(64).as_bytes() as usize)
624                }
625            }
626            MemoryThreshold::Unlimited => None,
627            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
628        }
629    }
630}
631
632/// Divide cpu num by a non-zero `divisor` and returns at least 1.
633fn divide_num_cpus(divisor: usize) -> usize {
634    debug_assert!(divisor > 0);
635    let cores = get_total_cpu_cores();
636    debug_assert!(cores > 0);
637
638    cores.div_ceil(divisor)
639}
640
641#[cfg(test)]
642mod tests {
643    use super::*;
644
645    #[test]
646    fn test_deserialize_config() {
647        let s = r#"
648[memtable]
649type = "partition_tree"
650index_max_keys_per_shard = 8192
651data_freeze_threshold = 1024
652dedup = true
653fork_dictionary_bytes = "512MiB"
654"#;
655        let config: MitoConfig = toml::from_str(s).unwrap();
656        let MemtableConfig::PartitionTree(config) = &config.memtable else {
657            unreachable!()
658        };
659        assert_eq!(1024, config.data_freeze_threshold);
660    }
661}