mito2/region/
options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Options for a region.
16//!
17//! If we add options in this mod, we also need to modify [store_api::mito_engine_options].
18
19use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_time::TimeToLive;
24use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
25use serde::de::Error as _;
26use serde::{Deserialize, Deserializer, Serialize};
27use serde_json::Value;
28use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
29use snafu::{ResultExt, ensure};
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::storage::ColumnId;
32use strum::EnumString;
33
34use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
35use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
36use crate::sst::FormatType;
37
38const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
39
40/// Mode to handle duplicate rows while merging.
41#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
42#[serde(rename_all = "snake_case")]
43#[strum(serialize_all = "snake_case")]
44pub enum MergeMode {
45    /// Keeps the last row.
46    #[default]
47    LastRow,
48    /// Keeps the last non-null field for each row.
49    LastNonNull,
50}
51
52// Note: We need to update [store_api::mito_engine_options::is_mito_engine_option_key()]
53// if we want expose the option to table options.
54/// Options that affect the entire region.
55///
56/// Users need to specify the options while creating/opening a region.
57#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
58#[serde(default)]
59pub struct RegionOptions {
60    /// Region SST files TTL.
61    pub ttl: Option<TimeToLive>,
62    /// Compaction options.
63    pub compaction: CompactionOptions,
64    /// Custom storage. Uses default storage if it is `None`.
65    pub storage: Option<String>,
66    /// If append mode is enabled, the region keeps duplicate rows.
67    pub append_mode: bool,
68    /// Wal options.
69    pub wal_options: WalOptions,
70    /// Index options.
71    pub index_options: IndexOptions,
72    /// Memtable options.
73    pub memtable: Option<MemtableOptions>,
74    /// The mode to merge duplicate rows.
75    /// Only takes effect when `append_mode` is `false`.
76    pub merge_mode: Option<MergeMode>,
77    /// SST format type.
78    pub sst_format: Option<FormatType>,
79}
80
81impl RegionOptions {
82    /// Validates options.
83    pub fn validate(&self) -> Result<()> {
84        if self.append_mode {
85            ensure!(
86                self.merge_mode.is_none(),
87                InvalidRegionOptionsSnafu {
88                    reason: "merge_mode is not allowed when append_mode is enabled",
89                }
90            );
91        }
92        Ok(())
93    }
94
95    /// Returns `true` if deduplication is needed.
96    pub fn need_dedup(&self) -> bool {
97        !self.append_mode
98    }
99
100    /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
101    pub fn merge_mode(&self) -> MergeMode {
102        self.merge_mode.unwrap_or_default()
103    }
104
105    /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
106    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
107        self.memtable
108            .as_ref()
109            .map_or(PrimaryKeyEncoding::default(), |memtable| {
110                memtable.primary_key_encoding()
111            })
112    }
113}
114
115impl TryFrom<&HashMap<String, String>> for RegionOptions {
116    type Error = Error;
117
118    fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
119        let value = options_map_to_value(options_map);
120        let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
121
122        // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
123        // each field manually instead of using #[serde(flatten)] for `compaction`.
124        // See https://github.com/serde-rs/serde/issues/1626
125        let options: RegionOptionsWithoutEnum =
126            serde_json::from_str(&json).context(JsonOptionsSnafu)?;
127        let compaction = if validate_enum_options(options_map, "compaction.type")? {
128            serde_json::from_str(&json).context(JsonOptionsSnafu)?
129        } else {
130            CompactionOptions::default()
131        };
132
133        // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map.
134        let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
135            || Ok(WalOptions::default()),
136            |encoded_wal_options| {
137                serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
138            },
139        )?;
140
141        let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
142        let memtable = if validate_enum_options(options_map, "memtable.type")? {
143            Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
144        } else {
145            None
146        };
147
148        let opts = RegionOptions {
149            ttl: options.ttl,
150            compaction,
151            storage: options.storage,
152            append_mode: options.append_mode,
153            wal_options,
154            index_options,
155            memtable,
156            merge_mode: options.merge_mode,
157            sst_format: options.sst_format,
158        };
159        opts.validate()?;
160
161        Ok(opts)
162    }
163}
164
165/// Options for compactions
166#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
167#[serde(tag = "compaction.type")]
168#[serde(rename_all = "snake_case")]
169pub enum CompactionOptions {
170    /// Time window compaction strategy.
171    #[serde(with = "prefix_twcs")]
172    Twcs(TwcsOptions),
173}
174
175impl CompactionOptions {
176    pub(crate) fn time_window(&self) -> Option<Duration> {
177        match self {
178            CompactionOptions::Twcs(opts) => opts.time_window,
179        }
180    }
181
182    pub(crate) fn remote_compaction(&self) -> bool {
183        match self {
184            CompactionOptions::Twcs(opts) => opts.remote_compaction,
185        }
186    }
187
188    pub(crate) fn fallback_to_local(&self) -> bool {
189        match self {
190            CompactionOptions::Twcs(opts) => opts.fallback_to_local,
191        }
192    }
193}
194
195impl Default for CompactionOptions {
196    fn default() -> Self {
197        Self::Twcs(TwcsOptions::default())
198    }
199}
200
201/// Time window compaction options.
202#[serde_as]
203#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
204#[serde(default)]
205pub struct TwcsOptions {
206    /// Minimum file num in every time window to trigger a compaction.
207    #[serde_as(as = "DisplayFromStr")]
208    pub trigger_file_num: usize,
209    /// Compaction time window defined when creating tables.
210    #[serde(with = "humantime_serde")]
211    pub time_window: Option<Duration>,
212    /// Compaction time window defined when creating tables.
213    pub max_output_file_size: Option<ReadableSize>,
214    /// Whether to use remote compaction.
215    #[serde_as(as = "DisplayFromStr")]
216    pub remote_compaction: bool,
217    /// Whether to fall back to local compaction if remote compaction fails.
218    #[serde_as(as = "DisplayFromStr")]
219    pub fallback_to_local: bool,
220}
221
222with_prefix!(prefix_twcs "compaction.twcs.");
223
224impl TwcsOptions {
225    /// Returns time window in second resolution.
226    pub fn time_window_seconds(&self) -> Option<i64> {
227        self.time_window.and_then(|window| {
228            let window_secs = window.as_secs();
229            if window_secs == 0 {
230                None
231            } else {
232                window_secs.try_into().ok()
233            }
234        })
235    }
236}
237
238impl Default for TwcsOptions {
239    fn default() -> Self {
240        Self {
241            trigger_file_num: 4,
242            time_window: None,
243            max_output_file_size: Some(ReadableSize::mb(512)),
244            remote_compaction: false,
245            fallback_to_local: true,
246        }
247    }
248}
249
250/// We need to define a new struct without enum fields as `#[serde(default)]` does not
251/// support external tagging.
252#[serde_as]
253#[derive(Debug, Deserialize)]
254#[serde(default)]
255struct RegionOptionsWithoutEnum {
256    /// Region SST files TTL.
257    ttl: Option<TimeToLive>,
258    storage: Option<String>,
259    #[serde_as(as = "DisplayFromStr")]
260    append_mode: bool,
261    #[serde_as(as = "NoneAsEmptyString")]
262    merge_mode: Option<MergeMode>,
263    #[serde_as(as = "NoneAsEmptyString")]
264    sst_format: Option<FormatType>,
265}
266
267impl Default for RegionOptionsWithoutEnum {
268    fn default() -> Self {
269        let options = RegionOptions::default();
270        RegionOptionsWithoutEnum {
271            ttl: options.ttl,
272            storage: options.storage,
273            append_mode: options.append_mode,
274            merge_mode: options.merge_mode,
275            sst_format: options.sst_format,
276        }
277    }
278}
279
280with_prefix!(prefix_inverted_index "index.inverted_index.");
281
282/// Options for index.
283#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
284#[serde(default)]
285pub struct IndexOptions {
286    /// Options for the inverted index.
287    #[serde(flatten, with = "prefix_inverted_index")]
288    pub inverted_index: InvertedIndexOptions,
289}
290
291/// Options for the inverted index.
292#[serde_as]
293#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
294#[serde(default)]
295pub struct InvertedIndexOptions {
296    /// The column ids that should be ignored when building the inverted index.
297    /// The column ids are separated by commas. For example, "1,2,3".
298    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
299    #[serde(serialize_with = "serialize_ignore_column_ids")]
300    pub ignore_column_ids: Vec<ColumnId>,
301
302    /// The number of rows in a segment.
303    #[serde_as(as = "DisplayFromStr")]
304    pub segment_row_count: usize,
305}
306
307impl Default for InvertedIndexOptions {
308    fn default() -> Self {
309        Self {
310            ignore_column_ids: Vec::new(),
311            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
312        }
313    }
314}
315
316/// Options for region level memtable.
317#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
318#[serde(tag = "memtable.type", rename_all = "snake_case")]
319pub enum MemtableOptions {
320    TimeSeries,
321    #[serde(with = "prefix_partition_tree")]
322    PartitionTree(PartitionTreeOptions),
323}
324
325with_prefix!(prefix_partition_tree "memtable.partition_tree.");
326
327impl MemtableOptions {
328    /// Returns the primary key encoding mode.
329    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
330        match self {
331            MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
332            _ => PrimaryKeyEncoding::Dense,
333        }
334    }
335}
336
337/// Partition tree memtable options.
338#[serde_as]
339#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
340#[serde(default)]
341pub struct PartitionTreeOptions {
342    /// Max keys in an index shard.
343    #[serde_as(as = "DisplayFromStr")]
344    pub index_max_keys_per_shard: usize,
345    /// Number of rows to freeze a data part.
346    #[serde_as(as = "DisplayFromStr")]
347    pub data_freeze_threshold: usize,
348    /// Total bytes of dictionary to keep in fork.
349    pub fork_dictionary_bytes: ReadableSize,
350    /// Primary key encoding mode.
351    pub primary_key_encoding: PrimaryKeyEncoding,
352}
353
354impl Default for PartitionTreeOptions {
355    fn default() -> Self {
356        let mut fork_dictionary_bytes = ReadableSize::mb(512);
357        if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
358            let adjust_dictionary_bytes = std::cmp::min(
359                sys_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
360                fork_dictionary_bytes,
361            );
362            if adjust_dictionary_bytes.0 > 0 {
363                fork_dictionary_bytes = adjust_dictionary_bytes;
364            }
365        }
366        Self {
367            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
368            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
369            fork_dictionary_bytes,
370            primary_key_encoding: PrimaryKeyEncoding::Dense,
371        }
372    }
373}
374
375fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
376where
377    D: Deserializer<'de>,
378{
379    let s: String = Deserialize::deserialize(deserializer)?;
380    let mut column_ids = Vec::new();
381    if s.is_empty() {
382        return Ok(column_ids);
383    }
384    for item in s.split(',') {
385        let column_id = item.parse().map_err(D::Error::custom)?;
386        column_ids.push(column_id);
387    }
388    Ok(column_ids)
389}
390
391fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
392where
393    S: serde::Serializer,
394{
395    let s = column_ids
396        .iter()
397        .map(|id| id.to_string())
398        .collect::<Vec<_>>()
399        .join(",");
400    serializer.serialize_str(&s)
401}
402
403/// Converts the `options` map to a json object.
404///
405/// Replaces "null" strings by `null` json values.
406fn options_map_to_value(options: &HashMap<String, String>) -> Value {
407    let map = options
408        .iter()
409        .map(|(key, value)| {
410            // Only convert the key to lowercase.
411            if value.eq_ignore_ascii_case("null") {
412                (key.clone(), Value::Null)
413            } else {
414                (key.clone(), Value::from(value.clone()))
415            }
416        })
417        .collect();
418    Value::Object(map)
419}
420
421// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
422// check the type key first.
423/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
424/// and returns `true` if the map contains enum options.
425fn validate_enum_options(
426    options_map: &HashMap<String, String>,
427    enum_tag_key: &str,
428) -> Result<bool> {
429    let enum_type = enum_tag_key.split('.').next().unwrap();
430    let mut has_other_options = false;
431    let mut has_tag = false;
432    for key in options_map.keys() {
433        if key == enum_tag_key {
434            has_tag = true;
435        } else if key.starts_with(enum_type) {
436            has_other_options = true;
437        }
438    }
439
440    // If tag is not provided, then other options for the enum should not exist.
441    ensure!(
442        has_tag || !has_other_options,
443        InvalidRegionOptionsSnafu {
444            reason: format!("missing key {} in options", enum_tag_key),
445        }
446    );
447
448    Ok(has_tag)
449}
450
451#[cfg(test)]
452mod tests {
453    use common_error::ext::ErrorExt;
454    use common_error::status_code::StatusCode;
455    use common_wal::options::KafkaWalOptions;
456
457    use super::*;
458
459    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
460        options
461            .iter()
462            .map(|(k, v)| (k.to_string(), v.to_string()))
463            .collect()
464    }
465
466    #[test]
467    fn test_empty_region_options() {
468        let map = make_map(&[]);
469        let options = RegionOptions::try_from(&map).unwrap();
470        assert_eq!(RegionOptions::default(), options);
471    }
472
473    #[test]
474    fn test_with_ttl() {
475        let map = make_map(&[("ttl", "7d")]);
476        let options = RegionOptions::try_from(&map).unwrap();
477        let expect = RegionOptions {
478            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
479            ..Default::default()
480        };
481        assert_eq!(expect, options);
482    }
483
484    #[test]
485    fn test_with_storage() {
486        let map = make_map(&[("storage", "S3")]);
487        let options = RegionOptions::try_from(&map).unwrap();
488        let expect = RegionOptions {
489            storage: Some("S3".to_string()),
490            ..Default::default()
491        };
492        assert_eq!(expect, options);
493    }
494
495    #[test]
496    fn test_without_compaction_type() {
497        let map = make_map(&[
498            ("compaction.twcs.trigger_file_num", "8"),
499            ("compaction.twcs.time_window", "2h"),
500        ]);
501        let err = RegionOptions::try_from(&map).unwrap_err();
502        assert_eq!(StatusCode::InvalidArguments, err.status_code());
503    }
504
505    #[test]
506    fn test_with_compaction_type() {
507        let map = make_map(&[
508            ("compaction.twcs.trigger_file_num", "8"),
509            ("compaction.twcs.time_window", "2h"),
510            ("compaction.type", "twcs"),
511        ]);
512        let options = RegionOptions::try_from(&map).unwrap();
513        let expect = RegionOptions {
514            compaction: CompactionOptions::Twcs(TwcsOptions {
515                trigger_file_num: 8,
516                time_window: Some(Duration::from_secs(3600 * 2)),
517                ..Default::default()
518            }),
519            ..Default::default()
520        };
521        assert_eq!(expect, options);
522    }
523
524    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
525        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
526        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
527        let got = RegionOptions::try_from(&map).unwrap();
528        let expect = RegionOptions {
529            wal_options: wal_options.clone(),
530            ..Default::default()
531        };
532        expect == got
533    }
534
535    #[test]
536    fn test_with_index() {
537        let map = make_map(&[
538            ("index.inverted_index.ignore_column_ids", "1,2,3"),
539            ("index.inverted_index.segment_row_count", "512"),
540        ]);
541        let options = RegionOptions::try_from(&map).unwrap();
542        let expect = RegionOptions {
543            index_options: IndexOptions {
544                inverted_index: InvertedIndexOptions {
545                    ignore_column_ids: vec![1, 2, 3],
546                    segment_row_count: 512,
547                },
548            },
549            ..Default::default()
550        };
551        assert_eq!(expect, options);
552    }
553
554    // No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
555    #[test]
556    fn test_with_any_wal_options() {
557        let all_wal_options = [
558            WalOptions::RaftEngine,
559            WalOptions::Kafka(KafkaWalOptions {
560                topic: "test_topic".to_string(),
561            }),
562        ];
563        all_wal_options.iter().all(test_with_wal_options);
564    }
565
566    #[test]
567    fn test_with_memtable() {
568        let map = make_map(&[("memtable.type", "time_series")]);
569        let options = RegionOptions::try_from(&map).unwrap();
570        let expect = RegionOptions {
571            memtable: Some(MemtableOptions::TimeSeries),
572            ..Default::default()
573        };
574        assert_eq!(expect, options);
575
576        let map = make_map(&[("memtable.type", "partition_tree")]);
577        let options = RegionOptions::try_from(&map).unwrap();
578        let expect = RegionOptions {
579            memtable: Some(MemtableOptions::PartitionTree(
580                PartitionTreeOptions::default(),
581            )),
582            ..Default::default()
583        };
584        assert_eq!(expect, options);
585    }
586
587    #[test]
588    fn test_unknown_memtable_type() {
589        let map = make_map(&[("memtable.type", "no_such_memtable")]);
590        let err = RegionOptions::try_from(&map).unwrap_err();
591        assert_eq!(StatusCode::InvalidArguments, err.status_code());
592    }
593
594    #[test]
595    fn test_with_merge_mode() {
596        let map = make_map(&[("merge_mode", "last_row")]);
597        let options = RegionOptions::try_from(&map).unwrap();
598        assert_eq!(MergeMode::LastRow, options.merge_mode());
599
600        let map = make_map(&[("merge_mode", "last_non_null")]);
601        let options = RegionOptions::try_from(&map).unwrap();
602        assert_eq!(MergeMode::LastNonNull, options.merge_mode());
603
604        let map = make_map(&[("merge_mode", "unknown")]);
605        let err = RegionOptions::try_from(&map).unwrap_err();
606        assert_eq!(StatusCode::InvalidArguments, err.status_code());
607    }
608
609    #[test]
610    fn test_with_all() {
611        let wal_options = WalOptions::Kafka(KafkaWalOptions {
612            topic: "test_topic".to_string(),
613        });
614        let map = make_map(&[
615            ("ttl", "7d"),
616            ("compaction.twcs.trigger_file_num", "8"),
617            ("compaction.twcs.max_output_file_size", "1GB"),
618            ("compaction.twcs.time_window", "2h"),
619            ("compaction.type", "twcs"),
620            ("compaction.twcs.remote_compaction", "false"),
621            ("compaction.twcs.fallback_to_local", "true"),
622            ("storage", "S3"),
623            ("append_mode", "false"),
624            ("index.inverted_index.ignore_column_ids", "1,2,3"),
625            ("index.inverted_index.segment_row_count", "512"),
626            (
627                WAL_OPTIONS_KEY,
628                &serde_json::to_string(&wal_options).unwrap(),
629            ),
630            ("memtable.type", "partition_tree"),
631            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
632            ("memtable.partition_tree.data_freeze_threshold", "2048"),
633            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
634            ("merge_mode", "last_non_null"),
635        ]);
636        let options = RegionOptions::try_from(&map).unwrap();
637        let expect = RegionOptions {
638            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
639            compaction: CompactionOptions::Twcs(TwcsOptions {
640                trigger_file_num: 8,
641                time_window: Some(Duration::from_secs(3600 * 2)),
642                max_output_file_size: Some(ReadableSize::gb(1)),
643                remote_compaction: false,
644                fallback_to_local: true,
645            }),
646            storage: Some("S3".to_string()),
647            append_mode: false,
648            wal_options,
649            index_options: IndexOptions {
650                inverted_index: InvertedIndexOptions {
651                    ignore_column_ids: vec![1, 2, 3],
652                    segment_row_count: 512,
653                },
654            },
655            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
656                index_max_keys_per_shard: 2048,
657                data_freeze_threshold: 2048,
658                fork_dictionary_bytes: ReadableSize::mb(128),
659                primary_key_encoding: PrimaryKeyEncoding::Dense,
660            })),
661            merge_mode: Some(MergeMode::LastNonNull),
662            sst_format: None,
663        };
664        assert_eq!(expect, options);
665    }
666
667    #[test]
668    fn test_region_options_serde() {
669        let options = RegionOptions {
670            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
671            compaction: CompactionOptions::Twcs(TwcsOptions {
672                trigger_file_num: 8,
673                time_window: Some(Duration::from_secs(3600 * 2)),
674                max_output_file_size: None,
675                remote_compaction: false,
676                fallback_to_local: true,
677            }),
678            storage: Some("S3".to_string()),
679            append_mode: false,
680            wal_options: WalOptions::Kafka(KafkaWalOptions {
681                topic: "test_topic".to_string(),
682            }),
683            index_options: IndexOptions {
684                inverted_index: InvertedIndexOptions {
685                    ignore_column_ids: vec![1, 2, 3],
686                    segment_row_count: 512,
687                },
688            },
689            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
690                index_max_keys_per_shard: 2048,
691                data_freeze_threshold: 2048,
692                fork_dictionary_bytes: ReadableSize::mb(128),
693                primary_key_encoding: PrimaryKeyEncoding::Dense,
694            })),
695            merge_mode: Some(MergeMode::LastNonNull),
696            sst_format: None,
697        };
698        let region_options_json_str = serde_json::to_string(&options).unwrap();
699        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
700        assert_eq!(options, got);
701    }
702
703    #[test]
704    fn test_region_options_str_serde() {
705        // Notes: use empty string for `ignore_column_ids` to test the empty string case.
706        let region_options_json_str = r#"{
707  "ttl": "7days",
708  "compaction": {
709    "compaction.type": "twcs",
710    "compaction.twcs.trigger_file_num": "8",
711    "compaction.twcs.max_output_file_size": "7MB",
712    "compaction.twcs.time_window": "2h"
713  },
714  "storage": "S3",
715  "append_mode": false,
716  "wal_options": {
717    "wal.provider": "kafka",
718    "wal.kafka.topic": "test_topic"
719  },
720  "index_options": {
721    "index.inverted_index.ignore_column_ids": "",
722    "index.inverted_index.segment_row_count": "512"
723  },
724  "memtable": {
725    "memtable.type": "partition_tree",
726    "memtable.partition_tree.index_max_keys_per_shard": "2048",
727    "memtable.partition_tree.data_freeze_threshold": "2048",
728    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
729  },
730  "merge_mode": "last_non_null"
731}"#;
732        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
733        let options = RegionOptions {
734            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
735            compaction: CompactionOptions::Twcs(TwcsOptions {
736                trigger_file_num: 8,
737                time_window: Some(Duration::from_secs(3600 * 2)),
738                max_output_file_size: Some(ReadableSize::mb(7)),
739                remote_compaction: false,
740                fallback_to_local: true,
741            }),
742            storage: Some("S3".to_string()),
743            append_mode: false,
744            wal_options: WalOptions::Kafka(KafkaWalOptions {
745                topic: "test_topic".to_string(),
746            }),
747            index_options: IndexOptions {
748                inverted_index: InvertedIndexOptions {
749                    ignore_column_ids: vec![],
750                    segment_row_count: 512,
751                },
752            },
753            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
754                index_max_keys_per_shard: 2048,
755                data_freeze_threshold: 2048,
756                fork_dictionary_bytes: ReadableSize::mb(128),
757                primary_key_encoding: PrimaryKeyEncoding::Dense,
758            })),
759            merge_mode: Some(MergeMode::LastNonNull),
760            sst_format: None,
761        };
762        assert_eq!(options, got);
763    }
764}