mito2/region/
options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Options for a region.
16//!
17//! If we add options in this mod, we also need to modify [store_api::mito_engine_options].
18
19use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_time::TimeToLive;
24use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
25use serde::de::Error as _;
26use serde::{Deserialize, Deserializer, Serialize};
27use serde_json::Value;
28use serde_with::{serde_as, with_prefix, DisplayFromStr, NoneAsEmptyString};
29use snafu::{ensure, ResultExt};
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::storage::ColumnId;
32use strum::EnumString;
33
34use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
35use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
36
37const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
38
/// Mode to handle duplicate rows while merging.
///
/// Variants are (de)serialized and parsed from strings in `snake_case`,
/// i.e. `last_row` and `last_non_null`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {
    /// Keeps the last row. This is the default mode.
    #[default]
    LastRow,
    /// Keeps the last non-null field for each row.
    LastNonNull,
}
50
// Note: We need to update [store_api::mito_engine_options::is_mito_engine_option_key()]
// if we want to expose the option to table options.
/// Options that affect the entire region.
///
/// Users need to specify the options while creating/opening a region.
/// Fields missing during deserialization take their default values.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionOptions {
    /// Region SST files TTL.
    pub ttl: Option<TimeToLive>,
    /// Compaction options.
    pub compaction: CompactionOptions,
    /// Custom storage. Uses default storage if it is `None`.
    pub storage: Option<String>,
    /// If append mode is enabled, the region keeps duplicate rows.
    pub append_mode: bool,
    /// Wal options.
    pub wal_options: WalOptions,
    /// Index options.
    pub index_options: IndexOptions,
    /// Memtable options. It is `None` if no memtable type is specified.
    pub memtable: Option<MemtableOptions>,
    /// The mode to merge duplicate rows.
    /// Only takes effect when `append_mode` is `false`.
    /// Use [`RegionOptions::merge_mode()`] to get the effective mode when it is `None`.
    pub merge_mode: Option<MergeMode>,
}
77
78impl RegionOptions {
79    /// Validates options.
80    pub fn validate(&self) -> Result<()> {
81        if self.append_mode {
82            ensure!(
83                self.merge_mode.is_none(),
84                InvalidRegionOptionsSnafu {
85                    reason: "merge_mode is not allowed when append_mode is enabled",
86                }
87            );
88        }
89        Ok(())
90    }
91
92    /// Returns `true` if deduplication is needed.
93    pub fn need_dedup(&self) -> bool {
94        !self.append_mode
95    }
96
97    /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
98    pub fn merge_mode(&self) -> MergeMode {
99        self.merge_mode.unwrap_or_default()
100    }
101
102    /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
103    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
104        self.memtable
105            .as_ref()
106            .map_or(PrimaryKeyEncoding::default(), |memtable| {
107                memtable.primary_key_encoding()
108            })
109    }
110}
111
112impl TryFrom<&HashMap<String, String>> for RegionOptions {
113    type Error = Error;
114
115    fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
116        let value = options_map_to_value(options_map);
117        let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
118
119        // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
120        // each field manually instead of using #[serde(flatten)] for `compaction`.
121        // See https://github.com/serde-rs/serde/issues/1626
122        let options: RegionOptionsWithoutEnum =
123            serde_json::from_str(&json).context(JsonOptionsSnafu)?;
124        let compaction = if validate_enum_options(options_map, "compaction.type")? {
125            serde_json::from_str(&json).context(JsonOptionsSnafu)?
126        } else {
127            CompactionOptions::default()
128        };
129
130        // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map.
131        let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
132            || Ok(WalOptions::default()),
133            |encoded_wal_options| {
134                serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
135            },
136        )?;
137
138        let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
139        let memtable = if validate_enum_options(options_map, "memtable.type")? {
140            Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
141        } else {
142            None
143        };
144
145        let opts = RegionOptions {
146            ttl: options.ttl,
147            compaction,
148            storage: options.storage,
149            append_mode: options.append_mode,
150            wal_options,
151            index_options,
152            memtable,
153            merge_mode: options.merge_mode,
154        };
155        opts.validate()?;
156
157        Ok(opts)
158    }
159}
160
/// Options for compactions.
///
/// The variant is selected by the `compaction.type` key, whose value is the
/// variant name in `snake_case` (e.g. `twcs`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "compaction.type")]
#[serde(rename_all = "snake_case")]
pub enum CompactionOptions {
    /// Time window compaction strategy.
    /// Its fields are (de)serialized through the `prefix_twcs` adapter.
    #[serde(with = "prefix_twcs")]
    Twcs(TwcsOptions),
}
170
171impl CompactionOptions {
172    pub(crate) fn time_window(&self) -> Option<Duration> {
173        match self {
174            CompactionOptions::Twcs(opts) => opts.time_window,
175        }
176    }
177
178    pub(crate) fn remote_compaction(&self) -> bool {
179        match self {
180            CompactionOptions::Twcs(opts) => opts.remote_compaction,
181        }
182    }
183
184    pub(crate) fn fallback_to_local(&self) -> bool {
185        match self {
186            CompactionOptions::Twcs(opts) => opts.fallback_to_local,
187        }
188    }
189}
190
191impl Default for CompactionOptions {
192    fn default() -> Self {
193        Self::Twcs(TwcsOptions::default())
194    }
195}
196
/// Time window compaction options.
///
/// Fields missing during deserialization take the values of `TwcsOptions::default()`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
    /// Minimum file num in every time window to trigger a compaction.
    /// (De)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub trigger_file_num: usize,
    /// Compaction time window defined when creating tables.
    /// (De)serialized in humantime format (e.g. `2h`).
    #[serde(with = "humantime_serde")]
    pub time_window: Option<Duration>,
    /// Maximum output file size.
    pub max_output_file_size: Option<ReadableSize>,
    /// Whether to use remote compaction.
    /// (De)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub remote_compaction: bool,
    /// Whether to fall back to local compaction if remote compaction fails.
    /// (De)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub fallback_to_local: bool,
}

