mito2/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Configurations.
16
17use std::cmp;
18use std::path::Path;
19use std::time::Duration;
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::warn;
23use serde::{Deserialize, Serialize};
24use serde_with::serde_as;
25
26use crate::error::Result;
27use crate::memtable::MemtableConfig;
28use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
29
/// Lower bound (5MB) enforced on write buffer sizes when sanitizing configs.
/// NOTE(review): presumably the minimum part size of object store multipart uploads — confirm.
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;

/// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
/// Use `1/SST_META_CACHE_SIZE_FACTOR` of OS memory size as SST meta cache size in default mode
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode
/// (applied to both the vector cache and the selector result cache)
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Use `1/PAGE_CACHE_SIZE_FACTOR` of OS memory size as page cache size in default mode
const PAGE_CACHE_SIZE_FACTOR: u64 = 8;
/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold for creating index
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;

/// Fetch option timeout (3 seconds).
pub(crate) const FETCH_OPTION_TIMEOUT: Duration = Duration::from_secs(3);
47
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
///
/// Before using the config, make sure to call [`MitoConfig::sanitize()`] so that invalid
/// values (zero sizes, empty paths, ...) are replaced with usable ones.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
    // Worker configs:
    /// Number of region workers (default: 1/2 of cpu cores).
    /// Set to 0 to use the default value.
    pub num_workers: usize,
    /// Request channel size of each worker (default 128).
    /// A value of 0 is sanitized to 1.
    pub worker_channel_size: usize,
    /// Max batch size for a worker to handle requests (default 64).
    pub worker_request_batch_size: usize,

    // Manifest configs:
    /// Number of meta actions applied before triggering a new checkpoint
    /// for the manifest (default 10).
    pub manifest_checkpoint_distance: u64,
    /// Whether to compress manifest and checkpoint file by gzip (default false).
    pub compress_manifest: bool,

    // Background job configs:
    /// Max number of running background flush jobs (default: 1/2 of cpu cores).
    /// A value of 0 is sanitized to the default.
    pub max_background_flushes: usize,
    /// Max number of running background compaction jobs (default: 1/4 of cpu cores).
    /// A value of 0 is sanitized to the default.
    pub max_background_compactions: usize,
    /// Max number of running background purge jobs (default: number of cpu cores).
    /// A value of 0 is sanitized to the default.
    pub max_background_purges: usize,

    // Flush configs:
    /// Interval to auto flush a region if it has not flushed yet (default 30 min).
    #[serde(with = "humantime_serde")]
    pub auto_flush_interval: Duration,
    /// Global write buffer size threshold to trigger flush.
    /// Defaults to 1/8 of the OS memory, capped at 1GB (1GB if the OS memory size is unknown).
    pub global_write_buffer_size: ReadableSize,
    /// Global write buffer size threshold to reject write requests.
    /// Defaults to 2x of `global_write_buffer_size`; sanitized back to that value if it is
    /// not greater than `global_write_buffer_size`.
    pub global_write_buffer_reject_size: ReadableSize,

    // Cache configs:
    /// Cache size for SST metadata. Set it to 0 to disable the cache.
    /// Defaults to 1/32 of the OS memory, capped at 128MB.
    pub sst_meta_cache_size: ReadableSize,
    /// Cache size for vectors and arrow arrays. Set it to 0 to disable the cache.
    /// Defaults to 1/16 of the OS memory, capped at 512MB.
    pub vector_cache_size: ReadableSize,
    /// Cache size for pages of SST row groups. Set it to 0 to disable the cache.
    /// Defaults to 1/8 of the OS memory (512MB if the OS memory size is unknown).
    pub page_cache_size: ReadableSize,
    /// Cache size for time series selector (e.g. `last_value()`). Set it to 0 to disable the cache.
    /// Defaults to 1/16 of the OS memory, capped at 512MB.
    pub selector_result_cache_size: ReadableSize,
    /// Whether to enable the write cache (default false).
    pub enable_write_cache: bool,
    /// File system path for write cache dir's root, defaults to `{data_home}`.
    pub write_cache_path: String,
    /// Capacity for write cache (default 5GB).
    pub write_cache_size: ReadableSize,
    /// TTL for write cache (default: none).
    #[serde(with = "humantime_serde")]
    pub write_cache_ttl: Option<Duration>,

    // Other configs:
    /// Buffer size for SST writing. Sanitized to be at least 5MB.
    pub sst_write_buffer_size: ReadableSize,
    /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
    /// A value of 0 is sanitized to the default.
    pub parallel_scan_channel_size: usize,
    /// Whether to allow stale entries read during replay (default false).
    pub allow_stale_entries: bool,

    /// Index configs.
    pub index: IndexConfig,
    /// Inverted index configs.
    pub inverted_index: InvertedIndexConfig,
    /// Full-text index configs.
    pub fulltext_index: FulltextIndexConfig,
    /// Bloom filter index configs.
    pub bloom_filter_index: BloomFilterConfig,

    /// Memtable config
    pub memtable: MemtableConfig,

    /// Minimum time interval between two compactions.
    /// To align with the old behavior, the default value is 0 (no restrictions).
    #[serde(with = "humantime_serde")]
    pub min_compaction_interval: Duration,
}
130
131impl Default for MitoConfig {
132    fn default() -> Self {
133        let mut mito_config = MitoConfig {
134            num_workers: divide_num_cpus(2),
135            worker_channel_size: 128,
136            worker_request_batch_size: 64,
137            manifest_checkpoint_distance: 10,
138            compress_manifest: false,
139            max_background_flushes: divide_num_cpus(2),
140            max_background_compactions: divide_num_cpus(4),
141            max_background_purges: common_config::utils::get_cpus(),
142            auto_flush_interval: Duration::from_secs(30 * 60),
143            global_write_buffer_size: ReadableSize::gb(1),
144            global_write_buffer_reject_size: ReadableSize::gb(2),
145            sst_meta_cache_size: ReadableSize::mb(128),
146            vector_cache_size: ReadableSize::mb(512),
147            page_cache_size: ReadableSize::mb(512),
148            selector_result_cache_size: ReadableSize::mb(512),
149            enable_write_cache: false,
150            write_cache_path: String::new(),
151            write_cache_size: ReadableSize::gb(5),
152            write_cache_ttl: None,
153            sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
154            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
155            allow_stale_entries: false,
156            index: IndexConfig::default(),
157            inverted_index: InvertedIndexConfig::default(),
158            fulltext_index: FulltextIndexConfig::default(),
159            bloom_filter_index: BloomFilterConfig::default(),
160            memtable: MemtableConfig::default(),
161            min_compaction_interval: Duration::from_secs(0),
162        };
163
164        // Adjust buffer and cache size according to system memory if we can.
165        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
166            mito_config.adjust_buffer_and_cache_size(sys_memory);
167        }
168
169        mito_config
170    }
171}
172
173impl MitoConfig {
174    /// Sanitize incorrect configurations.
175    ///
176    /// Returns an error if there is a configuration that unable to sanitize.
177    pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
178        // Use default value if `num_workers` is 0.
179        if self.num_workers == 0 {
180            self.num_workers = divide_num_cpus(2);
181        }
182
183        // Sanitize channel size.
184        if self.worker_channel_size == 0 {
185            warn!("Sanitize channel size 0 to 1");
186            self.worker_channel_size = 1;
187        }
188
189        if self.max_background_flushes == 0 {
190            warn!(
191                "Sanitize max background flushes 0 to {}",
192                divide_num_cpus(2)
193            );
194            self.max_background_flushes = divide_num_cpus(2);
195        }
196        if self.max_background_compactions == 0 {
197            warn!(
198                "Sanitize max background compactions 0 to {}",
199                divide_num_cpus(4)
200            );
201            self.max_background_compactions = divide_num_cpus(4);
202        }
203        if self.max_background_purges == 0 {
204            warn!(
205                "Sanitize max background purges 0 to {}",
206                common_config::utils::get_cpus()
207            );
208            self.max_background_purges = common_config::utils::get_cpus();
209        }
210
211        if self.global_write_buffer_reject_size <= self.global_write_buffer_size {
212            self.global_write_buffer_reject_size = self.global_write_buffer_size * 2;
213            warn!(
214                "Sanitize global write buffer reject size to {}",
215                self.global_write_buffer_reject_size
216            );
217        }
218
219        if self.sst_write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
220            self.sst_write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
221            warn!(
222                "Sanitize sst write buffer size to {}",
223                self.sst_write_buffer_size
224            );
225        }
226
227        if self.parallel_scan_channel_size < 1 {
228            self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
229            warn!(
230                "Sanitize scan channel size to {}",
231                self.parallel_scan_channel_size
232            );
233        }
234
235        // Sets write cache path if it is empty.
236        if self.write_cache_path.trim().is_empty() {
237            self.write_cache_path = data_home.to_string();
238        }
239
240        self.index.sanitize(data_home, &self.inverted_index)?;
241
242        Ok(())
243    }
244
245    fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
246        // shouldn't be greater than 1G in default mode.
247        let global_write_buffer_size = cmp::min(
248            sys_memory / GLOBAL_WRITE_BUFFER_SIZE_FACTOR,
249            ReadableSize::gb(1),
250        );
251        // Use 2x of global write buffer size as global write buffer reject size.
252        let global_write_buffer_reject_size = global_write_buffer_size * 2;
253        // shouldn't be greater than 128MB in default mode.
254        let sst_meta_cache_size = cmp::min(
255            sys_memory / SST_META_CACHE_SIZE_FACTOR,
256            ReadableSize::mb(128),
257        );
258        // shouldn't be greater than 512MB in default mode.
259        let mem_cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(512));
260        let page_cache_size = sys_memory / PAGE_CACHE_SIZE_FACTOR;
261
262        self.global_write_buffer_size = global_write_buffer_size;
263        self.global_write_buffer_reject_size = global_write_buffer_reject_size;
264        self.sst_meta_cache_size = sst_meta_cache_size;
265        self.vector_cache_size = mem_cache_size;
266        self.page_cache_size = page_cache_size;
267        self.selector_result_cache_size = mem_cache_size;
268    }
269
270    /// Enable write cache.
271    #[cfg(test)]
272    pub fn enable_write_cache(
273        mut self,
274        path: String,
275        size: ReadableSize,
276        ttl: Option<Duration>,
277    ) -> Self {
278        self.enable_write_cache = true;
279        self.write_cache_path = path;
280        self.write_cache_size = size;
281        self.write_cache_ttl = ttl;
282        self
283    }
284}
285
/// Configuration for index auxiliary files: the auxiliary directory, staging limits,
/// the write buffer used when creating an index, and index cache sizes.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
    /// Auxiliary directory path for the index in filesystem, used to
    /// store intermediate files for creating the index and staging files
    /// for searching the index, defaults to `{data_home}/index_intermediate`.
    ///
    /// This path contains two subdirectories:
    /// - `__intm`: for storing intermediate files used during creating index.
    /// - `staging`: for storing staging files used during searching index.
    ///
    /// The default name for this directory is `index_intermediate` for backward compatibility.
    pub aux_path: String,

    /// The max capacity of the staging directory (default 2GB).
    pub staging_size: ReadableSize,
    /// The TTL of the staging directory.
    /// Defaults to 7 days.
    /// Set it to "0s" to disable TTL.
    #[serde(with = "humantime_serde")]
    pub staging_ttl: Option<Duration>,

    /// Write buffer size for creating the index (default 8MB).
    /// Sanitized to be at least 5MB.
    pub write_buffer_size: ReadableSize,

    /// Cache size for metadata of puffin files (default 64MB). Set it to 0 to disable the cache.
    pub metadata_cache_size: ReadableSize,
    /// Cache size for inverted index content (default 128MB). Set it to 0 to disable the cache.
    pub content_cache_size: ReadableSize,
    /// Page size for inverted index content (default 64KB).
    pub content_cache_page_size: ReadableSize,
}
319
320impl Default for IndexConfig {
321    fn default() -> Self {
322        Self {
323            aux_path: String::new(),
324            staging_size: ReadableSize::gb(2),
325            staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
326            write_buffer_size: ReadableSize::mb(8),
327            metadata_cache_size: ReadableSize::mb(64),
328            content_cache_size: ReadableSize::mb(128),
329            content_cache_page_size: ReadableSize::kb(64),
330        }
331    }
332}
333
334impl IndexConfig {
335    pub fn sanitize(
336        &mut self,
337        data_home: &str,
338        inverted_index: &InvertedIndexConfig,
339    ) -> Result<()> {
340        #[allow(deprecated)]
341        if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
342            self.aux_path.clone_from(&inverted_index.intermediate_path);
343            warn!(
344                "`inverted_index.intermediate_path` is deprecated, use
345                 `index.aux_path` instead. Set `index.aux_path` to {}",
346                &inverted_index.intermediate_path
347            )
348        }
349        if self.aux_path.is_empty() {
350            let path = Path::new(data_home).join("index_intermediate");
351            self.aux_path = path.as_os_str().to_string_lossy().to_string();
352        }
353
354        if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
355            self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
356            warn!(
357                "Sanitize index write buffer size to {}",
358                self.write_buffer_size
359            );
360        }
361
362        if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
363            self.staging_ttl = None;
364        }
365
366        Ok(())
367    }
368}
369
/// Operational mode for certain actions.
///
/// Variant names are (de)serialized in snake case, i.e. `auto` and `disable`.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The action is performed automatically based on internal criteria.
    /// This is the default mode.
    #[default]
    Auto,
    /// The action is explicitly disabled.
    Disable,
}
380
381impl Mode {
382    /// Whether the action is disabled.
383    pub fn disabled(&self) -> bool {
384        matches!(self, Mode::Disable)
385    }
386
387    /// Whether the action is automatic.
388    pub fn auto(&self) -> bool {
389        matches!(self, Mode::Auto)
390    }
391}
392
/// Memory threshold for performing certain actions.
///
/// Unit variants are (de)serialized in snake case (`auto`, `unlimited`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
    /// Automatically determine the threshold based on internal criteria.
    /// This is the default.
    #[default]
    Auto,
    /// Unlimited memory.
    Unlimited,
    /// Fixed memory threshold.
    /// The variant is untagged for serde, so it is represented by the size value itself.
    #[serde(untagged)]
    Size(ReadableSize),
}
406
/// Configuration options for the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
    /// Whether to create the index on flush: automatically or never.
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never.
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never.
    pub apply_on_query: Mode,

    /// Memory threshold for performing an external sort during index creation.
    pub mem_threshold_on_create: MemoryThreshold,

    /// Deprecated location of the intermediate files. It is still deserialized and used
    /// as a fallback for an empty [IndexConfig::aux_path], but it is never serialized.
    #[deprecated = "use [IndexConfig::aux_path] instead"]
    #[serde(skip_serializing)]
    pub intermediate_path: String,

    /// Deprecated write buffer size. It is still deserialized but never serialized.
    #[deprecated = "use [IndexConfig::write_buffer_size] instead"]
    #[serde(skip_serializing)]
    pub write_buffer_size: ReadableSize,
}
430
431impl Default for InvertedIndexConfig {
432    #[allow(deprecated)]
433    fn default() -> Self {
434        Self {
435            create_on_flush: Mode::Auto,
436            create_on_compaction: Mode::Auto,
437            apply_on_query: Mode::Auto,
438            mem_threshold_on_create: MemoryThreshold::Auto,
439            write_buffer_size: ReadableSize::mb(8),
440            intermediate_path: String::new(),
441        }
442    }
443}
444
445impl InvertedIndexConfig {
446    pub fn mem_threshold_on_create(&self) -> Option<usize> {
447        match self.mem_threshold_on_create {
448            MemoryThreshold::Auto => {
449                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
450                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
451                } else {
452                    Some(ReadableSize::mb(64).as_bytes() as usize)
453                }
454            }
455            MemoryThreshold::Unlimited => None,
456            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
457        }
458    }
459}
460
/// Configuration options for the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
    /// Whether to create the index on flush: automatically or never (default: auto).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never (default: auto).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never (default: auto).
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index (default: auto).
    pub mem_threshold_on_create: MemoryThreshold,
    /// Whether to compress the index data (default true).
    pub compress: bool,
}
477
478impl Default for FulltextIndexConfig {
479    fn default() -> Self {
480        Self {
481            create_on_flush: Mode::Auto,
482            create_on_compaction: Mode::Auto,
483            apply_on_query: Mode::Auto,
484            mem_threshold_on_create: MemoryThreshold::Auto,
485            compress: true,
486        }
487    }
488}
489
490impl FulltextIndexConfig {
491    pub fn mem_threshold_on_create(&self) -> usize {
492        match self.mem_threshold_on_create {
493            MemoryThreshold::Auto => {
494                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
495                    (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
496                } else {
497                    ReadableSize::mb(64).as_bytes() as _
498                }
499            }
500            MemoryThreshold::Unlimited => usize::MAX,
501            MemoryThreshold::Size(size) => size.as_bytes() as _,
502        }
503    }
504}
505
/// Configuration options for the bloom filter.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
    /// Whether to create the index on flush: automatically or never (default: auto).
    pub create_on_flush: Mode,
    /// Whether to create the index on compaction: automatically or never (default: auto).
    pub create_on_compaction: Mode,
    /// Whether to apply the index on query: automatically or never (default: auto).
    pub apply_on_query: Mode,
    /// Memory threshold for creating the index (default: auto).
    pub mem_threshold_on_create: MemoryThreshold,
}
520
521impl Default for BloomFilterConfig {
522    fn default() -> Self {
523        Self {
524            create_on_flush: Mode::Auto,
525            create_on_compaction: Mode::Auto,
526            apply_on_query: Mode::Auto,
527            mem_threshold_on_create: MemoryThreshold::Auto,
528        }
529    }
530}
531
532impl BloomFilterConfig {
533    pub fn mem_threshold_on_create(&self) -> Option<usize> {
534        match self.mem_threshold_on_create {
535            MemoryThreshold::Auto => {
536                if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
537                    Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
538                } else {
539                    Some(ReadableSize::mb(64).as_bytes() as usize)
540                }
541            }
542            MemoryThreshold::Unlimited => None,
543            MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
544        }
545    }
546}
547
548/// Divide cpu num by a non-zero `divisor` and returns at least 1.
549fn divide_num_cpus(divisor: usize) -> usize {
550    debug_assert!(divisor > 0);
551    let cores = common_config::utils::get_cpus();
552    debug_assert!(cores > 0);
553
554    cores.div_ceil(divisor)
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// A `[memtable]` section of type `partition_tree` must deserialize into the
    /// partition tree memtable config with the given options.
    #[test]
    fn test_deserialize_config() {
        let toml_text = r#"
[memtable]
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
        let mito_config: MitoConfig = toml::from_str(toml_text).unwrap();
        match &mito_config.memtable {
            MemtableConfig::PartitionTree(tree_config) => {
                assert_eq!(1024, tree_config.data_freeze_threshold);
            }
            _ => unreachable!(),
        }
    }
}