mito2/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configurations.
16
17use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24use serde_with::serde_as;
25
26use crate::error::Result;
27use crate::memtable::MemtableConfig;
28use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
29
30const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
31/// Default channel size for parallel scan task.
32pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
33/// Default maximum number of SST files to scan concurrently.
34pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;
35
36// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
37const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
38/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory size as SST meta cache size in default mode
39const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
40/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode
41const MEM_CACHE_SIZE_FACTOR: u64 = 16;
42/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory size as page cache size in default mode
43const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
44/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold for creating index
45const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
46
47/// Fetch option timeout
48pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
49
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
///
/// Before using the config, make sure to call `MitoConfig::sanitize()`. It replaces invalid
/// values with usable ones and fills in paths that default to locations under `data_home`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    // Worker configs:
    /// Number of region workers (default: 1/2 of cpu cores).
    /// Sets to 0 to use the default value.
    pub num_workers: usize,
    /// Request channel size of each worker (default 128). `sanitize` turns 0 into 1.
    pub worker_channel_size: usize,
    /// Max batch size for a worker to handle requests (default 64).
    pub worker_request_batch_size: usize,

    // Manifest configs:
    /// Number of meta actions updated to trigger a new checkpoint
    /// for the manifest (default 10).
    pub manifest_checkpoint_distance: u64,
    /// Number of removed files to keep in the manifest's `removed_files` field before also
    /// removing them from `removed_files` (default 256). Mostly for debugging purposes.
    /// If set to 0, only `experimental_manifest_keep_removed_file_ttl` decides when files
    /// are removed from the `removed_files` field.
    pub experimental_manifest_keep_removed_file_count: usize,
    /// How long to keep removed files in the `removed_files` field of the manifest
    /// after they are removed from the manifest (default 1 hour).
    /// Files are only removed from the `removed_files` field once both
    /// `experimental_manifest_keep_removed_file_count` and
    /// `experimental_manifest_keep_removed_file_ttl` are reached.
    #[serde(with = "humantime_serde")]
    pub experimental_manifest_keep_removed_file_ttl: Duration,
    /// Whether to compress manifest and checkpoint file by gzip (default false).
    pub compress_manifest: bool,

    // Background job configs:
    /// Max number of running background flush jobs (default: 1/2 of cpu cores).
    /// `sanitize` replaces 0 with the default.
    pub max_background_flushes: usize,
    /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
    /// `sanitize` replaces 0 with the default.
    pub max_background_compactions: usize,
    /// Max number of running background purge jobs (default: number of cpu cores).
    /// `sanitize` replaces 0 with the default.
    pub max_background_purges: usize,

    // Flush configs:
    /// Interval to auto flush a region if it has not flushed yet (default 30 min).
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size threshold to trigger flush
    /// (default: 1/8 of system memory, at most 1 GiB; 1 GiB if memory is unknown).
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer size threshold to reject write requests (default: 2x
    /// `global_write_buffer_size`). `sanitize` resets it to 2x `global_write_buffer_size`
    /// when it is not strictly greater than that value.
    pub global_write_buffer_reject_size: ReadableSize,

    // Cache configs:
    /// Cache size for SST metadata. Set it to 0 to disable the cache.
    /// Default: 1/32 of system memory, at most 128 MiB.
    pub sst_meta_cache_size: ReadableSize,
    /// Cache size for vectors and arrow arrays. Set it to 0 to disable the cache.
    /// Default: 1/16 of system memory, at most 512 MiB.
    pub vector_cache_size: ReadableSize,
    /// Cache size for pages of SST row groups. Set it to 0 to disable the cache.
    /// Default: 1/8 of system memory (not capped).
    pub page_cache_size: ReadableSize,
    /// Cache size for time series selector (e.g. `last_value()`). Set it to 0 to disable the cache.
    /// Default: 1/16 of system memory, at most 512 MiB.
    pub selector_result_cache_size: ReadableSize,
    /// Whether to enable the write cache (default false).
    pub enable_write_cache: bool,
    /// File system path for write cache dir's root, defaults to `{data_home}`.
    /// `sanitize` also applies the default when the path is blank.
    pub write_cache_path: String,
    /// Capacity for write cache (default 5 GiB).
    pub write_cache_size: ReadableSize,
    /// TTL for write cache (default: none).
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,

    // Other configs:
    /// Buffer size for SST writing. `sanitize` raises values below
    /// `MULTIPART_UPLOAD_MINIMUM_SIZE` (5 MiB) to that minimum.
    pub sst_write_buffer_size: ReadableSize,
    /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
    pub parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently (default 384).
    pub max_concurrent_scan_files: usize,
    /// Whether to allow stale entries read during replay.
    pub allow_stale_entries: bool,

    /// Index configs.
    pub index: IndexConfig,
    /// Inverted index configs.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index configs.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index configs.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable config
    pub memtable: MemtableConfig,

    /// Minimum time interval between two compactions.
    /// To align with the old behavior, the default value is 0 (no restrictions).
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,
}
145
146impl Default for MitoConfig {
147    fn default() -> Self {
148        let mut mito_config = MitoConfig {
149            num_workers: divide_num_cpus(2),
150            worker_channel_size: 128,
151            worker_request_batch_size: 64,
152            manifest_checkpoint_distance: 10,
153            experimental_manifest_keep_removed_file_count: 256,
154            experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
155            compress_manifest: false,
156            max_background_flushes: divide_num_cpus(2),
157            max_background_compactions: divide_num_cpus(4),
158            max_background_purges: common_config::utils::get_cpus(),
159            auto_flush_interval: Duration::from_secs(30 * 60),
160            global_write_buffer_size: ReadableSize::gb(1),
161            global_write_buffer_reject_size: ReadableSize::gb(2),
162            sst_meta_cache_size: ReadableSize::mb(128),
163            vector_cache_size: ReadableSize::mb(512),
164            page_cache_size: ReadableSize::mb(512),
165            selector_result_cache_size: ReadableSize::mb(512),
166            enable_write_cache: false,
167            write_cache_path: String::new(),
168            write_cache_size: ReadableSize::gb(5),
169            write_cache_ttl: None,
170            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
171            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
172            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
173            allow_stale_entries: false,
174            index: IndexConfig::default(),
175            inverted_index: InvertedIndexConfig::default(),
176            fulltext_index: FulltextIndexConfig::default(),
177            bloom_filter_index: BloomFilterConfig::default(),
178            memtable: MemtableConfig::default(),
179            min_compaction_interval: Duration::from_secs(0),
180        };
181
182        // Adjust buffer and cache size according to system memory if we can.
183        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
184            mito_config.adjust_buffer_and_cache_size(sys_memory);
185        }
186
187        mito_config
188    }
189}
190
191impl MitoConfig {
192    /// Sanitize incorrect configurations.
193    ///
194    /// Returns an error if there is a configuration that unable to sanitize.
195    pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
196        // Use default value if `num_workers` is 0.
197        if self.num_workers == 0 {
198            self.num_workers = divide_num_cpus(2);
199        }
200
201        // Sanitize channel size.
202        if self.worker_channel_size == 0 {
203            warn!("Sanitize channel size 0 to 1");
204            self.worker_channel_size = 1;
205        }
206
207        if self.max_background_flushes == 0 {
208            warn!(
209                "Sanitize max background flushes 0 to {}",
210                divide_num_cpus(2)
211            );
212            self.max_background_flushes = divide_num_cpus(2);
213        }
214        if self.max_background_compactions == 0 {
215            warn!(
216                "Sanitize max background compactions 0 to {}",
217                divide_num_cpus(4)
218            );
219            self.max_background_compactions = divide_num_cpus(4);
220        }
221        if self.max_background_purges == 0 {
222            warn!(
223                "Sanitize max background purges 0 to {}",
224                common_config::utils::get_cpus()
225            );
226            self.max_background_purges = common_config::utils::get_cpus();
227        }
228
229        if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
230            self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
231            warn!(
232                "Sanitize global write buffer reject size to {}",
233                self.global_write_buffer_reject_size
234            );
235        }
236
237        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
238            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
239            warn!(
240                "Sanitize sst write buffer size to {}",
241                self.sst_write_buffer_size
242            );
243        }
244
245        if self.parallel_scan_channel_size < 1 {
246            self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
247            warn!(
248                "Sanitize scan channel size to {}",
249                self.parallel_scan_channel_size
250            );
251        }
252
253        // Sets write cache path if it is empty.
254        if self.write_cache_path.trim().is_empty() {
255            self.write_cache_path = data_home.to_string();
256        }
257
258        self.index.sanitize(data_home, &self.inverted_index)?;
259
260        Ok(())
261    }
262
263    fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
264        // shouldn't be greater than 1G in default mode.
265        let global_write_buffer_size = cmp::min(
266            sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
267            ReadableSize::gb(1),
268        );
269        // Use 2x of global write buffer size as global write buffer reject size.
270        let global_write_buffer_reject_size = global_write_buffer_size * 2;
271        // shouldn't be greater than 128MB in default mode.
272        let sst_meta_cache_size = cmp::min(
273            sys_memory / SST_META_CACHE_SIZE_FACTOR,
274            ReadableSize::mb(128),
275        );
276        // shouldn't be greater than 512MB in default mode.
277        let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
278        let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
279
280        self.global_write_buffer_size = global_write_buffer_size;
281        self.global_write_buffer_reject_size = global_write_buffer_reject_size;
282        self.sst_meta_cache_size = sst_meta_cache_size;
283        self.vector_cache_size = mem_cache_size;
284        self.page_cache_size = page_cache_size;
285        self.selector_result_cache_size = mem_cache_size;
286
287        self.index.adjust_buffer_and_cache_size(sys_memory);
288    }
289
290    /// Enable write cache.
291    #[cfg(test)]
292    pub fn enable_write_cache(
293        mut self,
294        path: String,
295        size: ReadableSize,
296        ttl: Option<Duration>,
297    ) -> Self {
298        self.enable_write_cache = true;
299        self.write_cache_path = path;
300        self.write_cache_size = size;
301        self.write_cache_ttl = ttl;
302        self
303    }
304}
305
/// Settings shared by all index types: auxiliary paths, the staging area, the write buffer
/// and index caches.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path for the index in filesystem, used to
    /// store intermediate files for creating the index and staging files
    /// for searching the index, defaults to `{data_home}/index_intermediate`.
    ///
    /// This path contains two subdirectories:
    /// - `__intm`: for storing intermediate files used during creating index.
    /// - `staging`: for storing staging files used during searching index.
    ///
    /// The default name for this directory is `index_intermediate` for backward compatibility.
    pub aux_path: String,

    /// The max capacity of the staging directory (default 2 GiB).
    pub staging_size: ReadableSize,
    /// The TTL of the staging directory.
    /// Defaults to 7 days.
    /// Set it to "0s" to disable TTL (sanitized to `None`).
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Write buffer size for creating the index (default 8 MiB).
    /// Values below `MULTIPART_UPLOAD_MINIMUM_SIZE` (5 MiB) are raised to it by `sanitize`.
    pub write_buffer_size: ReadableSize,

    /// Cache size for metadata of puffin files. Set it to 0 to disable the cache.
    pub metadata_cache_size: ReadableSize,
    /// Cache size for inverted index content. Set it to 0 to disable the cache.
    pub content_cache_size: ReadableSize,
    /// Page size for inverted index content.
    pub content_cache_page_size: ReadableSize,
    /// Cache size for index result. Set it to 0 to disable the cache.
    pub result_cache_size: ReadableSize,
}
341
342impl Default for IndexConfig {
343    fn default() -> Self {
344        Self {
345            aux_path: String::new(),
346            staging_size: ReadableSize::gb(2),
347            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
348            write_buffer_size: ReadableSize::mb(8),
349            metadata_cache_size: ReadableSize::mb(64),
350            content_cache_size: ReadableSize::mb(128),
351            content_cache_page_size: ReadableSize::kb(64),
352            result_cache_size: ReadableSize::mb(128),
353        }
354    }
355}
356
357impl IndexConfig {
358    pub fn sanitize(
359        &mut self,
360        data_home: &str,
361        inverted_index: &InvertedIndexConfig,
362    ) -> Result<()> {
363        #[allow(deprecated)]
364        if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
365            self.aux_path.clone_from(&inverted_index.intermediate_path);
366            warn!(
367                "`inverted_index.intermediate_path` is deprecated, use
368                 `index.aux_path` instead. Set `index.aux_path` to {}",
369                &inverted_index.intermediate_path
370            )
371        }
372        if self.aux_path.is_empty() {
373            let path = Path::new(data_home).join("index_intermediate");
374            self.aux_path = path.as_os_str().to_string_lossy().to_string();
375        }
376
377        if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
378            self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
379            warn!(
380                "Sanitize index write buffer size to {}",
381                self.write_buffer_size
382            );
383        }
384
385        if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
386            self.staging_ttl = None;
387        }
388
389        Ok(())
390    }
391
392    pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
393        let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
394        self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
395        self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
396
397        let metadata_cache_size = cmp::min(
398            sys_memory / SST_META_CACHE_SIZE_FACTOR,
399            ReadableSize::mb(64),
400        );
401        self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
402    }
403}
404
/// Operational mode for certain actions.
///
/// Serialized in snake_case, i.e. as `"auto"` or `"disable"`. Defaults to [Mode::Auto].
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The action is performed automatically based on internal criteria.
    #[default]
    Auto,
    /// The action is explicitly disabled.
    Disable,
}
415
416impl Mode {
417    /// Whether the action is disabled.
418    pub fn disabled(&self) -> bool {
419        matches!(self, Mode::Disable)
420    }
421
422    /// Whether the action is automatic.
423    pub fn auto(&self) -> bool {
424        matches!(self, Mode::Auto)
425    }
426}
427
/// Memory threshold for performing certain actions.
///
/// The unit variants serialize in snake_case (`"auto"`, `"unlimited"`). [MemoryThreshold::Size]
/// is untagged, so it is written as a bare size value rather than a tagged variant.
/// Defaults to [MemoryThreshold::Auto].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Automatically determine the threshold based on internal criteria.
    #[default]
    Auto,
    /// Unlimited memory.
    Unlimited,
    /// Fixed memory threshold.
    #[serde(untagged)]
    Size(ReadableSize),
}
441
/// Configuration options for the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,

    /// Memory threshold for performing an external sort during index creation.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated intermediate directory. It is still accepted when deserializing but never
    /// serialized. `IndexConfig::sanitize` copies it into `IndexConfig::aux_path` when that is empty.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated write buffer size. It is still accepted when deserializing but never
    /// serialized. NOTE(review): nothing in this file reads it — confirm it is ignored elsewhere.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