// Defines the `prefix_twcs` serde adapter that maps keys with the
// `compaction.twcs.` prefix to the fields of `TwcsOptions`.
with_prefix!(prefix_twcs "compaction.twcs.");
219
220impl TwcsOptions {
221    /// Returns time window in second resolution.
222    pub fn time_window_seconds(&self) -> Option<i64> {
223        self.time_window.and_then(|window| {
224            let window_secs = window.as_secs();
225            if window_secs == 0 {
226                None
227            } else {
228                window_secs.try_into().ok()
229            }
230        })
231    }
232}
233
234impl Default for TwcsOptions {
235    fn default() -> Self {
236        Self {
237            trigger_file_num: 4,
238            time_window: None,
239            max_output_file_size: Some(ReadableSize::mb(512)),
240            remote_compaction: false,
241            fallback_to_local: true,
242        }
243    }
244}
245
/// We need to define a new struct without enum fields as `#[serde(default)]` does not
/// support external tagging.
///
/// It only holds the non-enum fields of [`RegionOptions`] and is used while
/// decoding the options from a string map, where every value is a string.
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
    /// Region SST files TTL.
    ttl: Option<TimeToLive>,
    /// Custom storage name.
    storage: Option<String>,
    /// Append mode flag, parsed from its string form.
    #[serde_as(as = "DisplayFromStr")]
    append_mode: bool,
    /// Merge mode, parsed from its string form. An empty string means `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    merge_mode: Option<MergeMode>,
}
260
261impl Default for RegionOptionsWithoutEnum {
262    fn default() -> Self {
263        let options = RegionOptions::default();
264        RegionOptionsWithoutEnum {
265            ttl: options.ttl,
266            storage: options.storage,
267            append_mode: options.append_mode,
268            merge_mode: options.merge_mode,
269        }
270    }
271}
272
// Defines the `prefix_inverted_index` serde adapter that maps keys with the
// `index.inverted_index.` prefix to the fields of `InvertedIndexOptions`.
with_prefix!(prefix_inverted_index "index.inverted_index.");

