mito2/region/
options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Options for a region.
16//!
17//! If we add options in this mod, we also need to modify [store_api::mito_engine_options].
18
19use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_stat::get_total_memory_readable;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::mito_engine_options::COMPACTION_OVERRIDE;
33use store_api::storage::ColumnId;
34use strum::EnumString;
35
36use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
37use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
38use crate::sst::FormatType;
39
40const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
41
42/// Mode to handle duplicate rows while merging.
43#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
44#[serde(rename_all = "snake_case")]
45#[strum(serialize_all = "snake_case")]
46pub enum MergeMode {
47    /// Keeps the last row.
48    #[default]
49    LastRow,
50    /// Keeps the last non-null field for each row.
51    LastNonNull,
52}
53
54// Note: We need to update [store_api::mito_engine_options::is_mito_engine_option_key()]
55// if we want expose the option to table options.
56/// Options that affect the entire region.
57///
58/// Users need to specify the options while creating/opening a region.
59#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(default)]
61pub struct RegionOptions {
62    /// Region SST files TTL.
63    pub ttl: Option<TimeToLive>,
64    /// Compaction options.
65    pub compaction: CompactionOptions,
66    pub compaction_override: bool,
67    /// Custom storage. Uses default storage if it is `None`.
68    pub storage: Option<String>,
69    /// If append mode is enabled, the region keeps duplicate rows.
70    pub append_mode: bool,
71    /// Wal options.
72    pub wal_options: WalOptions,
73    /// Index options.
74    pub index_options: IndexOptions,
75    /// Memtable options.
76    pub memtable: Option<MemtableOptions>,
77    /// The mode to merge duplicate rows.
78    /// Only takes effect when `append_mode` is `false`.
79    pub merge_mode: Option<MergeMode>,
80    /// SST format type.
81    pub sst_format: Option<FormatType>,
82}
83
84impl RegionOptions {
85    /// Validates options.
86    pub fn validate(&self) -> Result<()> {
87        if self.append_mode {
88            ensure!(
89                self.merge_mode.is_none(),
90                InvalidRegionOptionsSnafu {
91                    reason: "merge_mode is not allowed when append_mode is enabled",
92                }
93            );
94        }
95        Ok(())
96    }
97
98    /// Returns `true` if deduplication is needed.
99    pub fn need_dedup(&self) -> bool {
100        !self.append_mode
101    }
102
103    /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
104    pub fn merge_mode(&self) -> MergeMode {
105        self.merge_mode.unwrap_or_default()
106    }
107
108    /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
109    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
110        self.memtable
111            .as_ref()
112            .map_or(PrimaryKeyEncoding::default(), |memtable| {
113                memtable.primary_key_encoding()
114            })
115    }
116}
117
118impl TryFrom<&HashMap<String, String>> for RegionOptions {
119    type Error = Error;
120
121    fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
122        let value = options_map_to_value(options_map);
123        let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
124
125        // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
126        // each field manually instead of using #[serde(flatten)] for `compaction`.
127        // See https://github.com/serde-rs/serde/issues/1626
128        let options: RegionOptionsWithoutEnum =
129            serde_json::from_str(&json).context(JsonOptionsSnafu)?;
130        let has_compaction_type = validate_enum_options(options_map, "compaction.type")?;
131        let compaction = if has_compaction_type {
132            serde_json::from_str(&json).context(JsonOptionsSnafu)?
133        } else {
134            CompactionOptions::default()
135        };
136
137        // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map.
138        let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
139            || Ok(WalOptions::default()),
140            |encoded_wal_options| {
141                serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
142            },
143        )?;
144
145        let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
146        let memtable = if validate_enum_options(options_map, "memtable.type")? {
147            Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
148        } else {
149            None
150        };
151
152        let compaction_override_flag = options_map
153            .get(COMPACTION_OVERRIDE)
154            .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
155            .unwrap_or(false);
156        let compaction_override = has_compaction_type || compaction_override_flag;
157
158        let opts = RegionOptions {
159            ttl: options.ttl,
160            compaction,
161            compaction_override,
162            storage: options.storage,
163            append_mode: options.append_mode,
164            wal_options,
165            index_options,
166            memtable,
167            merge_mode: options.merge_mode,
168            sst_format: options.sst_format,
169        };
170        opts.validate()?;
171
172        Ok(opts)
173    }
174}
175
176/// Options for compactions
177#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
178#[serde(tag = "compaction.type")]
179#[serde(rename_all = "snake_case")]
180pub enum CompactionOptions {
181    /// Time window compaction strategy.
182    #[serde(with = "prefix_twcs")]
183    Twcs(TwcsOptions),
184}
185
186impl CompactionOptions {
187    pub(crate) fn time_window(&self) -> Option<Duration> {
188        match self {
189            CompactionOptions::Twcs(opts) => opts.time_window,
190        }
191    }
192
193    pub(crate) fn remote_compaction(&self) -> bool {
194        match self {
195            CompactionOptions::Twcs(opts) => opts.remote_compaction,
196        }
197    }
198
199    pub(crate) fn fallback_to_local(&self) -> bool {
200        match self {
201            CompactionOptions::Twcs(opts) => opts.fallback_to_local,
202        }
203    }
204}
205
206impl Default for CompactionOptions {
207    fn default() -> Self {
208        Self::Twcs(TwcsOptions::default())
209    }
210}
211
212/// Time window compaction options.
213#[serde_as]
214#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
215#[serde(default)]
216pub struct TwcsOptions {
217    /// Minimum file num in every time window to trigger a compaction.
218    #[serde_as(as = "DisplayFromStr")]
219    pub trigger_file_num: usize,
220    /// Compaction time window defined when creating tables.
221    #[serde(with = "humantime_serde")]
222    pub time_window: Option<Duration>,
223    /// Compaction time window defined when creating tables.
224    pub max_output_file_size: Option<ReadableSize>,
225    /// Whether to use remote compaction.
226    #[serde_as(as = "DisplayFromStr")]
227    pub remote_compaction: bool,
228    /// Whether to fall back to local compaction if remote compaction fails.
229    #[serde_as(as = "DisplayFromStr")]
230    pub fallback_to_local: bool,
231}
232
233with_prefix!(prefix_twcs "compaction.twcs.");
234
235impl TwcsOptions {
236    /// Returns time window in second resolution.
237    pub fn time_window_seconds(&self) -> Option<i64> {
238        self.time_window.and_then(|window| {
239            let window_secs = window.as_secs();
240            if window_secs == 0 {
241                None
242            } else {
243                window_secs.try_into().ok()
244            }
245        })
246    }
247}
248
249impl Default for TwcsOptions {
250    fn default() -> Self {
251        Self {
252            trigger_file_num: 4,
253            time_window: None,
254            max_output_file_size: Some(ReadableSize::mb(512)),
255            remote_compaction: false,
256            fallback_to_local: true,
257        }
258    }
259}
260
261/// We need to define a new struct without enum fields as `#[serde(default)]` does not
262/// support external tagging.
263#[serde_as]
264#[derive(Debug, Deserialize)]
265#[serde(default)]
266struct RegionOptionsWithoutEnum {
267    /// Region SST files TTL.
268    ttl: Option<TimeToLive>,
269    storage: Option<String>,
270    #[serde_as(as = "DisplayFromStr")]
271    append_mode: bool,
272    #[serde_as(as = "NoneAsEmptyString")]
273    merge_mode: Option<MergeMode>,
274    #[serde_as(as = "NoneAsEmptyString")]
275    sst_format: Option<FormatType>,
276}
277
278impl Default for RegionOptionsWithoutEnum {
279    fn default() -> Self {
280        let options = RegionOptions::default();
281        RegionOptionsWithoutEnum {
282            ttl: options.ttl,
283            storage: options.storage,
284            append_mode: options.append_mode,
285            merge_mode: options.merge_mode,
286            sst_format: options.sst_format,
287        }
288    }
289}
290
291with_prefix!(prefix_inverted_index "index.inverted_index.");
292
293/// Options for index.
294#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
295#[serde(default)]
296pub struct IndexOptions {
297    /// Options for the inverted index.
298    #[serde(flatten, with = "prefix_inverted_index")]
299    pub inverted_index: InvertedIndexOptions,
300}
301
302/// Options for the inverted index.
303#[serde_as]
304#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
305#[serde(default)]
306pub struct InvertedIndexOptions {
307    /// The column ids that should be ignored when building the inverted index.
308    /// The column ids are separated by commas. For example, "1,2,3".
309    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
310    #[serde(serialize_with = "serialize_ignore_column_ids")]
311    pub ignore_column_ids: Vec<ColumnId>,
312
313    /// The number of rows in a segment.
314    #[serde_as(as = "DisplayFromStr")]
315    pub segment_row_count: usize,
316}
317
318impl Default for InvertedIndexOptions {
319    fn default() -> Self {
320        Self {
321            ignore_column_ids: Vec::new(),
322            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
323        }
324    }
325}
326
327/// Options for region level memtable.
328#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
329#[serde(tag = "memtable.type", rename_all = "snake_case")]
330pub enum MemtableOptions {
331    TimeSeries,
332    #[serde(with = "prefix_partition_tree")]
333    PartitionTree(PartitionTreeOptions),
334}
335
336with_prefix!(prefix_partition_tree "memtable.partition_tree.");
337
338impl MemtableOptions {
339    /// Returns the primary key encoding mode.
340    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
341        match self {
342            MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
343            _ => PrimaryKeyEncoding::Dense,
344        }
345    }
346}
347
348/// Partition tree memtable options.
349#[serde_as]
350#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
351#[serde(default)]
352pub struct PartitionTreeOptions {
353    /// Max keys in an index shard.
354    #[serde_as(as = "DisplayFromStr")]
355    pub index_max_keys_per_shard: usize,
356    /// Number of rows to freeze a data part.
357    #[serde_as(as = "DisplayFromStr")]
358    pub data_freeze_threshold: usize,
359    /// Total bytes of dictionary to keep in fork.
360    pub fork_dictionary_bytes: ReadableSize,
361    /// Primary key encoding mode.
362    pub primary_key_encoding: PrimaryKeyEncoding,
363}
364
365impl Default for PartitionTreeOptions {
366    fn default() -> Self {
367        let mut fork_dictionary_bytes = ReadableSize::mb(512);
368        if let Some(total_memory) = get_total_memory_readable() {
369            let adjust_dictionary_bytes = std::cmp::min(
370                total_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
371                fork_dictionary_bytes,
372            );
373            if adjust_dictionary_bytes.0 > 0 {
374                fork_dictionary_bytes = adjust_dictionary_bytes;
375            }
376        }
377        Self {
378            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
379            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
380            fork_dictionary_bytes,
381            primary_key_encoding: PrimaryKeyEncoding::Dense,
382        }
383    }
384}
385
386fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
387where
388    D: Deserializer<'de>,
389{
390    let s: String = Deserialize::deserialize(deserializer)?;
391    let mut column_ids = Vec::new();
392    if s.is_empty() {
393        return Ok(column_ids);
394    }
395    for item in s.split(',') {
396        let column_id = item.parse().map_err(D::Error::custom)?;
397        column_ids.push(column_id);
398    }
399    Ok(column_ids)
400}
401
402fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
403where
404    S: serde::Serializer,
405{
406    let s = column_ids
407        .iter()
408        .map(|id| id.to_string())
409        .collect::<Vec<_>>()
410        .join(",");
411    serializer.serialize_str(&s)
412}
413
414/// Converts the `options` map to a json object.
415///
416/// Replaces "null" strings by `null` json values.
417fn options_map_to_value(options: &HashMap<String, String>) -> Value {
418    let map = options
419        .iter()
420        .map(|(key, value)| {
421            // Only convert the key to lowercase.
422            if value.eq_ignore_ascii_case("null") {
423                (key.clone(), Value::Null)
424            } else {
425                (key.clone(), Value::from(value.clone()))
426            }
427        })
428        .collect();
429    Value::Object(map)
430}
431
432// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
433// check the type key first.
434/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
435/// and returns `true` if the map contains enum options.
436fn validate_enum_options(
437    options_map: &HashMap<String, String>,
438    enum_tag_key: &str,
439) -> Result<bool> {
440    let enum_type = enum_tag_key.split('.').next().unwrap();
441    let mut has_other_options = false;
442    let mut has_tag = false;
443    for key in options_map.keys() {
444        if key == enum_tag_key {
445            has_tag = true;
446        } else if key.starts_with(enum_type) {
447            has_other_options = true;
448        }
449    }
450
451    // If tag is not provided, then other options for the enum should not exist.
452    ensure!(
453        has_tag || !has_other_options,
454        InvalidRegionOptionsSnafu {
455            reason: format!("missing key {} in options", enum_tag_key),
456        }
457    );
458
459    Ok(has_tag)
460}
461
462#[cfg(test)]
463mod tests {
464    use common_error::ext::ErrorExt;
465    use common_error::status_code::StatusCode;
466    use common_wal::options::KafkaWalOptions;
467
468    use super::*;
469
470    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
471        options
472            .iter()
473            .map(|(k, v)| (k.to_string(), v.to_string()))
474            .collect()
475    }
476
477    #[test]
478    fn test_empty_region_options() {
479        let map = make_map(&[]);
480        let options = RegionOptions::try_from(&map).unwrap();
481        assert_eq!(RegionOptions::default(), options);
482    }
483
484    #[test]
485    fn test_with_ttl() {
486        let map = make_map(&[("ttl", "7d")]);
487        let options = RegionOptions::try_from(&map).unwrap();
488        let expect = RegionOptions {
489            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
490            ..Default::default()
491        };
492        assert_eq!(expect, options);
493    }
494
495    #[test]
496    fn test_with_storage() {
497        let map = make_map(&[("storage", "S3")]);
498        let options = RegionOptions::try_from(&map).unwrap();
499        let expect = RegionOptions {
500            storage: Some("S3".to_string()),
501            ..Default::default()
502        };
503        assert_eq!(expect, options);
504    }
505
506    #[test]
507    fn test_without_compaction_type() {
508        let map = make_map(&[
509            ("compaction.twcs.trigger_file_num", "8"),
510            ("compaction.twcs.time_window", "2h"),
511        ]);
512        let err = RegionOptions::try_from(&map).unwrap_err();
513        assert_eq!(StatusCode::InvalidArguments, err.status_code());
514    }
515
516    #[test]
517    fn test_with_compaction_type() {
518        let map = make_map(&[
519            ("compaction.twcs.trigger_file_num", "8"),
520            ("compaction.twcs.time_window", "2h"),
521            ("compaction.type", "twcs"),
522        ]);
523        let options = RegionOptions::try_from(&map).unwrap();
524        let expect = RegionOptions {
525            compaction: CompactionOptions::Twcs(TwcsOptions {
526                trigger_file_num: 8,
527                time_window: Some(Duration::from_secs(3600 * 2)),
528                ..Default::default()
529            }),
530            compaction_override: true,
531            ..Default::default()
532        };
533        assert_eq!(expect, options);
534    }
535
536    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
537        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
538        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
539        let got = RegionOptions::try_from(&map).unwrap();
540        let expect = RegionOptions {
541            wal_options: wal_options.clone(),
542            ..Default::default()
543        };
544        expect == got
545    }
546
547    #[test]
548    fn test_with_index() {
549        let map = make_map(&[
550            ("index.inverted_index.ignore_column_ids", "1,2,3"),
551            ("index.inverted_index.segment_row_count", "512"),
552        ]);
553        let options = RegionOptions::try_from(&map).unwrap();
554        let expect = RegionOptions {
555            index_options: IndexOptions {
556                inverted_index: InvertedIndexOptions {
557                    ignore_column_ids: vec![1, 2, 3],
558                    segment_row_count: 512,
559                },
560            },
561            ..Default::default()
562        };
563        assert_eq!(expect, options);
564    }
565
566    // No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
567    #[test]
568    fn test_with_any_wal_options() {
569        let all_wal_options = [
570            WalOptions::RaftEngine,
571            WalOptions::Kafka(KafkaWalOptions {
572                topic: "test_topic".to_string(),
573            }),
574        ];
575        all_wal_options.iter().all(test_with_wal_options);
576    }
577
578    #[test]
579    fn test_with_memtable() {
580        let map = make_map(&[("memtable.type", "time_series")]);
581        let options = RegionOptions::try_from(&map).unwrap();
582        let expect = RegionOptions {
583            memtable: Some(MemtableOptions::TimeSeries),
584            ..Default::default()
585        };
586        assert_eq!(expect, options);
587
588        let map = make_map(&[("memtable.type", "partition_tree")]);
589        let options = RegionOptions::try_from(&map).unwrap();
590        let expect = RegionOptions {
591            memtable: Some(MemtableOptions::PartitionTree(
592                PartitionTreeOptions::default(),
593            )),
594            ..Default::default()
595        };
596        assert_eq!(expect, options);
597    }
598
599    #[test]
600    fn test_unknown_memtable_type() {
601        let map = make_map(&[("memtable.type", "no_such_memtable")]);
602        let err = RegionOptions::try_from(&map).unwrap_err();
603        assert_eq!(StatusCode::InvalidArguments, err.status_code());
604    }
605
606    #[test]
607    fn test_with_merge_mode() {
608        let map = make_map(&[("merge_mode", "last_row")]);
609        let options = RegionOptions::try_from(&map).unwrap();
610        assert_eq!(MergeMode::LastRow, options.merge_mode());
611
612        let map = make_map(&[("merge_mode", "last_non_null")]);
613        let options = RegionOptions::try_from(&map).unwrap();
614        assert_eq!(MergeMode::LastNonNull, options.merge_mode());
615
616        let map = make_map(&[("merge_mode", "unknown")]);
617        let err = RegionOptions::try_from(&map).unwrap_err();
618        assert_eq!(StatusCode::InvalidArguments, err.status_code());
619    }
620
621    #[test]
622    fn test_with_all() {
623        let wal_options = WalOptions::Kafka(KafkaWalOptions {
624            topic: "test_topic".to_string(),
625        });
626        let map = make_map(&[
627            ("ttl", "7d"),
628            ("compaction.twcs.trigger_file_num", "8"),
629            ("compaction.twcs.max_output_file_size", "1GB"),
630            ("compaction.twcs.time_window", "2h"),
631            ("compaction.type", "twcs"),
632            ("compaction.twcs.remote_compaction", "false"),
633            ("compaction.twcs.fallback_to_local", "true"),
634            ("storage", "S3"),
635            ("append_mode", "false"),
636            ("index.inverted_index.ignore_column_ids", "1,2,3"),
637            ("index.inverted_index.segment_row_count", "512"),
638            (
639                WAL_OPTIONS_KEY,
640                &serde_json::to_string(&wal_options).unwrap(),
641            ),
642            ("memtable.type", "partition_tree"),
643            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
644            ("memtable.partition_tree.data_freeze_threshold", "2048"),
645            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
646            ("merge_mode", "last_non_null"),
647        ]);
648        let options = RegionOptions::try_from(&map).unwrap();
649        let expect = RegionOptions {
650            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
651            compaction: CompactionOptions::Twcs(TwcsOptions {
652                trigger_file_num: 8,
653                time_window: Some(Duration::from_secs(3600 * 2)),
654                max_output_file_size: Some(ReadableSize::gb(1)),
655                remote_compaction: false,
656                fallback_to_local: true,
657            }),
658            compaction_override: true,
659            storage: Some("S3".to_string()),
660            append_mode: false,
661            wal_options,
662            index_options: IndexOptions {
663                inverted_index: InvertedIndexOptions {
664                    ignore_column_ids: vec![1, 2, 3],
665                    segment_row_count: 512,
666                },
667            },
668            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
669                index_max_keys_per_shard: 2048,
670                data_freeze_threshold: 2048,
671                fork_dictionary_bytes: ReadableSize::mb(128),
672                primary_key_encoding: PrimaryKeyEncoding::Dense,
673            })),
674            merge_mode: Some(MergeMode::LastNonNull),
675            sst_format: None,
676        };
677        assert_eq!(expect, options);
678    }
679
680    #[test]
681    fn test_region_options_serde() {
682        let options = RegionOptions {
683            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
684            compaction: CompactionOptions::Twcs(TwcsOptions {
685                trigger_file_num: 8,
686                time_window: Some(Duration::from_secs(3600 * 2)),
687                max_output_file_size: None,
688                remote_compaction: false,
689                fallback_to_local: true,
690            }),
691            compaction_override: false,
692            storage: Some("S3".to_string()),
693            append_mode: false,
694            wal_options: WalOptions::Kafka(KafkaWalOptions {
695                topic: "test_topic".to_string(),
696            }),
697            index_options: IndexOptions {
698                inverted_index: InvertedIndexOptions {
699                    ignore_column_ids: vec![1, 2, 3],
700                    segment_row_count: 512,
701                },
702            },
703            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
704                index_max_keys_per_shard: 2048,
705                data_freeze_threshold: 2048,
706                fork_dictionary_bytes: ReadableSize::mb(128),
707                primary_key_encoding: PrimaryKeyEncoding::Dense,
708            })),
709            merge_mode: Some(MergeMode::LastNonNull),
710            sst_format: None,
711        };
712        let region_options_json_str = serde_json::to_string(&options).unwrap();
713        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
714        assert_eq!(options, got);
715    }
716
717    #[test]
718    fn test_region_options_str_serde() {
719        // Notes: use empty string for `ignore_column_ids` to test the empty string case.
720        let region_options_json_str = r#"{
721  "ttl": "7days",
722  "compaction": {
723    "compaction.type": "twcs",
724    "compaction.twcs.trigger_file_num": "8",
725    "compaction.twcs.max_output_file_size": "7MB",
726    "compaction.twcs.time_window": "2h"
727  },
728  "storage": "S3",
729  "append_mode": false,
730  "wal_options": {
731    "wal.provider": "kafka",
732    "wal.kafka.topic": "test_topic"
733  },
734  "index_options": {
735    "index.inverted_index.ignore_column_ids": "",
736    "index.inverted_index.segment_row_count": "512"
737  },
738  "memtable": {
739    "memtable.type": "partition_tree",
740    "memtable.partition_tree.index_max_keys_per_shard": "2048",
741    "memtable.partition_tree.data_freeze_threshold": "2048",
742    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
743  },
744  "merge_mode": "last_non_null"
745}"#;
746        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
747        let options = RegionOptions {
748            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
749            compaction: CompactionOptions::Twcs(TwcsOptions {
750                trigger_file_num: 8,
751                time_window: Some(Duration::from_secs(3600 * 2)),
752                max_output_file_size: Some(ReadableSize::mb(7)),
753                remote_compaction: false,
754                fallback_to_local: true,
755            }),
756            compaction_override: false,
757            storage: Some("S3".to_string()),
758            append_mode: false,
759            wal_options: WalOptions::Kafka(KafkaWalOptions {
760                topic: "test_topic".to_string(),
761            }),
762            index_options: IndexOptions {
763                inverted_index: InvertedIndexOptions {
764                    ignore_column_ids: vec![],
765                    segment_row_count: 512,
766                },
767            },
768            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
769                index_max_keys_per_shard: 2048,
770                data_freeze_threshold: 2048,
771                fork_dictionary_bytes: ReadableSize::mb(128),
772                primary_key_encoding: PrimaryKeyEncoding::Dense,
773            })),
774            merge_mode: Some(MergeMode::LastNonNull),
775            sst_format: None,
776        };
777        assert_eq!(options, got);
778    }
779}