mito2/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configurations.
16
17use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_memory_manager::OnExhaustedPolicy;
24use common_stat::{get_total_cpu_cores, get_total_memory_readable};
25use common_telemetry::warn;
26use serde::{Deserialize, Serialize};
27use serde_with::serde_as;
28
29use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
30use crate::error::Result;
31use crate::gc::GcConfig;
32use crate::memtable::MemtableConfig;
33use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
34
/// Lower bound enforced on the SST write buffer size and the index write buffer size
/// when the configuration is sanitized.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default maximum number of SST files to scan concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;

/// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory size as SST meta cache size in default mode
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory size as page cache size in default mode
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold for creating index
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Fetch option timeout
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
54
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
///
/// Before using the config, make sure to call [`MitoConfig::sanitize()`] so that
/// invalid values are replaced by usable ones.
///
/// Every field is optional when deserializing (`#[serde(default)]`); missing fields
/// take the value from [`MitoConfig::default()`].
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    // Worker configs:
    /// Number of region workers (default: 1/2 of cpu cores).
    /// Set it to 0 to use the default value.
    pub num_workers: usize,
    /// Request channel size of each worker (default 128).
    /// A value of 0 is sanitized to 1.
    pub worker_channel_size: usize,
    /// Max batch size for a worker to handle requests (default 64).
    pub worker_request_batch_size: usize,

    // Manifest configs:
    /// Number of meta actions updated to trigger a new checkpoint
    /// for the manifest (default 10).
    pub manifest_checkpoint_distance: u64,
    /// Number of removed files to keep in the manifest's `removed_files` field before
    /// they are also removed from `removed_files` (default 256). Mostly for debugging purposes.
    /// If set to 0, only `keep_removed_file_ttl` is used to decide when to remove files
    /// from the `removed_files` field.
    pub experimental_manifest_keep_removed_file_count: usize,
    /// How long to keep removed files in the `removed_files` field of the manifest
    /// after they are removed from the manifest (default 1 hour).
    /// Files will only be removed from the `removed_files` field
    /// if both `keep_removed_file_count` and `keep_removed_file_ttl` are reached.
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress manifest and checkpoint file by gzip (default false).
    pub compress_manifest: bool,

    // Background job configs:
    /// Max number of running background index build jobs (default: 1/8 of cpu cores).
    pub max_background_index_builds: usize,
    /// Max number of running background flush jobs (default: 1/2 of cpu cores).
    /// A value of 0 is sanitized to the default.
    pub max_background_flushes: usize,
    /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
    /// A value of 0 is sanitized to the default.
    pub max_background_compactions: usize,
    /// Max number of running background purge jobs (default: number of cpu cores).
    /// A value of 0 is sanitized to the default.
    pub max_background_purges: usize,
    /// Memory budget for compaction tasks (default: unlimited).
    /// Setting it to 0 or "unlimited" disables the limit.
    pub experimental_compaction_memory_limit: MemoryLimit,
    /// Behavior when compaction cannot acquire memory from the budget.
    pub experimental_compaction_on_exhausted: OnExhaustedPolicy,

    // Flush configs:
    /// Interval to auto flush a region if it has not flushed yet (default 30 min).
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size threshold to trigger flush.
    /// Defaults to 1/8 of OS memory, capped at 1GB (1GB if the OS memory is unknown).
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer size threshold to reject write requests.
    /// Defaults to 2x of `global_write_buffer_size`; it is sanitized back to that value
    /// if it is not greater than `global_write_buffer_size`.
    pub global_write_buffer_reject_size: ReadableSize,

    // Cache configs:
    /// Cache size for SST metadata. Set it to 0 to disable the cache.
    pub sst_meta_cache_size: ReadableSize,
    /// Cache size for vectors and arrow arrays. Set it to 0 to disable the cache.
    pub vector_cache_size: ReadableSize,
    /// Cache size for pages of SST row groups. Set it to 0 to disable the cache.
    pub page_cache_size: ReadableSize,
    /// Cache size for time series selector (e.g. `last_value()`). Set it to 0 to disable the cache.
    pub selector_result_cache_size: ReadableSize,
    /// Whether to enable the write cache (default false).
    pub enable_write_cache: bool,
    /// File system path for write cache dir's root, defaults to `{data_home}`.
    /// An empty (or whitespace-only) path is replaced by `data_home` during sanitizing.
    pub write_cache_path: String,
    /// Capacity for write cache (default 5GB).
    pub write_cache_size: ReadableSize,
    /// TTL for write cache (default: not set).
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,
    /// Preload index (puffin) files into cache on region open (default: true).
    pub preload_index_cache: bool,
    /// Percentage of write cache capacity allocated for index (puffin) files (default: 20).
    /// The remaining capacity is used for data (parquet) files.
    /// Must be between 0 and 100 (exclusive); other values are reset to the default
    /// during sanitizing.
    pub index_cache_percent: u8,
    /// Enable background downloading of files to the local cache when accessed during queries (default: true).
    /// When enabled, files will be asynchronously downloaded to improve performance for subsequent reads.
    pub enable_refill_cache_on_read: bool,
    /// Capacity for manifest cache (default: 256MB).
    pub manifest_cache_size: ReadableSize,

    // Other configs:
    /// Buffer size for SST writing. Values below 5MB are raised to 5MB during sanitizing.
    pub sst_write_buffer_size: ReadableSize,
    /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
    /// A value of 0 is sanitized to the default.
    pub parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently (default 384).
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries read during replay (default false).
    pub allow_stale_entries: bool,
    /// Memory limit for table scans across all queries. Setting it to 0 disables the limit.
    /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
    pub scan_memory_limit: MemoryLimit,

    /// Index configs.
    pub index: IndexConfig,
    /// Inverted index configs.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index configs.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index configs.
    pub bloom_filter_index: BloomFilterConfig,
    /// Vector index configs (HNSW).
    #[cfg(feature = "vector_index")]
    pub vector_index: VectorIndexConfig,

    /// Memtable config
    pub memtable: MemtableConfig,

    /// Minimum time interval between two compactions.
    /// To align with the old behavior, the default value is 0 (no restrictions).
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,

    /// Whether to enable experimental flat format as the default format (default false).
    /// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
    pub default_experimental_flat_format: bool,

    /// GC configs.
    pub gc: GcConfig,
}
179
180impl Default for MitoConfig {
181    fn default() -> Self {
182        let mut mito_config = MitoConfig {
183            num_workers: divide_num_cpus(2),
184            worker_channel_size: 128,
185            worker_request_batch_size: 64,
186            manifest_checkpoint_distance: 10,
187            experimental_manifest_keep_removed_file_count: 256,
188            experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
189            compress_manifest: false,
190            max_background_index_builds: divide_num_cpus(8),
191            max_background_flushes: divide_num_cpus(2),
192            max_background_compactions: divide_num_cpus(4),
193            max_background_purges: get_total_cpu_cores(),
194            experimental_compaction_memory_limit: MemoryLimit::Unlimited,
195            experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
196            auto_flush_interval: Duration::from_secs(30 * 60),
197            global_write_buffer_size: ReadableSize::gb(1),
198            global_write_buffer_reject_size: ReadableSize::gb(2),
199            sst_meta_cache_size: ReadableSize::mb(128),
200            vector_cache_size: ReadableSize::mb(512),
201            page_cache_size: ReadableSize::mb(512),
202            selector_result_cache_size: ReadableSize::mb(512),
203            enable_write_cache: false,
204            write_cache_path: String::new(),
205            write_cache_size: ReadableSize::gb(5),
206            write_cache_ttl: None,
207            preload_index_cache: true,
208            index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
209            enable_refill_cache_on_read: true,
210            manifest_cache_size: ReadableSize::mb(256),
211            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
212            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
213            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
214            allow_stale_entries: false,
215            scan_memory_limit: MemoryLimit::default(),
216            index: IndexConfig::default(),
217            inverted_index: InvertedIndexConfig::default(),
218            fulltext_index: FulltextIndexConfig::default(),
219            bloom_filter_index: BloomFilterConfig::default(),
220            #[cfg(feature = "vector_index")]
221            vector_index: VectorIndexConfig::default(),
222            memtable: MemtableConfig::default(),
223            min_compaction_interval: Duration::from_secs(0),
224            default_experimental_flat_format: false,
225            gc: GcConfig::default(),
226        };
227
228        // Adjust buffer and cache size according to system memory if we can.
229        if let Some(sys_memory) = get_total_memory_readable() {
230            mito_config.adjust_buffer_and_cache_size(sys_memory);
231        }
232
233        mito_config
234    }
235}
236
237impl MitoConfig {
238    /// Sanitize incorrect configurations.
239    ///
240    /// Returns an error if there is a configuration that unable to sanitize.
241    pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
242        // Use default value if `num_workers` is 0.
243        if self.num_workers == 0 {
244            self.num_workers = divide_num_cpus(2);
245        }
246
247        // Sanitize channel size.
248        if self.worker_channel_size == 0 {
249            warn!("Sanitize channel size 0 to 1");
250            self.worker_channel_size = 1;
251        }
252
253        if self.max_background_flushes == 0 {
254            warn!(
255                "Sanitize max background flushes 0 to {}",
256                divide_num_cpus(2)
257            );
258            self.max_background_flushes = divide_num_cpus(2);
259        }
260        if self.max_background_compactions == 0 {
261            warn!(
262                "Sanitize max background compactions 0 to {}",
263                divide_num_cpus(4)
264            );
265            self.max_background_compactions = divide_num_cpus(4);
266        }
267        if self.max_background_purges == 0 {
268            let cpu_cores = get_total_cpu_cores();
269            warn!("Sanitize max background purges 0 to {}", cpu_cores);
270            self.max_background_purges = cpu_cores;
271        }
272
273        if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
274            self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
275            warn!(
276                "Sanitize global write buffer reject size to {}",
277                self.global_write_buffer_reject_size
278            );
279        }
280
281        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
282            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
283            warn!(
284                "Sanitize sst write buffer size to {}",
285                self.sst_write_buffer_size
286            );
287        }
288
289        if self.parallel_scan_channel_size < 1 {
290            self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
291            warn!(
292                "Sanitize scan channel size to {}",
293                self.parallel_scan_channel_size
294            );
295        }
296
297        // Sets write cache path if it is empty.
298        if self.write_cache_path.trim().is_empty() {
299            self.write_cache_path = data_home.to_string();
300        }
301
302        // Validate index_cache_percent is within valid range (0, 100)
303        if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
304            warn!(
305                "Invalid index_cache_percent {}, resetting to default {}",
306                self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
307            );
308            self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
309        }
310
311        self.index.sanitize(data_home, &self.inverted_index)?;
312
313        Ok(())
314    }
315
316    fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
317        // shouldn't be greater than 1G in default mode.
318        let global_write_buffer_size = cmp::min(
319            sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
320            ReadableSize::gb(1),
321        );
322        // Use 2x of global write buffer size as global write buffer reject size.
323        let global_write_buffer_reject_size = global_write_buffer_size * 2;
324        // shouldn't be greater than 128MB in default mode.
325        let sst_meta_cache_size = cmp::min(
326            sys_memory / SST_META_CACHE_SIZE_FACTOR,
327            ReadableSize::mb(128),
328        );
329        // shouldn't be greater than 512MB in default mode.
330        let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
331        let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
332
333        self.global_write_buffer_size = global_write_buffer_size;
334        self.global_write_buffer_reject_size = global_write_buffer_reject_size;
335        self.sst_meta_cache_size = sst_meta_cache_size;
336        self.vector_cache_size = mem_cache_size;
337        self.page_cache_size = page_cache_size;
338        self.selector_result_cache_size = mem_cache_size;
339
340        self.index.adjust_buffer_and_cache_size(sys_memory);
341    }
342
343    /// Enable write cache.
344    #[cfg(test)]
345    pub fn enable_write_cache(
346        mut self,
347        path: String,
348        size: ReadableSize,
349        ttl: Option<Duration>,
350    ) -> Self {
351        self.enable_write_cache = true;
352        self.write_cache_path = path;
353        self.write_cache_size = size;
354        self.write_cache_ttl = ttl;
355        self
356    }
357}
358
/// Index build mode.
///
/// Serialized in snake case (`"sync"` / `"async"`); `Sync` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
    /// Build index synchronously.
    #[default]
    Sync,
    /// Build index asynchronously.
    Async,
}
369
/// Configuration shared by all index types.
///
/// Missing fields take the value from [`IndexConfig::default()`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path for the index in filesystem, used to
    /// store intermediate files for creating the index and staging files
    /// for searching the index, defaults to `{data_home}/index_intermediate`.
    ///
    /// This path contains two subdirectories:
    /// - `__intm`: for storing intermediate files used during creating index.
    /// - `staging`: for storing staging files used during searching index.
    ///
    /// The default name for this directory is `index_intermediate` for backward compatibility.
    pub aux_path: String,

    /// The max capacity of the staging directory (default 2GB).
    pub staging_size: ReadableSize,
    /// The TTL of the staging directory.
    /// Defaults to 7 days.
    /// Set it to "0s" to disable TTL.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Index build mode (default: sync).
    pub build_mode: IndexBuildMode,

    /// Write buffer size for creating the index (default 8MB).
    /// Values below 5MB are raised to 5MB during sanitizing.
    pub write_buffer_size: ReadableSize,

    /// Cache size for metadata of puffin files. Set it to 0 to disable the cache.
    pub metadata_cache_size: ReadableSize,
    /// Cache size for inverted index content. Set it to 0 to disable the cache.
    pub content_cache_size: ReadableSize,
    /// Page size for inverted index content (default 64KB).
    pub content_cache_page_size: ReadableSize,
    /// Cache size for index result. Set it to 0 to disable the cache.
    pub result_cache_size: ReadableSize,
}
408
409impl Default for IndexConfig {
410    fn default() -> Self {
411        Self {
412            aux_path: String::new(),
413            staging_size: ReadableSize::gb(2),
414            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
415            build_mode: IndexBuildMode::default(),
416            write_buffer_size: ReadableSize::mb(8),
417            metadata_cache_size: ReadableSize::mb(64),
418            content_cache_size: ReadableSize::mb(128),
419            content_cache_page_size: ReadableSize::kb(64),
420            result_cache_size: ReadableSize::mb(128),
421        }
422    }
423}
424
425impl IndexConfig {
426    pub fn sanitize(
427        &mut self,
428        data_home: &str,
429        inverted_index: &InvertedIndexConfig,
430    ) -> Result<()> {
431        #[allow(deprecated)]
432        if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
433            self.aux_path.clone_from(&inverted_index.intermediate_path);
434            warn!(
435                "`inverted_index.intermediate_path` is deprecated, use
436                 `index.aux_path` instead. Set `index.aux_path` to {}",
437                &inverted_index.intermediate_path
438            )
439        }
440        if self.aux_path.is_empty() {
441            let path = Path::new(data_home).join("index_intermediate");
442            self.aux_path = path.as_os_str().to_string_lossy().to_string();
443        }
444
445        if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
446            self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
447            warn!(
448                "Sanitize index write buffer size to {}",
449                self.write_buffer_size
450            );
451        }
452
453        if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
454            self.staging_ttl = None;
455        }
456
457        Ok(())
458    }
459
460    pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
461        let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
462        self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
463        self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
464
465        let metadata_cache_size = cmp::min(
466            sys_memory / SST_META_CACHE_SIZE_FACTOR,
467            ReadableSize::mb(64),
468        );
469        self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
470    }
471}
472
/// Operational mode for certain actions.
///
/// Serialized in snake case (`"auto"` / `"disable"`); `Auto` is the default.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The action is performed automatically based on internal criteria.
    #[default]
    Auto,
    /// The action is explicitly disabled.
    Disable,
}
483
484impl Mode {
485    /// Whether the action is disabled.
486    pub fn disabled(&self) -> bool {
487        matches!(self, Mode::Disable)
488    }
489
490    /// Whether the action is automatic.
491    pub fn auto(&self) -> bool {
492        matches!(self, Mode::Auto)
493    }
494}
495
/// Memory threshold for performing certain actions.
///
/// `Auto` and `Unlimited` are serialized as the snake-case strings `"auto"` and
/// `"unlimited"`; any other value is handled by the untagged `Size` variant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Automatically determine the threshold based on internal criteria.
    /// The index configs in this file resolve it to a fraction of the total system
    /// memory, or to 64MB when the total memory is unknown.
    #[default]
    Auto,
    /// Unlimited memory.
    Unlimited,
    /// Fixed memory threshold.
    #[serde(untagged)]
    Size(ReadableSize),
}
509
/// Configuration options for the inverted index.
///
/// Missing fields take the value from [`InvertedIndexConfig::default()`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,

    /// Memory threshold for performing an external sort during index creation.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated location of the intermediate directory.
    /// Still accepted on input (and used as a fallback for `index.aux_path` when that
    /// one is empty) but never written back out.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated write buffer size. Still accepted on input but never written back out.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
