mito2/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configurations.
16
17use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24use serde_with::serde_as;
25
26use crate::error::Result;
27use crate::memtable::MemtableConfig;
28use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
29
/// Lower bound (5 MB) enforced on the SST write buffer size and on the index write
/// buffer size when the configs are sanitized.
// NOTE(review): presumably chosen to match the minimum part size of object-store
// multipart uploads — confirm against the object store in use.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;

/// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size
/// in default mode.
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory size as SST meta cache size
/// in default mode (also used to bound the index metadata cache).
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode
/// (vector, selector result and index caches).
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory size as page cache size in default mode.
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold
/// for creating index.
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Fetch option timeout (3 seconds).
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
47
48/// Configuration for [MitoEngine](crate::engine::MitoEngine).
49/// Before using the config, make sure to call `MitoConfig::validate()` to check if the config is valid.
50#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
51#[serde(default)]
52pub struct MitoConfig {
53    // Worker configs:
54    /// Number of region workers (default: 1/2 of cpu cores).
55    /// Sets to 0 to use the default value.
56    pub num_workers: usize,
57    /// Request channel size of each worker (default 128).
58    pub worker_channel_size: usize,
59    /// Max batch size for a worker to handle requests (default 64).
60    pub worker_request_batch_size: usize,
61
62    // Manifest configs:
63    /// Number of meta action updated to trigger a new checkpoint
64    /// for the manifest (default 10).
65    pub manifest_checkpoint_distance: u64,
66    /// Whether to compress manifest and checkpoint file by gzip (default false).
67    pub compress_manifest: bool,
68
69    // Background job configs:
70    /// Max number of running background flush jobs (default: 1/2 of cpu cores).
71    pub max_background_flushes: usize,
72    /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
73    pub max_background_compactions: usize,
74    /// Max number of running background purge jobs (default: number of cpu cores).
75    pub max_background_purges: usize,
76
77    // Flush configs:
78    /// Interval to auto flush a region if it has not flushed yet (default 30 min).
79    #[serde(with = "humantime_serde")]
80    pub auto_flush_interval: Duration,
81    /// Global write buffer size threshold to trigger flush.
82    pub global_write_buffer_size: ReadableSize,
83    /// Global write buffer size threshold to reject write requests.
84    pub global_write_buffer_reject_size: ReadableSize,
85
86    // Cache configs:
87    /// Cache size for SST metadata. Setting it to 0 to disable the cache.
88    pub sst_meta_cache_size: ReadableSize,
89    /// Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
90    pub vector_cache_size: ReadableSize,
91    /// Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
92    pub page_cache_size: ReadableSize,
93    /// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache.
94    pub selector_result_cache_size: ReadableSize,
95    /// Whether to enable the write cache.
96    pub enable_write_cache: bool,
97    /// File system path for write cache dir's root, defaults to `{data_home}`.
98    pub write_cache_path: String,
99    /// Capacity for write cache.
100    pub write_cache_size: ReadableSize,
101    /// TTL for write cache.
102    #[serde(with = "humantime_serde")]
103    pub write_cache_ttl: Option<Duration>,
104
105    // Other configs:
106    /// Buffer size for SST writing.
107    pub sst_write_buffer_size: ReadableSize,
108    /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
109    pub parallel_scan_channel_size: usize,
110    /// Whether to allow stale entries read during replay.
111    pub allow_stale_entries: bool,
112
113    /// Index configs.
114    pub index: IndexConfig,
115    /// Inverted index configs.
116    pub inverted_index: InvertedIndexConfig,
117    /// Full-text index configs.
118    pub fulltext_index: FulltextIndexConfig,
119    /// Bloom filter index configs.
120    pub bloom_filter_index: BloomFilterConfig,
121
122    /// Memtable config
123    pub memtable: MemtableConfig,
124
125    /// Minimum time interval between two compactions.
126    /// To align with the old behavior, the default value is 0 (no restrictions).
127    #[serde(with = "humantime_serde")]
128    pub min_compaction_interval: Duration,
129}
130
131impl Default for MitoConfig {
132    fn default() -> Self {
133        let mut mito_config = MitoConfig {
134            num_workers: divide_num_cpus(2),
135            worker_channel_size: 128,
136            worker_request_batch_size: 64,
137            manifest_checkpoint_distance: 10,
138            compress_manifest: false,
139            max_background_flushes: divide_num_cpus(2),
140            max_background_compactions: divide_num_cpus(4),
141            max_background_purges: common_config::utils::get_cpus(),
142            auto_flush_interval: Duration::from_secs(30 * 60),
143            global_write_buffer_size: ReadableSize::gb(1),
144            global_write_buffer_reject_size: ReadableSize::gb(2),
145            sst_meta_cache_size: ReadableSize::mb(128),
146            vector_cache_size: ReadableSize::mb(512),
147            page_cache_size: ReadableSize::mb(512),
148            selector_result_cache_size: ReadableSize::mb(512),
149            enable_write_cache: false,
150            write_cache_path: String::new(),
151            write_cache_size: ReadableSize::gb(5),
152            write_cache_ttl: None,
153            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
154            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
155            allow_stale_entries: false,
156            index: IndexConfig::default(),
157            inverted_index: InvertedIndexConfig::default(),
158            fulltext_index: FulltextIndexConfig::default(),
159            bloom_filter_index: BloomFilterConfig::default(),
160            memtable: MemtableConfig::default(),
161            min_compaction_interval: Duration::from_secs(0),
162        };
163
164        // Adjust buffer and cache size according to system memory if we can.
165        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
166            mito_config.adjust_buffer_and_cache_size(sys_memory);
167        }
168
169        mito_config
170    }
171}
172
173impl MitoConfig {
174    /// Sanitize incorrect configurations.
175    ///
176    /// Returns an error if there is a configuration that unable to sanitize.
177    pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
178        // Use default value if `num_workers` is 0.
179        if self.num_workers == 0 {
180            self.num_workers = divide_num_cpus(2);
181        }
182
183        // Sanitize channel size.
184        if self.worker_channel_size == 0 {
185            warn!("Sanitize channel size 0 to 1");
186            self.worker_channel_size = 1;
187        }
188
189        if self.max_background_flushes == 0 {
190            warn!(
191                "Sanitize max background flushes 0 to {}",
192                divide_num_cpus(2)
193            );
194            self.max_background_flushes = divide_num_cpus(2);
195        }
196        if self.max_background_compactions == 0 {
197            warn!(
198                "Sanitize max background compactions 0 to {}",
199                divide_num_cpus(4)
200            );
201            self.max_background_compactions = divide_num_cpus(4);
202        }
203        if self.max_background_purges == 0 {
204            warn!(
205                "Sanitize max background purges 0 to {}",
206                common_config::utils::get_cpus()
207            );
208            self.max_background_purges = common_config::utils::get_cpus();
209        }
210
211        if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
212            self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
213            warn!(
214                "Sanitize global write buffer reject size to {}",
215                self.global_write_buffer_reject_size
216            );
217        }
218
219        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
220            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
221            warn!(
222                "Sanitize sst write buffer size to {}",
223                self.sst_write_buffer_size
224            );
225        }
226
227        if self.parallel_scan_channel_size < 1 {
228            self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
229            warn!(
230                "Sanitize scan channel size to {}",
231                self.parallel_scan_channel_size
232            );
233        }
234
235        // Sets write cache path if it is empty.
236        if self.write_cache_path.trim().is_empty() {
237            self.write_cache_path = data_home.to_string();
238        }
239
240        self.index.sanitize(data_home, &self.inverted_index)?;
241
242        Ok(())
243    }
244
245    fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
246        // shouldn't be greater than 1G in default mode.
247        let global_write_buffer_size = cmp::min(
248            sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
249            ReadableSize::gb(1),
250        );
251        // Use 2x of global write buffer size as global write buffer reject size.
252        let global_write_buffer_reject_size = global_write_buffer_size * 2;
253        // shouldn't be greater than 128MB in default mode.
254        let sst_meta_cache_size = cmp::min(
255            sys_memory / SST_META_CACHE_SIZE_FACTOR,
256            ReadableSize::mb(128),
257        );
258        // shouldn't be greater than 512MB in default mode.
259        let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
260        let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
261
262        self.global_write_buffer_size = global_write_buffer_size;
263        self.global_write_buffer_reject_size = global_write_buffer_reject_size;
264        self.sst_meta_cache_size = sst_meta_cache_size;
265        self.vector_cache_size = mem_cache_size;
266        self.page_cache_size = page_cache_size;
267        self.selector_result_cache_size = mem_cache_size;
268
269        self.index.adjust_buffer_and_cache_size(sys_memory);
270    }
271
272    /// Enable write cache.
273    #[cfg(test)]
274    pub fn enable_write_cache(
275        mut self,
276        path: String,
277        size: ReadableSize,
278        ttl: Option<Duration>,
279    ) -> Self {
280        self.enable_write_cache = true;
281        self.write_cache_path = path;
282        self.write_cache_size = size;
283        self.write_cache_ttl = ttl;
284        self
285    }
286}
287
/// Configuration for index creation, index staging files and index caches.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path for the index in filesystem, used to
    /// store intermediate files for creating the index and staging files
    /// for searching the index, defaults to `{data_home}/index_intermediate`.
    ///
    /// If it is empty, `IndexConfig::sanitize` first falls back to the deprecated
    /// `inverted_index.intermediate_path`, then to the default above.
    ///
    /// This path contains two subdirectories:
    /// - `__intm`: for storing intermediate files used during creating index.
    /// - `staging`: for storing staging files used during searching index.
    ///
    /// The default name for this directory is `index_intermediate` for backward compatibility.
    pub aux_path: String,

    /// The max capacity of the staging directory.
    pub staging_size: ReadableSize,
    /// The TTL of the staging directory.
    /// Defaults to 7 days.
    /// Set it to "0s" to disable TTL (sanitizing turns a zero TTL into `None`).
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Write buffer size for creating the index.
    /// Values below 5MB are raised to 5MB when sanitizing.
    pub write_buffer_size: ReadableSize,

    /// Cache size for metadata of puffin files. Set it to 0 to disable the cache.
    pub metadata_cache_size: ReadableSize,
    /// Cache size for inverted index content. Set it to 0 to disable the cache.
    pub content_cache_size: ReadableSize,
    /// Page size for inverted index content.
    pub content_cache_page_size: ReadableSize,
    /// Cache size for index result. Set it to 0 to disable the cache.
    pub result_cache_size: ReadableSize,
}
323
324impl Default for IndexConfig {
325    fn default() -> Self {
326        Self {
327            aux_path: String::new(),
328            staging_size: ReadableSize::gb(2),
329            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
330            write_buffer_size: ReadableSize::mb(8),
331            metadata_cache_size: ReadableSize::mb(64),
332            content_cache_size: ReadableSize::mb(128),
333            content_cache_page_size: ReadableSize::kb(64),
334            result_cache_size: ReadableSize::mb(128),
335        }
336    }
337}
338
339impl IndexConfig {
340    pub fn sanitize(
341        &mut self,
342        data_home: &str,
343        inverted_index: &InvertedIndexConfig,
344    ) -> Result<()> {
345        #[allow(deprecated)]
346        if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
347            self.aux_path.clone_from(&inverted_index.intermediate_path);
348            warn!(
349                "`inverted_index.intermediate_path` is deprecated, use
350                 `index.aux_path` instead. Set `index.aux_path` to {}",
351                &inverted_index.intermediate_path
352            )
353        }
354        if self.aux_path.is_empty() {
355            let path = Path::new(data_home).join("index_intermediate");
356            self.aux_path = path.as_os_str().to_string_lossy().to_string();
357        }
358
359        if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
360            self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
361            warn!(
362                "Sanitize index write buffer size to {}",
363                self.write_buffer_size
364            );
365        }
366
367        if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
368            self.staging_ttl = None;
369        }
370
371        Ok(())
372    }
373
374    pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
375        let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
376        self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
377        self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
378
379        let metadata_cache_size = cmp::min(
380            sys_memory / SST_META_CACHE_SIZE_FACTOR,
381            ReadableSize::mb(64),
382        );
383        self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
384    }
385}
386
/// Operational mode for certain actions.
///
/// Serialized in `snake_case`, i.e. as `"auto"` or `"disable"`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The action is performed automatically based on internal criteria.
    /// This is the default.
    #[default]
    Auto,
    /// The action is explicitly disabled.
    Disable,
}
397
398impl Mode {
399    /// Whether the action is disabled.
400    pub fn disabled(&self) -> bool {
401        matches!(self, Mode::Disable)
402    }
403
404    /// Whether the action is automatic.
405    pub fn auto(&self) -> bool {
406        matches!(self, Mode::Auto)
407    }
408}
409
/// Memory threshold for performing certain actions.
///
/// Deserializes from `"auto"`, `"unlimited"`, or (through the untagged `Size`
/// variant) a size value accepted by `ReadableSize`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Automatically determine the threshold based on internal criteria.
    /// This is the default.
    #[default]
    Auto,
    /// Unlimited memory.
    Unlimited,
    /// Fixed memory threshold.
    #[serde(untagged)]
    Size(ReadableSize),
}
423
/// Configuration options for the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,

    /// Memory threshold for performing an external sort during index creation.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated. Still accepted when deserializing (never serialized); if set, it is
    /// used by `IndexConfig::sanitize` as a fallback for an empty `IndexConfig::aux_path`.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated. Still accepted when deserializing but never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
