Skip to main content

mito2/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configurations.
16
17use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::memory_limit::MemoryLimit;
22use common_base::readable_size::ReadableSize;
23use common_memory_manager::OnExhaustedPolicy;
24use common_stat::{get_total_cpu_cores, get_total_memory_readable};
25use common_telemetry::warn;
26use serde::{Deserialize, Serialize};
27use serde_with::serde_as;
28
29use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
30use crate::error::Result;
31use crate::gc::GcConfig;
32use crate::memtable::MemtableConfig;
33use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
34
35const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
36/// Default channel size for parallel scan task.
37pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
38/// Default maximum number of SST files to scan concurrently.
39pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;
40
41// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
42const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
43/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory size as SST meta cache size in default mode
44const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
45/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode
46const MEM_CACHE_SIZE_FACTOR: u64 = 16;
47/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory size as page cache size in default mode
48const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
49/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold for creating index
50const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
51
52/// Fetch option timeout
53pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
54
55/// Configuration for [MitoEngine](crate::engine::MitoEngine).
56/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
57#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
58#[serde(default)]
59pub struct MitoConfig {
60    // Worker configs:
61    /// Number of region workers (default: 1/2 of cpu cores).
62    /// Sets to 0 to use the default value.
63    pub num_workers: usize,
64    /// Request channel size of each worker (default 128).
65    pub worker_channel_size: usize,
66    /// Max batch size for a worker to handle requests (default 64).
67    pub worker_request_batch_size: usize,
68
69    // Manifest configs:
70    /// Number of meta action updated to trigger a new checkpoint
71    /// for the manifest (default 10).
72    pub manifest_checkpoint_distance: u64,
73    /// Number of removed files to keep in manifest's `removed_files` field before also
74    /// remove them from `removed_files`. Mostly for debugging purpose.
75    /// If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files
76    /// from `removed_files` field.
77    pub experimental_manifest_keep_removed_file_count: usize,
78    /// How long to keep removed files in the `removed_files` field of manifest
79    /// after they are removed from manifest.
80    /// files will only be removed from `removed_files` field
81    /// if both `keep_removed_file_count` and `keep_removed_file_ttl` is reached.
82    #[serde(with = "humantime_serde")]
83    pub experimental_manifest_keep_removed_file_ttl: Duration,
84    /// Whether to compress manifest and checkpoint file by gzip (default false).
85    pub compress_manifest: bool,
86
87    // Background job configs:
88    /// Max number of running background index build jobs (default: 1/8 of cpu cores).
89    pub max_background_index_builds: usize,
90    /// Max number of running background flush jobs (default: 1/2 of cpu cores).
91    pub max_background_flushes: usize,
92    /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
93    pub max_background_compactions: usize,
94    /// Max number of running background purge jobs (default: number of cpu cores).
95    pub max_background_purges: usize,
96    /// Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
97    pub experimental_compaction_memory_limit: MemoryLimit,
98    /// Behavior when compaction cannot acquire memory from the budget.
99    pub experimental_compaction_on_exhausted: OnExhaustedPolicy,
100
101    // Flush configs:
102    /// Interval to auto flush a region if it has not flushed yet (default 30 min).
103    #[serde(with = "humantime_serde")]
104    pub auto_flush_interval: Duration,
105    /// Global write buffer size threshold to trigger flush.
106    pub global_write_buffer_size: ReadableSize,
107    /// Global write buffer size threshold to reject write requests.
108    pub global_write_buffer_reject_size: ReadableSize,
109
110    // Cache configs:
111    /// Cache size for SST metadata. Setting it to 0 to disable the cache.
112    pub sst_meta_cache_size: ReadableSize,
113    /// Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
114    pub vector_cache_size: ReadableSize,
115    /// Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
116    pub page_cache_size: ReadableSize,
117    /// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
118    pub selector_result_cache_size: ReadableSize,
119    /// Cache size for flat range scan results. Setting it to 0 to disable the cache.
120    pub range_result_cache_size: ReadableSize,
121    /// Whether to enable the write cache.
122    pub enable_write_cache: bool,
123    /// File system path for write cache dir's root, defaults to `{data_home}`.
124    pub write_cache_path: String,
125    /// Capacity for write cache.
126    pub write_cache_size: ReadableSize,
127    /// TTL for write cache.
128    #[serde(with = "humantime_serde")]
129    pub write_cache_ttl: Option<Duration>,
130    /// Preload index (puffin) files into cache on region open (default: true).
131    pub preload_index_cache: bool,
132    /// Percentage of write cache capacity allocated for index (puffin) files (default: 20).
133    /// The remaining capacity is used for data (parquet) files.
134    /// Must be between 0 and 100 (exclusive).
135    pub index_cache_percent: u8,
136    /// Enable background downloading of files to the local cache when accessed during queries (default: true).
137    /// When enabled, files will be asynchronously downloaded to improve performance for subsequent reads.
138    pub enable_refill_cache_on_read: bool,
139    /// Capacity for manifest cache (default: 256MB).
140    pub manifest_cache_size: ReadableSize,
141
142    // Other configs:
143    /// Buffer size for SST writing.
144    pub sst_write_buffer_size: ReadableSize,
145    /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
146    pub parallel_scan_channel_size: usize,
147    /// Maximum number of SST files to scan concurrently (default 384).
148    pub max_concurrent_scan_files: usize,
149    /// Whether to allow stale entries read during replay.
150    pub allow_stale_entries: bool,
151    /// Memory limit for table scans across all queries. Setting it to 0 disables the limit.
152    /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
153    pub scan_memory_limit: MemoryLimit,
154    /// Behavior when scan memory tracking cannot acquire memory from the budget.
155    /// `wait` means `wait(10s)`, not unlimited waiting.
156    /// Defaults to [`OnExhaustedPolicy::Fail`], which intentionally differs from
157    /// [`OnExhaustedPolicy::default()`].
158    pub scan_memory_on_exhausted: OnExhaustedPolicy,
159
160    /// Index configs.
161    pub index: IndexConfig,
162    /// Inverted index configs.
163    pub inverted_index: InvertedIndexConfig,
164    /// Full-text index configs.
165    pub fulltext_index: FulltextIndexConfig,
166    /// Bloom filter index configs.
167    pub bloom_filter_index: BloomFilterConfig,
168    /// Vector index configs (HNSW).
169    #[cfg(feature = "vector_index")]
170    pub vector_index: VectorIndexConfig,
171
172    /// Memtable config
173    pub memtable: MemtableConfig,
174
175    /// Minimum time interval between two compactions.
176    /// To align with the old behavior, the default value is 0 (no restrictions).
177    #[serde(with = "humantime_serde")]
178    pub min_compaction_interval: Duration,
179
180    /// Whether to enable experimental flat format as the default format.
181    /// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
182    pub default_experimental_flat_format: bool,
183
184    pub gc: GcConfig,
185}
186
187impl Default for MitoConfig {
188    fn default() -> Self {
189        let mut mito_config = MitoConfig {
190            num_workers: divide_num_cpus(2),
191            worker_channel_size: 128,
192            worker_request_batch_size: 64,
193            manifest_checkpoint_distance: 10,
194            experimental_manifest_keep_removed_file_count: 256,
195            experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
196            compress_manifest: false,
197            max_background_index_builds: divide_num_cpus(8),
198            max_background_flushes: divide_num_cpus(2),
199            max_background_compactions: divide_num_cpus(4),
200            max_background_purges: get_total_cpu_cores(),
201            experimental_compaction_memory_limit: MemoryLimit::Unlimited,
202            experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
203            auto_flush_interval: Duration::from_secs(30 * 60),
204            global_write_buffer_size: ReadableSize::gb(1),
205            global_write_buffer_reject_size: ReadableSize::gb(2),
206            sst_meta_cache_size: ReadableSize::mb(128),
207            vector_cache_size: ReadableSize::mb(512),
208            page_cache_size: ReadableSize::mb(512),
209            selector_result_cache_size: ReadableSize::mb(512),
210            range_result_cache_size: ReadableSize::mb(512),
211            enable_write_cache: false,
212            write_cache_path: String::new(),
213            write_cache_size: ReadableSize::gb(5),
214            write_cache_ttl: None,
215            preload_index_cache: true,
216            index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
217            enable_refill_cache_on_read: true,
218            manifest_cache_size: ReadableSize::mb(256),
219            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
220            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
221            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
222            allow_stale_entries: false,
223            scan_memory_limit: MemoryLimit::default(),
224            scan_memory_on_exhausted: OnExhaustedPolicy::Fail,
225            index: IndexConfig::default(),
226            inverted_index: InvertedIndexConfig::default(),
227            fulltext_index: FulltextIndexConfig::default(),
228            bloom_filter_index: BloomFilterConfig::default(),
229            #[cfg(feature = "vector_index")]
230            vector_index: VectorIndexConfig::default(),
231            memtable: MemtableConfig::default(),
232            min_compaction_interval: Duration::from_secs(0),
233            default_experimental_flat_format: false,
234            gc: GcConfig::default(),
235        };
236
237        // Adjust buffer and cache size according to system memory if we can.
238        if let Some(sys_memory) = get_total_memory_readable() {
239            mito_config.adjust_buffer_and_cache_size(sys_memory);
240        }
241
242        mito_config
243    }
244}
245
246impl MitoConfig {
247    /// Sanitize incorrect configurations.
248    ///
249    /// Returns an error if there is a configuration that unable to sanitize.
250    pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
251        // Use default value if `num_workers` is 0.
252        if self.num_workers == 0 {
253            self.num_workers = divide_num_cpus(2);
254        }
255
256        // Sanitize channel size.
257        if self.worker_channel_size == 0 {
258            warn!("Sanitize channel size 0 to 1");
259            self.worker_channel_size = 1;
260        }
261
262        if self.max_background_flushes == 0 {
263            warn!(
264                "Sanitize max background flushes 0 to {}",
265                divide_num_cpus(2)
266            );
267            self.max_background_flushes = divide_num_cpus(2);
268        }
269        if self.max_background_compactions == 0 {
270            warn!(
271                "Sanitize max background compactions 0 to {}",
272                divide_num_cpus(4)
273            );
274            self.max_background_compactions = divide_num_cpus(4);
275        }
276        if self.max_background_purges == 0 {
277            let cpu_cores = get_total_cpu_cores();
278            warn!("Sanitize max background purges 0 to {}", cpu_cores);
279            self.max_background_purges = cpu_cores;
280        }
281
282        if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
283            self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
284            warn!(
285                "Sanitize global write buffer reject size to {}",
286                self.global_write_buffer_reject_size
287            );
288        }
289
290        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
291            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
292            warn!(
293                "Sanitize sst write buffer size to {}",
294                self.sst_write_buffer_size
295            );
296        }
297
298        if self.parallel_scan_channel_size < 1 {
299            self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
300            warn!(
301                "Sanitize scan channel size to {}",
302                self.parallel_scan_channel_size
303            );
304        }
305
306        // Sets write cache path if it is empty.
307        if self.write_cache_path.trim().is_empty() {
308            self.write_cache_path = data_home.to_string();
309        }
310
311        // Validate index_cache_percent is within valid range (0, 100)
312        if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
313            warn!(
314                "Invalid index_cache_percent {}, resetting to default {}",
315                self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
316            );
317            self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
318        }
319
320        self.index.sanitize(data_home, &self.inverted_index)?;
321
322        Ok(())
323    }
324
325    fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
326        // shouldn't be greater than 1G in default mode.
327        let global_write_buffer_size = cmp::min(
328            sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
329            ReadableSize::gb(1),
330        );
331        // Use 2x of global write buffer size as global write buffer reject size.
332        let global_write_buffer_reject_size = global_write_buffer_size * 2;
333        // shouldn't be greater than 128MB in default mode.
334        let sst_meta_cache_size = cmp::min(
335            sys_memory / SST_META_CACHE_SIZE_FACTOR,
336            ReadableSize::mb(128),
337        );
338        // shouldn't be greater than 512MB in default mode.
339        let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
340        let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
341
342        self.global_write_buffer_size = global_write_buffer_size;
343        self.global_write_buffer_reject_size = global_write_buffer_reject_size;
344        self.sst_meta_cache_size = sst_meta_cache_size;
345        self.vector_cache_size = mem_cache_size;
346        self.page_cache_size = page_cache_size;
347        self.selector_result_cache_size = mem_cache_size;
348        self.range_result_cache_size = mem_cache_size;
349
350        self.index.adjust_buffer_and_cache_size(sys_memory);
351    }
352
353    /// Enable write cache.
354    #[cfg(test)]
355    pub fn enable_write_cache(
356        mut self,
357        path: String,
358        size: ReadableSize,
359        ttl: Option<Duration>,
360    ) -> Self {
361        self.enable_write_cache = true;
362        self.write_cache_path = path;
363        self.write_cache_size = size;
364        self.write_cache_ttl = ttl;
365        self
366    }
367}
368
369/// Index build mode.
370#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
371#[serde(rename_all = "snake_case")]
372pub enum IndexBuildMode {
373    /// Build index synchronously.
374    #[default]
375    Sync,
376    /// Build index asynchronously.
377    Async,
378}
379
380#[serde_as]
381#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
382#[serde(default)]
383pub struct IndexConfig {
384    /// Auxiliary directory path for the index in filesystem, used to
385    /// store intermediate files for creating the index and staging files
386    /// for searching the index, defaults to `{data_home}/index_intermediate`.
387    ///
388    /// This path contains two subdirectories:
389    /// - `__intm`: for storing intermediate files used during creating index.
390    /// - `staging`: for storing staging files used during searching index.
391    ///
392    /// The default name for this directory is `index_intermediate` for backward compatibility.
393    pub aux_path: String,
394
395    /// The max capacity of the staging directory.
396    pub staging_size: ReadableSize,
397    /// The TTL of the staging directory.
398    /// Defaults to 7 days.
399    /// Setting it to "0s" to disable TTL.
400    #[serde(with = "humantime_serde")]
401    pub staging_ttl: Option<Duration>,
402
403    /// Index Build Mode
404    pub build_mode: IndexBuildMode,
405
406    /// Write buffer size for creating the index.
407    pub write_buffer_size: ReadableSize,
408
409    /// Cache size for metadata of puffin files. Setting it to 0 to disable the cache.
410    pub metadata_cache_size: ReadableSize,
411    /// Cache size for inverted index content. Setting it to 0 to disable the cache.
412    pub content_cache_size: ReadableSize,
413    /// Page size for inverted index content.
414    pub content_cache_page_size: ReadableSize,
415    /// Cache size for index result. Setting it to 0 to disable the cache.
416    pub result_cache_size: ReadableSize,
417}
418
419impl Default for IndexConfig {
420    fn default() -> Self {
421        Self {
422            aux_path: String::new(),
423            staging_size: ReadableSize::gb(2),
424            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
425            build_mode: IndexBuildMode::default(),
426            write_buffer_size: ReadableSize::mb(8),
427            metadata_cache_size: ReadableSize::mb(64),
428            content_cache_size: ReadableSize::mb(128),
429            content_cache_page_size: ReadableSize::kb(64),
430            result_cache_size: ReadableSize::mb(128),
431        }
432    }
433}
434
435impl IndexConfig {
436    pub fn sanitize(
437        &mut self,
438        data_home: &str,
439        inverted_index: &InvertedIndexConfig,
440    ) -> Result<()> {
441        #[allow(deprecated)]
442        if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
443            self.aux_path.clone_from(&inverted_index.intermediate_path);
444            warn!(
445                "`inverted_index.intermediate_path` is deprecated, use
446                 `index.aux_path` instead. Set `index.aux_path` to {}",
447                &inverted_index.intermediate_path
448            )
449        }
450        if self.aux_path.is_empty() {
451            let path = Path::new(data_home).join("index_intermediate");
452            self.aux_path = path.as_os_str().to_string_lossy().to_string();
453        }
454
455        if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
456            self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
457            warn!(
458                "Sanitize index write buffer size to {}",
459                self.write_buffer_size
460            );
461        }
462
463        if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
464            self.staging_ttl = None;
465        }
466
467        Ok(())
468    }
469
470    pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
471        let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
472        self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
473        self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
474
475        let metadata_cache_size = cmp::min(
476            sys_memory / SST_META_CACHE_SIZE_FACTOR,
477            ReadableSize::mb(64),
478        );
479        self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
480    }
481}
482
483/// Operational mode for certain actions.
484#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
485#[serde(rename_all = "snake_case")]
486pub enum Mode {
487    /// The action is performed automatically based on internal criteria.
488    #[default]
489    Auto,
490    /// The action is explicitly disabled.
491    Disable,
492}
493
494impl Mode {
495    /// Whether the action is disabled.
496    pub fn disabled(&self) -> bool {
497        matches!(self, Mode::Disable)
498    }
499
500    /// Whether the action is automatic.
501    pub fn auto(&self) -> bool {
502        matches!(self, Mode::Auto)
503    }
504}
505
506/// Memory threshold for performing certain actions.
507#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
508#[serde(rename_all = "snake_case")]
509pub enum MemoryThreshold {
510    /// Automatically determine the threshold based on internal criteria.
511    #[default]
512    Auto,
513    /// Unlimited memory.
514    Unlimited,
515    /// Fixed memory threshold.
516    #[serde(untagged)]
517    Size(ReadableSize),
518}
519
520/// Configuration options for the inverted index.
521#[serde_as]
522#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
523#[serde(default)]
524pub struct InvertedIndexConfig {
525    /// Whether to create the index on flush: automatically or never.
526    pub create_on_flush: Mode,
527    /// Whether to create the index on compaction: automatically or never.
528    pub create_on_compaction: Mode,
529    /// Whether to apply the index on query: automatically or never.
530    pub apply_on_query: Mode,
531
532    /// Memory threshold for performing an external sort during index creation.
533    pub mem_threshold_on_create: MemoryThreshold,
534
535    #[deprecated = "use [IndexConfig::aux_path] instead"]
536    #[serde(skip_serializing)]
537    pub intermediate_path: String,
538
539    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
540    #[serde(skip_serializing)]
541    pub write_buffer_size: ReadableSize,
542}
543
544impl Default for InvertedIndexConfig {
545    #[allow(deprecated)]
546    fn default() -> Self {
547        Self {
548            create_on_flush: Mode::Auto,
549            create_on_compaction: Mode::Auto,
550            apply_on_query: Mode::Auto,
551            mem_threshold_on_create: MemoryThreshold::Auto,
552            write_buffer_size: ReadableSize::mb(8),
553            intermediate_path: String::new(),
554        }
555    }
556}
557
558impl InvertedIndexConfig {
559    pub fn mem_threshold_on_create(&self) -> Option<usize> {
560        match self.mem_threshold_on_create {
561            MemoryThreshold::Auto => {
562                if let Some(sys_memory) = get_total_memory_readable() {
563                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
564                } else {
565                    Some(ReadableSize::mb(64).as_bytes() as usize)
566                }
567            }
568            MemoryThreshold::Unlimited => None,
569            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
570        }
571    }
572}
573
574/// Configuration options for the full-text index.
575#[serde_as]
576#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
577#[serde(default)]
578pub struct FulltextIndexConfig {
579    /// Whether to create the index on flush: automatically or never.
580    pub create_on_flush: Mode,
581    /// Whether to create the index on compaction: automatically or never.
582    pub create_on_compaction: Mode,
583    /// Whether to apply the index on query: automatically or never.
584    pub apply_on_query: Mode,
585    /// Memory threshold for creating the index.
586    pub mem_threshold_on_create: MemoryThreshold,
587    /// Whether to compress the index data.
588    pub compress: bool,
589}
590
591impl Default for FulltextIndexConfig {
592    fn default() -> Self {
593        Self {
594            create_on_flush: Mode::Auto,
595            create_on_compaction: Mode::Auto,
596            apply_on_query: Mode::Auto,
597            mem_threshold_on_create: MemoryThreshold::Auto,
598            compress: true,
599        }
600    }
601}
602
603impl FulltextIndexConfig {
604    pub fn mem_threshold_on_create(&self) -> usize {
605        match self.mem_threshold_on_create {
606            MemoryThreshold::Auto => {
607                if let Some(sys_memory) = get_total_memory_readable() {
608                    (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
609                } else {
610                    ReadableSize::mb(64).as_bytes() as _
611                }
612            }
613            MemoryThreshold::Unlimited => usize::MAX,
614            MemoryThreshold::Size(size) => size.as_bytes() as _,
615        }
616    }
617}
618
619/// Configuration options for the bloom filter.
620#[serde_as]
621#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
622#[serde(default)]
623pub struct BloomFilterConfig {
624    /// Whether to create the index on flush: automatically or never.
625    pub create_on_flush: Mode,
626    /// Whether to create the index on compaction: automatically or never.
627    pub create_on_compaction: Mode,
628    /// Whether to apply the index on query: automatically or never.
629    pub apply_on_query: Mode,
630    /// Memory threshold for creating the index.
631    pub mem_threshold_on_create: MemoryThreshold,
632}
633
634impl Default for BloomFilterConfig {
635    fn default() -> Self {
636        Self {
637            create_on_flush: Mode::Auto,
638            create_on_compaction: Mode::Auto,
639            apply_on_query: Mode::Auto,
640            mem_threshold_on_create: MemoryThreshold::Auto,
641        }
642    }
643}
644
645impl BloomFilterConfig {
646    pub fn mem_threshold_on_create(&self) -> Option<usize> {
647        match self.mem_threshold_on_create {
648            MemoryThreshold::Auto => {
649                if let Some(sys_memory) = get_total_memory_readable() {
650                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
651                } else {
652                    Some(ReadableSize::mb(64).as_bytes() as usize)
653                }
654            }
655            MemoryThreshold::Unlimited => None,
656            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
657        }
658    }
659}
660
/// Options of the vector index (HNSW).
///
/// Only compiled when the `vector_index` feature is enabled.
#[cfg(feature = "vector_index")]
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct VectorIndexConfig {
    /// Whether the index is created on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether the index is created on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether the index is applied on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for index creation.
    pub mem_threshold_on_create: MemoryThreshold,
}
676
#[cfg(feature = "vector_index")]
impl Default for VectorIndexConfig {
    /// Returns the default options: every action in auto mode and an automatic
    /// memory threshold.
    fn default() -> Self {
        VectorIndexConfig {
            create_on_flush: Mode::Auto,
            create_on_compaction: Mode::Auto,
            apply_on_query: Mode::Auto,
            mem_threshold_on_create: MemoryThreshold::Auto,
        }
    }
}
688
#[cfg(feature = "vector_index")]
impl VectorIndexConfig {
    /// Resolves the configured memory threshold for index creation into bytes.
    ///
    /// Returns `None` when the threshold is unlimited. In auto mode the threshold
    /// is `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of the total system memory, or
    /// 64MB if the total memory is unknown.
    pub fn mem_threshold_on_create(&self) -> Option<usize> {
        let threshold = match &self.mem_threshold_on_create {
            MemoryThreshold::Unlimited => return None,
            MemoryThreshold::Size(size) => *size,
            MemoryThreshold::Auto => get_total_memory_readable()
                .map_or(ReadableSize::mb(64), |total| {
                    total / INDEX_CREATE_MEM_THRESHOLD_FACTOR
                }),
        };

        Some(threshold.as_bytes() as usize)
    }
}
705
706/// Divide cpu num by a non-zero `divisor` and returns at least 1.
707fn divide_num_cpus(divisor: usize) -> usize {
708    debug_assert!(divisor > 0);
709    let cores = get_total_cpu_cores();
710    debug_assert!(cores > 0);
711
712    cores.div_ceil(divisor)
713}
714
#[cfg(test)]
mod tests {
    use super::*;

    /// A `[memtable]` table of type `partition_tree` deserializes into
    /// `MemtableConfig::PartitionTree` carrying the given options.
    #[test]
    fn test_deserialize_config() {
        let toml_text = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let mito_config: MitoConfig = toml::from_str(toml_text).unwrap();
        match &mito_config.memtable {
            MemtableConfig::PartitionTree(tree_config) => {
                assert_eq!(1024, tree_config.data_freeze_threshold);
            }
            _ => unreachable!(),
        }
    }
}