533
534impl Default for InvertedIndexConfig {
535    #[allow(deprecated)]
536    fn default() -> Self {
537        Self {
538            create_on_flush: Mode::Auto,
539            create_on_compaction: Mode::Auto,
540            apply_on_query: Mode::Auto,
541            mem_threshold_on_create: MemoryThreshold::Auto,
542            write_buffer_size: ReadableSize::mb(8),
543            intermediate_path: String::new(),
544        }
545    }
546}
547
548impl InvertedIndexConfig {
549    pub fn mem_threshold_on_create(&self) -> Option<usize> {
550        match self.mem_threshold_on_create {
551            MemoryThreshold::Auto => {
552                if let Some(sys_memory) = get_total_memory_readable() {
553                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
554                } else {
555                    Some(ReadableSize::mb(64).as_bytes() as usize)
556                }
557            }
558            MemoryThreshold::Unlimited => None,
559            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
560        }
561    }
562}
563
/// Configuration options for the full-text index.
///
/// Missing fields take the value from [`FulltextIndexConfig::default()`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default true).
    pub compress: bool,
}
580
581impl Default for FulltextIndexConfig {
582    fn default() -> Self {
583        Self {
584            create_on_flush: Mode::Auto,
585            create_on_compaction: Mode::Auto,
586            apply_on_query: Mode::Auto,
587            mem_threshold_on_create: MemoryThreshold::Auto,
588            compress: true,
589        }
590    }
591}
592
593impl FulltextIndexConfig {
594    pub fn mem_threshold_on_create(&self) -> usize {
595        match self.mem_threshold_on_create {
596            MemoryThreshold::Auto => {
597                if let Some(sys_memory) = get_total_memory_readable() {
598                    (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
599                } else {
600                    ReadableSize::mb(64).as_bytes() as _
601                }
602            }
603            MemoryThreshold::Unlimited => usize::MAX,
604            MemoryThreshold::Size(size) => size.as_bytes() as _,
605        }
606    }
607}
608
/// Configuration options for the bloom filter.
///
/// Missing fields take the value from [`BloomFilterConfig::default()`].
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}
623
624impl Default for BloomFilterConfig {
625    fn default() -> Self {
626        Self {
627            create_on_flush: Mode::Auto,
628            create_on_compaction: Mode::Auto,
629            apply_on_query: Mode::Auto,
630            mem_threshold_on_create: MemoryThreshold::Auto,
631        }
632    }
633}
634
635impl BloomFilterConfig {
636    pub fn mem_threshold_on_create(&self) -> Option<usize> {
637        match self.mem_threshold_on_create {
638            MemoryThreshold::Auto => {
639                if let Some(sys_memory) = get_total_memory_readable() {
640                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
641                } else {
642                    Some(ReadableSize::mb(64).as_bytes() as usize)
643                }
644            }
645            MemoryThreshold::Unlimited => None,
646            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
647        }
648    }
649}
650
/// Configuration options for the vector index (HNSW).
///
/// Only compiled with the `vector_index` feature. Missing fields take the value
/// from [`VectorIndexConfig::default()`].
#[cfg(feature = "vector_index")]
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct VectorIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}
666
#[cfg(feature = "vector_index")]
impl Default for VectorIndexConfig {
    /// Everything automatic.
    fn default() -> Self {
        Self {
            mem_threshold_on_create: MemoryThreshold::Auto,
            apply_on_query: Mode::Auto,
            create_on_compaction: Mode::Auto,
            create_on_flush: Mode::Auto,
        }
    }
}
678
#[cfg(feature = "vector_index")]
impl VectorIndexConfig {
    /// Resolves the configured memory threshold for index creation to bytes.
    ///
    /// Returns `None` when the threshold is unlimited. In auto mode the threshold is
    /// `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of the total system memory, or 64MB when
    /// the total memory cannot be determined.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        let threshold = match self.mem_threshold_on_create {
            MemoryThreshold::Unlimited => return None,
            MemoryThreshold::Size(size) => size,
            MemoryThreshold::Auto => get_total_memory_readable()
                .map_or(ReadableSize::mb(64), |total| {
                    total / INDEX_CREATE_MEM_THRESHOLD_FACTOR
                }),
        };

        Some(threshold.as_bytes() as usize)
    }
}
695
696/// Divide cpu num by a non-zero `divisor` and returns at least 1.
697fn divide_num_cpus(divisor: usize) -> usize {
698    debug_assert!(divisor > 0);
699    let cores = get_total_cpu_cores();
700    debug_assert!(cores > 0);
701
702    cores.div_ceil(divisor)
703}
704
#[cfg(test)]
mod tests {
    use super::*;

    /// A config that only sets the `[memtable]` section must deserialize, with the
    /// memtable parsed as a partition tree carrying the given options.
    #[test]
    fn test_deserialize_config() {
        let toml_str = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let mito_config: MitoConfig = toml::from_str(toml_str).unwrap();
        let MemtableConfig::PartitionTree(tree_config) = &mito_config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, tree_config.data_freeze_threshold);
    }
}