/// Options for index.
///
/// Fields missing during deserialization take their default values.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
    /// Options for the inverted index.
    /// They are flattened into this struct with the `index.inverted_index.` key prefix.
    #[serde(flatten, with = "prefix_inverted_index")]
    pub inverted_index: InvertedIndexOptions,
}
283
/// Options for the inverted index.
///
/// Fields missing during deserialization take the values of
/// `InvertedIndexOptions::default()`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
    /// The column ids that should be ignored when building the inverted index.
    /// The column ids are separated by commas. For example, "1,2,3".
    /// An empty string represents an empty list.
    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
    #[serde(serialize_with = "serialize_ignore_column_ids")]
    pub ignore_column_ids: Vec<ColumnId>,

    /// The number of rows in a segment.
    /// (De)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub segment_row_count: usize,
}
299
300impl Default for InvertedIndexOptions {
301    fn default() -> Self {
302        Self {
303            ignore_column_ids: Vec::new(),
304            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
305        }
306    }
307}
308
/// Options for region level memtable.
///
/// The variant is selected by the `memtable.type` key, whose value is the
/// variant name in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
pub enum MemtableOptions {
    /// Selected by `time_series`. It has no additional options.
    TimeSeries,
    /// Selected by `partition_tree`.
    /// Its fields are (de)serialized through the `prefix_partition_tree` adapter.
    #[serde(with = "prefix_partition_tree")]
    PartitionTree(PartitionTreeOptions),
}

// Defines the `prefix_partition_tree` serde adapter that maps keys with the
// `memtable.partition_tree.` prefix to the fields of `PartitionTreeOptions`.
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
319
320impl MemtableOptions {
321    /// Returns the primary key encoding mode.
322    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
323        match self {
324            MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
325            _ => PrimaryKeyEncoding::Dense,
326        }
327    }
328}
329
/// Partition tree memtable options.
///
/// Fields missing during deserialization take the values of
/// `PartitionTreeOptions::default()`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct PartitionTreeOptions {
    /// Max keys in an index shard.
    /// (De)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    /// (De)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub data_freeze_threshold: usize,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