465
466impl Default for InvertedIndexConfig {
467    #[allow(deprecated)]
468    fn default() -> Self {
469        Self {
470            create_on_flush: Mode::Auto,
471            create_on_compaction: Mode::Auto,
472            apply_on_query: Mode::Auto,
473            mem_threshold_on_create: MemoryThreshold::Auto,
474            write_buffer_size: ReadableSize::mb(8),
475            intermediate_path: String::new(),
476        }
477    }
478}
479
480impl InvertedIndexConfig {
481    pub fn mem_threshold_on_create(&self) -> Option<usize> {
482        match self.mem_threshold_on_create {
483            MemoryThreshold::Auto => {
484                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
485                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
486                } else {
487                    Some(ReadableSize::mb(64).as_bytes() as usize)
488                }
489            }
490            MemoryThreshold::Unlimited => None,
491            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
492        }
493    }
494}
495
/// Configuration options for the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index; resolved to bytes by
    /// `FulltextIndexConfig::mem_threshold_on_create`.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default true).
    pub compress: bool,
}
512
513impl Default for FulltextIndexConfig {
514    fn default() -> Self {
515        Self {
516            create_on_flush: Mode::Auto,
517            create_on_compaction: Mode::Auto,
518            apply_on_query: Mode::Auto,
519            mem_threshold_on_create: MemoryThreshold::Auto,
520            compress: true,
521        }
522    }
523}
524
525impl FulltextIndexConfig {
526    pub fn mem_threshold_on_create(&self) -> usize {
527        match self.mem_threshold_on_create {
528            MemoryThreshold::Auto => {
529                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
530                    (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
531                } else {
532                    ReadableSize::mb(64).as_bytes() as _
533                }
534            }
535            MemoryThreshold::Unlimited => usize::MAX,
536            MemoryThreshold::Size(size) => size.as_bytes() as _,
537        }
538    }
539}
540
/// Configuration options for the bloom filter.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index; resolved to bytes by
    /// `BloomFilterConfig::mem_threshold_on_create`.
    pub mem_threshold_on_create: MemoryThreshold,
}
555
556impl Default for BloomFilterConfig {
557    fn default() -> Self {
558        Self {
559            create_on_flush: Mode::Auto,
560            create_on_compaction: Mode::Auto,
561            apply_on_query: Mode::Auto,
562            mem_threshold_on_create: MemoryThreshold::Auto,
563        }
564    }
565}
566
567impl BloomFilterConfig {
568    pub fn mem_threshold_on_create(&self) -> Option<usize> {
569        match self.mem_threshold_on_create {
570            MemoryThreshold::Auto => {
571                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
572                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
573                } else {
574                    Some(ReadableSize::mb(64).as_bytes() as usize)
575                }
576            }
577            MemoryThreshold::Unlimited => None,
578            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
579        }
580    }
581}
582
583/// Divide cpu num by a non-zero `divisor` and returns at least 1.
584fn divide_num_cpus(divisor: usize) -> usize {
585    debug_assert!(divisor > 0);
586    let cores = common_config::utils::get_cpus();
587    debug_assert!(cores > 0);
588
589    cores.div_ceil(divisor)
590}
591
#[cfg(test)]
mod tests {
    use super::*;

    /// A `partition_tree` memtable section in TOML must deserialize into the matching variant.
    #[test]
    fn test_deserialize_config() {
        const CONFIG_TOML: &str = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let mito_config = toml::from_str::<MitoConfig>(CONFIG_TOML).unwrap();
        let MemtableConfig::PartitionTree(partition_tree) = &mito_config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, partition_tree.data_freeze_threshold);
    }
}