447
448impl Default for InvertedIndexConfig {
449    #[allow(deprecated)]
450    fn default() -> Self {
451        Self {
452            create_on_flush: Mode::Auto,
453            create_on_compaction: Mode::Auto,
454            apply_on_query: Mode::Auto,
455            mem_threshold_on_create: MemoryThreshold::Auto,
456            write_buffer_size: ReadableSize::mb(8),
457            intermediate_path: String::new(),
458        }
459    }
460}
461
462impl InvertedIndexConfig {
463    pub fn mem_threshold_on_create(&self) -> Option<usize> {
464        match self.mem_threshold_on_create {
465            MemoryThreshold::Auto => {
466                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
467                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
468                } else {
469                    Some(ReadableSize::mb(64).as_bytes() as usize)
470                }
471            }
472            MemoryThreshold::Unlimited => None,
473            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
474        }
475    }
476}
477
/// Configuration options for the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default true).
    pub compress: bool,
}
494
495impl Default for FulltextIndexConfig {
496    fn default() -> Self {
497        Self {
498            create_on_flush: Mode::Auto,
499            create_on_compaction: Mode::Auto,
500            apply_on_query: Mode::Auto,
501            mem_threshold_on_create: MemoryThreshold::Auto,
502            compress: true,
503        }
504    }
505}
506
507impl FulltextIndexConfig {
508    pub fn mem_threshold_on_create(&self) -> usize {
509        match self.mem_threshold_on_create {
510            MemoryThreshold::Auto => {
511                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
512                    (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
513                } else {
514                    ReadableSize::mb(64).as_bytes() as _
515                }
516            }
517            MemoryThreshold::Unlimited => usize::MAX,
518            MemoryThreshold::Size(size) => size.as_bytes() as _,
519        }
520    }
521}
522
/// Configuration options for the bloom filter index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index.
    pub mem_threshold_on_create: MemoryThreshold,
}
537
538impl Default for BloomFilterConfig {
539    fn default() -> Self {
540        Self {
541            create_on_flush: Mode::Auto,
542            create_on_compaction: Mode::Auto,
543            apply_on_query: Mode::Auto,
544            mem_threshold_on_create: MemoryThreshold::Auto,
545        }
546    }
547}
548
549impl BloomFilterConfig {
550    pub fn mem_threshold_on_create(&self) -> Option<usize> {
551        match self.mem_threshold_on_create {
552            MemoryThreshold::Auto => {
553                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
554                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
555                } else {
556                    Some(ReadableSize::mb(64).as_bytes() as usize)
557                }
558            }
559            MemoryThreshold::Unlimited => None,
560            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
561        }
562    }
563}
564
565/// Divide cpu num by a non-zero `divisor` and returns at least 1.
566fn divide_num_cpus(divisor: usize) -> usize {
567    debug_assert!(divisor > 0);
568    let cores = common_config::utils::get_cpus();
569    debug_assert!(cores > 0);
570
571    cores.div_ceil(divisor)
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    /// A `[memtable]` section selecting the partition tree memtable must be
    /// deserialized into `MemtableConfig::PartitionTree` with its options applied.
    #[test]
    fn test_deserialize_config() {
        let toml_text = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let mito_config: MitoConfig = toml::from_str(toml_text).unwrap();
        let MemtableConfig::PartitionTree(partition_tree) = &mito_config.memtable else {
            unreachable!()
        };
        assert_eq!(1024, partition_tree.data_freeze_threshold);
    }
}