346
347impl Default for PartitionTreeOptions {
348    fn default() -> Self {
349        let mut fork_dictionary_bytes = ReadableSize::mb(512);
350        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
351            let adjust_dictionary_bytes = std::cmp::min(
352                sys_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
353                fork_dictionary_bytes,
354            );
355            if adjust_dictionary_bytes.0 > 0 {
356                fork_dictionary_bytes = adjust_dictionary_bytes;
357            }
358        }
359        Self {
360            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
361            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
362            fork_dictionary_bytes,
363            primary_key_encoding: PrimaryKeyEncoding::Dense,
364        }
365    }
366}
367
368fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
369where
370    D: Deserializer<'de>,
371{
372    let s: String = Deserialize::deserialize(deserializer)?;
373    let mut column_ids = Vec::new();
374    if s.is_empty() {
375        return Ok(column_ids);
376    }
377    for item in s.split(',') {
378        let column_id = item.parse().map_err(D::Error::custom)?;
379        column_ids.push(column_id);
380    }
381    Ok(column_ids)
382}
383
384fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
385where
386    S: serde::Serializer,
387{
388    let s = column_ids
389        .iter()
390        .map(|id| id.to_string())
391        .collect::<Vec<_>>()
392        .join(",");
393    serializer.serialize_str(&s)
394}
395
396/// Converts the `options` map to a json object.
397///
398/// Replaces "null" strings by `null` json values.
399fn options_map_to_value(options: &HashMap<String, String>) -> Value {
400    let map = options
401        .iter()
402        .map(|(key, value)| {
403            // Only convert the key to lowercase.
404            if value.eq_ignore_ascii_case("null") {
405                (key.to_string(), Value::Null)
406            } else {
407                (key.to_string(), Value::from(value.to_string()))
408            }
409        })
410        .collect();
411    Value::Object(map)
412}
413
414// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
415// check the type key first.
416/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
417/// and returns `true` if the map contains enum options.
418fn validate_enum_options(
419    options_map: &HashMap<String, String>,
420    enum_tag_key: &str,
421) -> Result<bool> {
422    let enum_type = enum_tag_key.split('.').next().unwrap();
423    let mut has_other_options = false;
424    let mut has_tag = false;
425    for key in options_map.keys() {
426        if key == enum_tag_key {
427            has_tag = true;
428        } else if key.starts_with(enum_type) {
429            has_other_options = true;
430        }
431    }
432
433    // If tag is not provided, then other options for the enum should not exist.
434    ensure!(
435        has_tag || !has_other_options,
436        InvalidRegionOptionsSnafu {
437            reason: format!("missing key {} in options", enum_tag_key),
438        }
439    );
440
441    Ok(has_tag)
442}
443
444#[cfg(test)]
445mod tests {
446    use common_error::ext::ErrorExt;
447    use common_error::status_code::StatusCode;
448    use common_wal::options::KafkaWalOptions;
449
450    use super::*;
451
452    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
453        options
454            .iter()
455            .map(|(k, v)| (k.to_string(), v.to_string()))
456            .collect()
457    }
458
459    #[test]
460    fn test_empty_region_options() {
461        let map = make_map(&[]);
462        let options = RegionOptions::try_from(&map).unwrap();
463        assert_eq!(RegionOptions::default(), options);
464    }
465
466    #[test]
467    fn test_with_ttl() {
468        let map = make_map(&[("ttl", "7d")]);
469        let options = RegionOptions::try_from(&map).unwrap();
470        let expect = RegionOptions {
471            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
472            ..Default::default()
473        };
474        assert_eq!(expect, options);
475    }
476
477    #[test]
478    fn test_with_storage() {
479        let map = make_map(&[("storage", "S3")]);
480        let options = RegionOptions::try_from(&map).unwrap();
481        let expect = RegionOptions {
482            storage: Some("S3".to_string()),
483            ..Default::default()
484        };
485        assert_eq!(expect, options);
486    }
487
488    #[test]
489    fn test_without_compaction_type() {
490        let map = make_map(&[
491            ("compaction.twcs.trigger_file_num", "8"),
492            ("compaction.twcs.time_window", "2h"),
493        ]);
494        let err = RegionOptions::try_from(&map).unwrap_err();
495        assert_eq!(StatusCode::InvalidArguments, err.status_code());
496    }
497
498    #[test]
499    fn test_with_compaction_type() {
500        let map = make_map(&[
501            ("compaction.twcs.trigger_file_num", "8"),
502            ("compaction.twcs.time_window", "2h"),
503            ("compaction.type", "twcs"),
504        ]);
505        let options = RegionOptions::try_from(&map).unwrap();
506        let expect = RegionOptions {
507            compaction: CompactionOptions::Twcs(TwcsOptions {
508                trigger_file_num: 8,
509                time_window: Some(Duration::from_secs(3600 * 2)),
510                ..Default::default()
511            }),
512            ..Default::default()
513        };
514        assert_eq!(expect, options);
515    }
516
517    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
518        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
519        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
520        let got = RegionOptions::try_from(&map).unwrap();
521        let expect = RegionOptions {
522            wal_options: wal_options.clone(),
523            ..Default::default()
524        };
525        expect == got
526    }
527
528    #[test]
529    fn test_with_index() {
530        let map = make_map(&[
531            ("index.inverted_index.ignore_column_ids", "1,2,3"),
532            ("index.inverted_index.segment_row_count", "512"),
533        ]);
534        let options = RegionOptions::try_from(&map).unwrap();
535        let expect = RegionOptions {
536            index_options: IndexOptions {
537                inverted_index: InvertedIndexOptions {
538                    ignore_column_ids: vec![1, 2, 3],
539                    segment_row_count: 512,
540                },
541            },
542            ..Default::default()
543        };
544        assert_eq!(expect, options);
545    }
546
547    // No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
548    #[test]
549    fn test_with_any_wal_options() {
550        let all_wal_options = [
551            WalOptions::RaftEngine,
552            WalOptions::Kafka(KafkaWalOptions {
553                topic: "test_topic".to_string(),
554            }),
555        ];
556        all_wal_options.iter().all(test_with_wal_options);
557    }
558
559    #[test]
560    fn test_with_memtable() {
561        let map = make_map(&[("memtable.type", "time_series")]);
562        let options = RegionOptions::try_from(&map).unwrap();
563        let expect = RegionOptions {
564            memtable: Some(MemtableOptions::TimeSeries),
565            ..Default::default()
566        };
567        assert_eq!(expect, options);
568
569        let map = make_map(&[("memtable.type", "partition_tree")]);
570        let options = RegionOptions::try_from(&map).unwrap();
571        let expect = RegionOptions {
572            memtable: Some(MemtableOptions::PartitionTree(
573                PartitionTreeOptions::default(),
574            )),
575            ..Default::default()
576        };
577        assert_eq!(expect, options);
578    }
579
580    #[test]
581    fn test_unknown_memtable_type() {
582        let map = make_map(&[("memtable.type", "no_such_memtable")]);
583        let err = RegionOptions::try_from(&map).unwrap_err();
584        assert_eq!(StatusCode::InvalidArguments, err.status_code());
585    }
586
587    #[test]
588    fn test_with_merge_mode() {
589        let map = make_map(&[("merge_mode", "last_row")]);
590        let options = RegionOptions::try_from(&map).unwrap();
591        assert_eq!(MergeMode::LastRow, options.merge_mode());
592
593        let map = make_map(&[("merge_mode", "last_non_null")]);
594        let options = RegionOptions::try_from(&map).unwrap();
595        assert_eq!(MergeMode::LastNonNull, options.merge_mode());
596
597        let map = make_map(&[("merge_mode", "unknown")]);
598        let err = RegionOptions::try_from(&map).unwrap_err();
599        assert_eq!(StatusCode::InvalidArguments, err.status_code());
600    }
601
602    #[test]
603    fn test_with_all() {
604        let wal_options = WalOptions::Kafka(KafkaWalOptions {
605            topic: "test_topic".to_string(),
606        });
607        let map = make_map(&[
608            ("ttl", "7d"),
609            ("compaction.twcs.trigger_file_num", "8"),
610            ("compaction.twcs.max_output_file_size", "1GB"),
611            ("compaction.twcs.time_window", "2h"),
612            ("compaction.type", "twcs"),
613            ("compaction.twcs.remote_compaction", "false"),
614            ("compaction.twcs.fallback_to_local", "true"),
615            ("storage", "S3"),
616            ("append_mode", "false"),
617            ("index.inverted_index.ignore_column_ids", "1,2,3"),
618            ("index.inverted_index.segment_row_count", "512"),
619            (
620                WAL_OPTIONS_KEY,
621                &serde_json::to_string(&wal_options).unwrap(),
622            ),
623            ("memtable.type", "partition_tree"),
624            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
625            ("memtable.partition_tree.data_freeze_threshold", "2048"),
626            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
627            ("merge_mode", "last_non_null"),
628        ]);
629        let options = RegionOptions::try_from(&map).unwrap();
630        let expect = RegionOptions {
631            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
632            compaction: CompactionOptions::Twcs(TwcsOptions {
633                trigger_file_num: 8,
634                time_window: Some(Duration::from_secs(3600 * 2)),
635                max_output_file_size: Some(ReadableSize::gb(1)),
636                remote_compaction: false,
637                fallback_to_local: true,
638            }),
639            storage: Some("S3".to_string()),
640            append_mode: false,
641            wal_options,
642            index_options: IndexOptions {
643                inverted_index: InvertedIndexOptions {
644                    ignore_column_ids: vec![1, 2, 3],
645                    segment_row_count: 512,
646                },
647            },
648            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
649                index_max_keys_per_shard: 2048,
650                data_freeze_threshold: 2048,
651                fork_dictionary_bytes: ReadableSize::mb(128),
652                primary_key_encoding: PrimaryKeyEncoding::Dense,
653            })),
654            merge_mode: Some(MergeMode::LastNonNull),
655        };
656        assert_eq!(expect, options);
657    }
658
659    #[test]
660    fn test_region_options_serde() {
661        let options = RegionOptions {
662            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
663            compaction: CompactionOptions::Twcs(TwcsOptions {
664                trigger_file_num: 8,
665                time_window: Some(Duration::from_secs(3600 * 2)),
666                max_output_file_size: None,
667                remote_compaction: false,
668                fallback_to_local: true,
669            }),
670            storage: Some("S3".to_string()),
671            append_mode: false,
672            wal_options: WalOptions::Kafka(KafkaWalOptions {
673                topic: "test_topic".to_string(),
674            }),
675            index_options: IndexOptions {
676                inverted_index: InvertedIndexOptions {
677                    ignore_column_ids: vec![1, 2, 3],
678                    segment_row_count: 512,
679                },
680            },
681            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
682                index_max_keys_per_shard: 2048,
683                data_freeze_threshold: 2048,
684                fork_dictionary_bytes: ReadableSize::mb(128),
685                primary_key_encoding: PrimaryKeyEncoding::Dense,
686            })),
687            merge_mode: Some(MergeMode::LastNonNull),
688        };
689        let region_options_json_str = serde_json::to_string(&options).unwrap();
690        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
691        assert_eq!(options, got);
692    }
693
694    #[test]
695    fn test_region_options_str_serde() {
696        // Notes: use empty string for `ignore_column_ids` to test the empty string case.
697        let region_options_json_str = r#"{
698  "ttl": "7days",
699  "compaction": {
700    "compaction.type": "twcs",
701    "compaction.twcs.trigger_file_num": "8",
702    "compaction.twcs.max_output_file_size": "7MB",
703    "compaction.twcs.time_window": "2h"
704  },
705  "storage": "S3",
706  "append_mode": false,
707  "wal_options": {
708    "wal.provider": "kafka",
709    "wal.kafka.topic": "test_topic"
710  },
711  "index_options": {
712    "index.inverted_index.ignore_column_ids": "",
713    "index.inverted_index.segment_row_count": "512"
714  },
715  "memtable": {
716    "memtable.type": "partition_tree",
717    "memtable.partition_tree.index_max_keys_per_shard": "2048",
718    "memtable.partition_tree.data_freeze_threshold": "2048",
719    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
720  },
721  "merge_mode": "last_non_null"
722}"#;
723        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
724        let options = RegionOptions {
725            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
726            compaction: CompactionOptions::Twcs(TwcsOptions {
727                trigger_file_num: 8,
728                time_window: Some(Duration::from_secs(3600 * 2)),
729                max_output_file_size: Some(ReadableSize::mb(7)),
730                remote_compaction: false,
731                fallback_to_local: true,
732            }),
733            storage: Some("S3".to_string()),
734            append_mode: false,
735            wal_options: WalOptions::Kafka(KafkaWalOptions {
736                topic: "test_topic".to_string(),
737            }),
738            index_options: IndexOptions {
739                inverted_index: InvertedIndexOptions {
740                    ignore_column_ids: vec![],
741                    segment_row_count: 512,
742                },
743            },
744            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
745                index_max_keys_per_shard: 2048,
746                data_freeze_threshold: 2048,
747                fork_dictionary_bytes: ReadableSize::mb(128),
748                primary_key_encoding: PrimaryKeyEncoding::Dense,
749            })),
750            merge_mode: Some(MergeMode::LastNonNull),
751        };
752        assert_eq!(options, got);
753    }
754}