mito2/region/
options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Options for a region.
16//!
17//! If we add options in this mod, we also need to modify [store_api::mito_engine_options].
18
19use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_time::TimeToLive;
24use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
25use serde::de::Error as _;
26use serde::{Deserialize, Deserializer, Serialize};
27use serde_json::Value;
28use serde_with::{serde_as, with_prefix, DisplayFromStr, NoneAsEmptyString};
29use snafu::{ensure, ResultExt};
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::storage::ColumnId;
32use strum::EnumString;
33
34use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
35use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
36
/// Default number of rows per inverted index segment; used by the `Default`
/// implementation of `InvertedIndexOptions`.
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1_024;
38
39/// Mode to handle duplicate rows while merging.
40#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
41#[serde(rename_all = "snake_case")]
42#[strum(serialize_all = "snake_case")]
43pub enum MergeMode {
44    /// Keeps the last row.
45    #[default]
46    LastRow,
47    /// Keeps the last non-null field for each row.
48    LastNonNull,
49}
50
51// Note: We need to update [store_api::mito_engine_options::is_mito_engine_option_key()]
52// if we want expose the option to table options.
53/// Options that affect the entire region.
54///
55/// Users need to specify the options while creating/opening a region.
56#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
57#[serde(default)]
58pub struct RegionOptions {
59    /// Region SST files TTL.
60    pub ttl: Option<TimeToLive>,
61    /// Compaction options.
62    pub compaction: CompactionOptions,
63    /// Custom storage. Uses default storage if it is `None`.
64    pub storage: Option<String>,
65    /// If append mode is enabled, the region keeps duplicate rows.
66    pub append_mode: bool,
67    /// Wal options.
68    pub wal_options: WalOptions,
69    /// Index options.
70    pub index_options: IndexOptions,
71    /// Memtable options.
72    pub memtable: Option<MemtableOptions>,
73    /// The mode to merge duplicate rows.
74    /// Only takes effect when `append_mode` is `false`.
75    pub merge_mode: Option<MergeMode>,
76}
77
78impl RegionOptions {
79    /// Validates options.
80    pub fn validate(&self) -> Result<()> {
81        if self.append_mode {
82            ensure!(
83                self.merge_mode.is_none(),
84                InvalidRegionOptionsSnafu {
85                    reason: "merge_mode is not allowed when append_mode is enabled",
86                }
87            );
88        }
89        Ok(())
90    }
91
92    /// Returns `true` if deduplication is needed.
93    pub fn need_dedup(&self) -> bool {
94        !self.append_mode
95    }
96
97    /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
98    pub fn merge_mode(&self) -> MergeMode {
99        self.merge_mode.unwrap_or_default()
100    }
101
102    /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
103    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
104        self.memtable
105            .as_ref()
106            .map_or(PrimaryKeyEncoding::default(), |memtable| {
107                memtable.primary_key_encoding()
108            })
109    }
110}
111
112impl TryFrom<&HashMap<String, String>> for RegionOptions {
113    type Error = Error;
114
115    fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
116        let value = options_map_to_value(options_map);
117        let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
118
119        // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
120        // each field manually instead of using #[serde(flatten)] for `compaction`.
121        // See https://github.com/serde-rs/serde/issues/1626
122        let options: RegionOptionsWithoutEnum =
123            serde_json::from_str(&json).context(JsonOptionsSnafu)?;
124        let compaction = if validate_enum_options(options_map, "compaction.type")? {
125            serde_json::from_str(&json).context(JsonOptionsSnafu)?
126        } else {
127            CompactionOptions::default()
128        };
129
130        // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map.
131        let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
132            || Ok(WalOptions::default()),
133            |encoded_wal_options| {
134                serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
135            },
136        )?;
137
138        let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
139        let memtable = if validate_enum_options(options_map, "memtable.type")? {
140            Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
141        } else {
142            None
143        };
144
145        let opts = RegionOptions {
146            ttl: options.ttl,
147            compaction,
148            storage: options.storage,
149            append_mode: options.append_mode,
150            wal_options,
151            index_options,
152            memtable,
153            merge_mode: options.merge_mode,
154        };
155        opts.validate()?;
156
157        Ok(opts)
158    }
159}
160
161/// Options for compactions
162#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
163#[serde(tag = "compaction.type")]
164#[serde(rename_all = "snake_case")]
165pub enum CompactionOptions {
166    /// Time window compaction strategy.
167    #[serde(with = "prefix_twcs")]
168    Twcs(TwcsOptions),
169}
170
171impl CompactionOptions {
172    pub(crate) fn time_window(&self) -> Option<Duration> {
173        match self {
174            CompactionOptions::Twcs(opts) => opts.time_window,
175        }
176    }
177
178    pub(crate) fn remote_compaction(&self) -> bool {
179        match self {
180            CompactionOptions::Twcs(opts) => opts.remote_compaction,
181        }
182    }
183
184    pub(crate) fn fallback_to_local(&self) -> bool {
185        match self {
186            CompactionOptions::Twcs(opts) => opts.fallback_to_local,
187        }
188    }
189}
190
191impl Default for CompactionOptions {
192    fn default() -> Self {
193        Self::Twcs(TwcsOptions::default())
194    }
195}
196
197/// Time window compaction options.
198#[serde_as]
199#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
200#[serde(default)]
201pub struct TwcsOptions {
202    /// Max num of sorted runs that can be kept in active writing time window.
203    #[serde_as(as = "DisplayFromStr")]
204    pub max_active_window_runs: usize,
205    /// Max num of files in the active window.
206    #[serde_as(as = "DisplayFromStr")]
207    pub max_active_window_files: usize,
208    /// Max num of sorted runs that can be kept in inactive time windows.
209    #[serde_as(as = "DisplayFromStr")]
210    pub max_inactive_window_runs: usize,
211    /// Max num of files in inactive time windows.
212    #[serde_as(as = "DisplayFromStr")]
213    pub max_inactive_window_files: usize,
214    /// Compaction time window defined when creating tables.
215    #[serde(with = "humantime_serde")]
216    pub time_window: Option<Duration>,
217    /// Compaction time window defined when creating tables.
218    pub max_output_file_size: Option<ReadableSize>,
219    /// Whether to use remote compaction.
220    #[serde_as(as = "DisplayFromStr")]
221    pub remote_compaction: bool,
222    /// Whether to fall back to local compaction if remote compaction fails.
223    #[serde_as(as = "DisplayFromStr")]
224    pub fallback_to_local: bool,
225}
226
227with_prefix!(prefix_twcs "compaction.twcs.");
228
229impl TwcsOptions {
230    /// Returns time window in second resolution.
231    pub fn time_window_seconds(&self) -> Option<i64> {
232        self.time_window.and_then(|window| {
233            let window_secs = window.as_secs();
234            if window_secs == 0 {
235                None
236            } else {
237                window_secs.try_into().ok()
238            }
239        })
240    }
241}
242
243impl Default for TwcsOptions {
244    fn default() -> Self {
245        Self {
246            max_active_window_runs: 4,
247            max_active_window_files: 4,
248            max_inactive_window_runs: 1,
249            max_inactive_window_files: 1,
250            time_window: None,
251            max_output_file_size: Some(ReadableSize::gb(2)),
252            remote_compaction: false,
253            fallback_to_local: true,
254        }
255    }
256}
257
258/// We need to define a new struct without enum fields as `#[serde(default)]` does not
259/// support external tagging.
260#[serde_as]
261#[derive(Debug, Deserialize)]
262#[serde(default)]
263struct RegionOptionsWithoutEnum {
264    /// Region SST files TTL.
265    ttl: Option<TimeToLive>,
266    storage: Option<String>,
267    #[serde_as(as = "DisplayFromStr")]
268    append_mode: bool,
269    #[serde_as(as = "NoneAsEmptyString")]
270    merge_mode: Option<MergeMode>,
271}
272
273impl Default for RegionOptionsWithoutEnum {
274    fn default() -> Self {
275        let options = RegionOptions::default();
276        RegionOptionsWithoutEnum {
277            ttl: options.ttl,
278            storage: options.storage,
279            append_mode: options.append_mode,
280            merge_mode: options.merge_mode,
281        }
282    }
283}
284
285with_prefix!(prefix_inverted_index "index.inverted_index.");
286
287/// Options for index.
288#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
289#[serde(default)]
290pub struct IndexOptions {
291    /// Options for the inverted index.
292    #[serde(flatten, with = "prefix_inverted_index")]
293    pub inverted_index: InvertedIndexOptions,
294}
295
296/// Options for the inverted index.
297#[serde_as]
298#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
299#[serde(default)]
300pub struct InvertedIndexOptions {
301    /// The column ids that should be ignored when building the inverted index.
302    /// The column ids are separated by commas. For example, "1,2,3".
303    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
304    #[serde(serialize_with = "serialize_ignore_column_ids")]
305    pub ignore_column_ids: Vec<ColumnId>,
306
307    /// The number of rows in a segment.
308    #[serde_as(as = "DisplayFromStr")]
309    pub segment_row_count: usize,
310}
311
312impl Default for InvertedIndexOptions {
313    fn default() -> Self {
314        Self {
315            ignore_column_ids: Vec::new(),
316            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
317        }
318    }
319}
320
321/// Options for region level memtable.
322#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
323#[serde(tag = "memtable.type", rename_all = "snake_case")]
324pub enum MemtableOptions {
325    TimeSeries,
326    #[serde(with = "prefix_partition_tree")]
327    PartitionTree(PartitionTreeOptions),
328}
329
330with_prefix!(prefix_partition_tree "memtable.partition_tree.");
331
332impl MemtableOptions {
333    /// Returns the primary key encoding mode.
334    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
335        match self {
336            MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
337            _ => PrimaryKeyEncoding::Dense,
338        }
339    }
340}
341
342/// Partition tree memtable options.
343#[serde_as]
344#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
345#[serde(default)]
346pub struct PartitionTreeOptions {
347    /// Max keys in an index shard.
348    #[serde_as(as = "DisplayFromStr")]
349    pub index_max_keys_per_shard: usize,
350    /// Number of rows to freeze a data part.
351    #[serde_as(as = "DisplayFromStr")]
352    pub data_freeze_threshold: usize,
353    /// Total bytes of dictionary to keep in fork.
354    pub fork_dictionary_bytes: ReadableSize,
355    /// Primary key encoding mode.
356    pub primary_key_encoding: PrimaryKeyEncoding,
357}
358
359impl Default for PartitionTreeOptions {
360    fn default() -> Self {
361        let mut fork_dictionary_bytes = ReadableSize::mb(512);
362        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
363            let adjust_dictionary_bytes = std::cmp::min(
364                sys_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
365                fork_dictionary_bytes,
366            );
367            if adjust_dictionary_bytes.0 > 0 {
368                fork_dictionary_bytes = adjust_dictionary_bytes;
369            }
370        }
371        Self {
372            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
373            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
374            fork_dictionary_bytes,
375            primary_key_encoding: PrimaryKeyEncoding::Dense,
376        }
377    }
378}
379
380fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
381where
382    D: Deserializer<'de>,
383{
384    let s: String = Deserialize::deserialize(deserializer)?;
385    let mut column_ids = Vec::new();
386    if s.is_empty() {
387        return Ok(column_ids);
388    }
389    for item in s.split(',') {
390        let column_id = item.parse().map_err(D::Error::custom)?;
391        column_ids.push(column_id);
392    }
393    Ok(column_ids)
394}
395
396fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
397where
398    S: serde::Serializer,
399{
400    let s = column_ids
401        .iter()
402        .map(|id| id.to_string())
403        .collect::<Vec<_>>()
404        .join(",");
405    serializer.serialize_str(&s)
406}
407
408/// Converts the `options` map to a json object.
409///
410/// Replaces "null" strings by `null` json values.
411fn options_map_to_value(options: &HashMap<String, String>) -> Value {
412    let map = options
413        .iter()
414        .map(|(key, value)| {
415            // Only convert the key to lowercase.
416            if value.eq_ignore_ascii_case("null") {
417                (key.to_string(), Value::Null)
418            } else {
419                (key.to_string(), Value::from(value.to_string()))
420            }
421        })
422        .collect();
423    Value::Object(map)
424}
425
426// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
427// check the type key first.
428/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
429/// and returns `true` if the map contains enum options.
430fn validate_enum_options(
431    options_map: &HashMap<String, String>,
432    enum_tag_key: &str,
433) -> Result<bool> {
434    let enum_type = enum_tag_key.split('.').next().unwrap();
435    let mut has_other_options = false;
436    let mut has_tag = false;
437    for key in options_map.keys() {
438        if key == enum_tag_key {
439            has_tag = true;
440        } else if key.starts_with(enum_type) {
441            has_other_options = true;
442        }
443    }
444
445    // If tag is not provided, then other options for the enum should not exist.
446    ensure!(
447        has_tag || !has_other_options,
448        InvalidRegionOptionsSnafu {
449            reason: format!("missing key {} in options", enum_tag_key),
450        }
451    );
452
453    Ok(has_tag)
454}
455
456#[cfg(test)]
457mod tests {
458    use common_error::ext::ErrorExt;
459    use common_error::status_code::StatusCode;
460    use common_wal::options::KafkaWalOptions;
461
462    use super::*;
463
464    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
465        options
466            .iter()
467            .map(|(k, v)| (k.to_string(), v.to_string()))
468            .collect()
469    }
470
471    #[test]
472    fn test_empty_region_options() {
473        let map = make_map(&[]);
474        let options = RegionOptions::try_from(&map).unwrap();
475        assert_eq!(RegionOptions::default(), options);
476    }
477
478    #[test]
479    fn test_with_ttl() {
480        let map = make_map(&[("ttl", "7d")]);
481        let options = RegionOptions::try_from(&map).unwrap();
482        let expect = RegionOptions {
483            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
484            ..Default::default()
485        };
486        assert_eq!(expect, options);
487    }
488
489    #[test]
490    fn test_with_storage() {
491        let map = make_map(&[("storage", "S3")]);
492        let options = RegionOptions::try_from(&map).unwrap();
493        let expect = RegionOptions {
494            storage: Some("S3".to_string()),
495            ..Default::default()
496        };
497        assert_eq!(expect, options);
498    }
499
500    #[test]
501    fn test_without_compaction_type() {
502        let map = make_map(&[
503            ("compaction.twcs.max_active_window_runs", "8"),
504            ("compaction.twcs.time_window", "2h"),
505        ]);
506        let err = RegionOptions::try_from(&map).unwrap_err();
507        assert_eq!(StatusCode::InvalidArguments, err.status_code());
508    }
509
510    #[test]
511    fn test_with_compaction_type() {
512        let map = make_map(&[
513            ("compaction.twcs.max_active_window_runs", "8"),
514            ("compaction.twcs.time_window", "2h"),
515            ("compaction.type", "twcs"),
516        ]);
517        let options = RegionOptions::try_from(&map).unwrap();
518        let expect = RegionOptions {
519            compaction: CompactionOptions::Twcs(TwcsOptions {
520                max_active_window_runs: 8,
521                time_window: Some(Duration::from_secs(3600 * 2)),
522                ..Default::default()
523            }),
524            ..Default::default()
525        };
526        assert_eq!(expect, options);
527    }
528
529    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
530        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
531        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
532        let got = RegionOptions::try_from(&map).unwrap();
533        let expect = RegionOptions {
534            wal_options: wal_options.clone(),
535            ..Default::default()
536        };
537        expect == got
538    }
539
540    #[test]
541    fn test_with_index() {
542        let map = make_map(&[
543            ("index.inverted_index.ignore_column_ids", "1,2,3"),
544            ("index.inverted_index.segment_row_count", "512"),
545        ]);
546        let options = RegionOptions::try_from(&map).unwrap();
547        let expect = RegionOptions {
548            index_options: IndexOptions {
549                inverted_index: InvertedIndexOptions {
550                    ignore_column_ids: vec![1, 2, 3],
551                    segment_row_count: 512,
552                },
553            },
554            ..Default::default()
555        };
556        assert_eq!(expect, options);
557    }
558
559    // No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
560    #[test]
561    fn test_with_any_wal_options() {
562        let all_wal_options = [
563            WalOptions::RaftEngine,
564            WalOptions::Kafka(KafkaWalOptions {
565                topic: "test_topic".to_string(),
566            }),
567        ];
568        all_wal_options.iter().all(test_with_wal_options);
569    }
570
571    #[test]
572    fn test_with_memtable() {
573        let map = make_map(&[("memtable.type", "time_series")]);
574        let options = RegionOptions::try_from(&map).unwrap();
575        let expect = RegionOptions {
576            memtable: Some(MemtableOptions::TimeSeries),
577            ..Default::default()
578        };
579        assert_eq!(expect, options);
580
581        let map = make_map(&[("memtable.type", "partition_tree")]);
582        let options = RegionOptions::try_from(&map).unwrap();
583        let expect = RegionOptions {
584            memtable: Some(MemtableOptions::PartitionTree(
585                PartitionTreeOptions::default(),
586            )),
587            ..Default::default()
588        };
589        assert_eq!(expect, options);
590    }
591
592    #[test]
593    fn test_unknown_memtable_type() {
594        let map = make_map(&[("memtable.type", "no_such_memtable")]);
595        let err = RegionOptions::try_from(&map).unwrap_err();
596        assert_eq!(StatusCode::InvalidArguments, err.status_code());
597    }
598
599    #[test]
600    fn test_with_merge_mode() {
601        let map = make_map(&[("merge_mode", "last_row")]);
602        let options = RegionOptions::try_from(&map).unwrap();
603        assert_eq!(MergeMode::LastRow, options.merge_mode());
604
605        let map = make_map(&[("merge_mode", "last_non_null")]);
606        let options = RegionOptions::try_from(&map).unwrap();
607        assert_eq!(MergeMode::LastNonNull, options.merge_mode());
608
609        let map = make_map(&[("merge_mode", "unknown")]);
610        let err = RegionOptions::try_from(&map).unwrap_err();
611        assert_eq!(StatusCode::InvalidArguments, err.status_code());
612    }
613
614    #[test]
615    fn test_with_all() {
616        let wal_options = WalOptions::Kafka(KafkaWalOptions {
617            topic: "test_topic".to_string(),
618        });
619        let map = make_map(&[
620            ("ttl", "7d"),
621            ("compaction.twcs.max_active_window_runs", "8"),
622            ("compaction.twcs.max_active_window_files", "11"),
623            ("compaction.twcs.max_inactive_window_runs", "2"),
624            ("compaction.twcs.max_inactive_window_files", "3"),
625            ("compaction.twcs.max_output_file_size", "1GB"),
626            ("compaction.twcs.time_window", "2h"),
627            ("compaction.type", "twcs"),
628            ("compaction.twcs.remote_compaction", "false"),
629            ("compaction.twcs.fallback_to_local", "true"),
630            ("storage", "S3"),
631            ("append_mode", "false"),
632            ("index.inverted_index.ignore_column_ids", "1,2,3"),
633            ("index.inverted_index.segment_row_count", "512"),
634            (
635                WAL_OPTIONS_KEY,
636                &serde_json::to_string(&wal_options).unwrap(),
637            ),
638            ("memtable.type", "partition_tree"),
639            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
640            ("memtable.partition_tree.data_freeze_threshold", "2048"),
641            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
642            ("merge_mode", "last_non_null"),
643        ]);
644        let options = RegionOptions::try_from(&map).unwrap();
645        let expect = RegionOptions {
646            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
647            compaction: CompactionOptions::Twcs(TwcsOptions {
648                max_active_window_runs: 8,
649                max_active_window_files: 11,
650                max_inactive_window_runs: 2,
651                max_inactive_window_files: 3,
652                time_window: Some(Duration::from_secs(3600 * 2)),
653                max_output_file_size: Some(ReadableSize::gb(1)),
654                remote_compaction: false,
655                fallback_to_local: true,
656            }),
657            storage: Some("S3".to_string()),
658            append_mode: false,
659            wal_options,
660            index_options: IndexOptions {
661                inverted_index: InvertedIndexOptions {
662                    ignore_column_ids: vec![1, 2, 3],
663                    segment_row_count: 512,
664                },
665            },
666            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
667                index_max_keys_per_shard: 2048,
668                data_freeze_threshold: 2048,
669                fork_dictionary_bytes: ReadableSize::mb(128),
670                primary_key_encoding: PrimaryKeyEncoding::Dense,
671            })),
672            merge_mode: Some(MergeMode::LastNonNull),
673        };
674        assert_eq!(expect, options);
675    }
676
677    #[test]
678    fn test_region_options_serde() {
679        let options = RegionOptions {
680            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
681            compaction: CompactionOptions::Twcs(TwcsOptions {
682                max_active_window_runs: 8,
683                max_active_window_files: usize::MAX,
684                max_inactive_window_runs: 2,
685                max_inactive_window_files: usize::MAX,
686                time_window: Some(Duration::from_secs(3600 * 2)),
687                max_output_file_size: None,
688                remote_compaction: false,
689                fallback_to_local: true,
690            }),
691            storage: Some("S3".to_string()),
692            append_mode: false,
693            wal_options: WalOptions::Kafka(KafkaWalOptions {
694                topic: "test_topic".to_string(),
695            }),
696            index_options: IndexOptions {
697                inverted_index: InvertedIndexOptions {
698                    ignore_column_ids: vec![1, 2, 3],
699                    segment_row_count: 512,
700                },
701            },
702            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
703                index_max_keys_per_shard: 2048,
704                data_freeze_threshold: 2048,
705                fork_dictionary_bytes: ReadableSize::mb(128),
706                primary_key_encoding: PrimaryKeyEncoding::Dense,
707            })),
708            merge_mode: Some(MergeMode::LastNonNull),
709        };
710        let region_options_json_str = serde_json::to_string(&options).unwrap();
711        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
712        assert_eq!(options, got);
713    }
714
715    #[test]
716    fn test_region_options_str_serde() {
717        // Notes: use empty string for `ignore_column_ids` to test the empty string case.
718        let region_options_json_str = r#"{
719  "ttl": "7days",
720  "compaction": {
721    "compaction.type": "twcs",
722    "compaction.twcs.max_active_window_runs": "8",
723    "compaction.twcs.max_active_window_files": "11",
724    "compaction.twcs.max_inactive_window_runs": "2",
725    "compaction.twcs.max_inactive_window_files": "7",
726    "compaction.twcs.max_output_file_size": "7MB",
727    "compaction.twcs.time_window": "2h"
728  },
729  "storage": "S3",
730  "append_mode": false,
731  "wal_options": {
732    "wal.provider": "kafka",
733    "wal.kafka.topic": "test_topic"
734  },
735  "index_options": {
736    "index.inverted_index.ignore_column_ids": "",
737    "index.inverted_index.segment_row_count": "512"
738  },
739  "memtable": {
740    "memtable.type": "partition_tree",
741    "memtable.partition_tree.index_max_keys_per_shard": "2048",
742    "memtable.partition_tree.data_freeze_threshold": "2048",
743    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
744  },
745  "merge_mode": "last_non_null"
746}"#;
747        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
748        let options = RegionOptions {
749            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
750            compaction: CompactionOptions::Twcs(TwcsOptions {
751                max_active_window_runs: 8,
752                max_active_window_files: 11,
753                max_inactive_window_runs: 2,
754                max_inactive_window_files: 7,
755                time_window: Some(Duration::from_secs(3600 * 2)),
756                max_output_file_size: Some(ReadableSize::mb(7)),
757                remote_compaction: false,
758                fallback_to_local: true,
759            }),
760            storage: Some("S3".to_string()),
761            append_mode: false,
762            wal_options: WalOptions::Kafka(KafkaWalOptions {
763                topic: "test_topic".to_string(),
764            }),
765            index_options: IndexOptions {
766                inverted_index: InvertedIndexOptions {
767                    ignore_column_ids: vec![],
768                    segment_row_count: 512,
769                },
770            },
771            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
772                index_max_keys_per_shard: 2048,
773                data_freeze_threshold: 2048,
774                fork_dictionary_bytes: ReadableSize::mb(128),
775                primary_key_encoding: PrimaryKeyEncoding::Dense,
776            })),
777            merge_mode: Some(MergeMode::LastNonNull),
778        };
779        assert_eq!(options, got);
780    }
781}