mito2/region/
options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Options for a region.
16//!
17//! If we add options in this mod, we also need to modify [store_api::mito_engine_options].
18
19use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_stat::get_total_memory_readable;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::mito_engine_options::COMPACTION_OVERRIDE;
33use store_api::storage::ColumnId;
34use strum::EnumString;
35
36use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
37use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
38use crate::sst::FormatType;
39
40const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
41
42pub(crate) fn parse_wal_options(
43    options_map: &HashMap<String, String>,
44) -> std::result::Result<WalOptions, serde_json::Error> {
45    options_map
46        .get(WAL_OPTIONS_KEY)
47        .map_or(Ok(WalOptions::default()), |encoded_wal_options| {
48            serde_json::from_str(encoded_wal_options)
49        })
50}
51
52/// Mode to handle duplicate rows while merging.
53#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
54#[serde(rename_all = "snake_case")]
55#[strum(serialize_all = "snake_case")]
56pub enum MergeMode {
57    /// Keeps the last row.
58    #[default]
59    LastRow,
60    /// Keeps the last non-null field for each row.
61    LastNonNull,
62}
63
64// Note: We need to update [store_api::mito_engine_options::is_mito_engine_option_key()]
65// if we want expose the option to table options.
66/// Options that affect the entire region.
67///
68/// Users need to specify the options while creating/opening a region.
69#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(default)]
71pub struct RegionOptions {
72    /// Region SST files TTL.
73    pub ttl: Option<TimeToLive>,
74    /// Compaction options.
75    pub compaction: CompactionOptions,
76    pub compaction_override: bool,
77    /// Custom storage. Uses default storage if it is `None`.
78    pub storage: Option<String>,
79    /// If append mode is enabled, the region keeps duplicate rows.
80    pub append_mode: bool,
81    /// Wal options.
82    pub wal_options: WalOptions,
83    /// Index options.
84    pub index_options: IndexOptions,
85    /// Memtable options.
86    pub memtable: Option<MemtableOptions>,
87    /// The mode to merge duplicate rows.
88    /// Only takes effect when `append_mode` is `false`.
89    pub merge_mode: Option<MergeMode>,
90    /// SST format type.
91    pub sst_format: Option<FormatType>,
92}
93
94impl RegionOptions {
95    /// Validates options.
96    pub fn validate(&self) -> Result<()> {
97        if self.append_mode {
98            ensure!(
99                self.merge_mode.is_none(),
100                InvalidRegionOptionsSnafu {
101                    reason: "merge_mode is not allowed when append_mode is enabled",
102                }
103            );
104        }
105        Ok(())
106    }
107
108    /// Returns `true` if deduplication is needed.
109    pub fn need_dedup(&self) -> bool {
110        !self.append_mode
111    }
112
113    /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
114    pub fn merge_mode(&self) -> MergeMode {
115        self.merge_mode.unwrap_or_default()
116    }
117
118    /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
119    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
120        self.memtable
121            .as_ref()
122            .map_or(PrimaryKeyEncoding::default(), |memtable| {
123                memtable.primary_key_encoding()
124            })
125    }
126}
127
128impl TryFrom<&HashMap<String, String>> for RegionOptions {
129    type Error = Error;
130
131    fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
132        let value = options_map_to_value(options_map);
133        let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
134
135        // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
136        // each field manually instead of using #[serde(flatten)] for `compaction`.
137        // See https://github.com/serde-rs/serde/issues/1626
138        let options: RegionOptionsWithoutEnum =
139            serde_json::from_str(&json).context(JsonOptionsSnafu)?;
140        let has_compaction_type = validate_enum_options(options_map, "compaction.type")?;
141        let compaction = if has_compaction_type {
142            serde_json::from_str(&json).context(JsonOptionsSnafu)?
143        } else {
144            CompactionOptions::default()
145        };
146
147        let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?;
148
149        let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
150        let memtable = if validate_enum_options(options_map, "memtable.type")? {
151            Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
152        } else {
153            None
154        };
155
156        let compaction_override_flag = options_map
157            .get(COMPACTION_OVERRIDE)
158            .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
159            .unwrap_or(false);
160        let compaction_override = has_compaction_type || compaction_override_flag;
161
162        let opts = RegionOptions {
163            ttl: options.ttl,
164            compaction,
165            compaction_override,
166            storage: options.storage,
167            append_mode: options.append_mode,
168            wal_options,
169            index_options,
170            memtable,
171            merge_mode: options.merge_mode,
172            sst_format: options.sst_format,
173        };
174        opts.validate()?;
175
176        Ok(opts)
177    }
178}
179
180/// Options for compactions
181#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
182#[serde(tag = "compaction.type")]
183#[serde(rename_all = "snake_case")]
184pub enum CompactionOptions {
185    /// Time window compaction strategy.
186    #[serde(with = "prefix_twcs")]
187    Twcs(TwcsOptions),
188}
189
190impl CompactionOptions {
191    pub(crate) fn time_window(&self) -> Option<Duration> {
192        match self {
193            CompactionOptions::Twcs(opts) => opts.time_window,
194        }
195    }
196
197    pub(crate) fn remote_compaction(&self) -> bool {
198        match self {
199            CompactionOptions::Twcs(opts) => opts.remote_compaction,
200        }
201    }
202
203    pub(crate) fn fallback_to_local(&self) -> bool {
204        match self {
205            CompactionOptions::Twcs(opts) => opts.fallback_to_local,
206        }
207    }
208}
209
210impl Default for CompactionOptions {
211    fn default() -> Self {
212        Self::Twcs(TwcsOptions::default())
213    }
214}
215
216/// Time window compaction options.
217#[serde_as]
218#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
219#[serde(default)]
220pub struct TwcsOptions {
221    /// Minimum file num in every time window to trigger a compaction.
222    #[serde_as(as = "DisplayFromStr")]
223    pub trigger_file_num: usize,
224    /// Compaction time window defined when creating tables.
225    #[serde(with = "humantime_serde")]
226    pub time_window: Option<Duration>,
227    /// Compaction time window defined when creating tables.
228    pub max_output_file_size: Option<ReadableSize>,
229    /// Whether to use remote compaction.
230    #[serde_as(as = "DisplayFromStr")]
231    pub remote_compaction: bool,
232    /// Whether to fall back to local compaction if remote compaction fails.
233    #[serde_as(as = "DisplayFromStr")]
234    pub fallback_to_local: bool,
235}
236
237with_prefix!(prefix_twcs "compaction.twcs.");
238
239impl TwcsOptions {
240    /// Returns time window in second resolution.
241    pub fn time_window_seconds(&self) -> Option<i64> {
242        self.time_window.and_then(|window| {
243            let window_secs = window.as_secs();
244            if window_secs == 0 {
245                None
246            } else {
247                window_secs.try_into().ok()
248            }
249        })
250    }
251}
252
253impl Default for TwcsOptions {
254    fn default() -> Self {
255        Self {
256            trigger_file_num: 4,
257            time_window: None,
258            max_output_file_size: Some(ReadableSize::mb(512)),
259            remote_compaction: false,
260            fallback_to_local: true,
261        }
262    }
263}
264
265/// We need to define a new struct without enum fields as `#[serde(default)]` does not
266/// support external tagging.
267#[serde_as]
268#[derive(Debug, Deserialize)]
269#[serde(default)]
270struct RegionOptionsWithoutEnum {
271    /// Region SST files TTL.
272    ttl: Option<TimeToLive>,
273    storage: Option<String>,
274    #[serde_as(as = "DisplayFromStr")]
275    append_mode: bool,
276    #[serde_as(as = "NoneAsEmptyString")]
277    merge_mode: Option<MergeMode>,
278    #[serde_as(as = "NoneAsEmptyString")]
279    sst_format: Option<FormatType>,
280}
281
282impl Default for RegionOptionsWithoutEnum {
283    fn default() -> Self {
284        let options = RegionOptions::default();
285        RegionOptionsWithoutEnum {
286            ttl: options.ttl,
287            storage: options.storage,
288            append_mode: options.append_mode,
289            merge_mode: options.merge_mode,
290            sst_format: options.sst_format,
291        }
292    }
293}
294
295with_prefix!(prefix_inverted_index "index.inverted_index.");
296
297/// Options for index.
298#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
299#[serde(default)]
300pub struct IndexOptions {
301    /// Options for the inverted index.
302    #[serde(flatten, with = "prefix_inverted_index")]
303    pub inverted_index: InvertedIndexOptions,
304}
305
306/// Options for the inverted index.
307#[serde_as]
308#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
309#[serde(default)]
310pub struct InvertedIndexOptions {
311    /// The column ids that should be ignored when building the inverted index.
312    /// The column ids are separated by commas. For example, "1,2,3".
313    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
314    #[serde(serialize_with = "serialize_ignore_column_ids")]
315    pub ignore_column_ids: Vec<ColumnId>,
316
317    /// The number of rows in a segment.
318    #[serde_as(as = "DisplayFromStr")]
319    pub segment_row_count: usize,
320}
321
322impl Default for InvertedIndexOptions {
323    fn default() -> Self {
324        Self {
325            ignore_column_ids: Vec::new(),
326            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
327        }
328    }
329}
330
331/// Options for region level memtable.
332#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
333#[serde(tag = "memtable.type", rename_all = "snake_case")]
334pub enum MemtableOptions {
335    TimeSeries,
336    #[serde(with = "prefix_partition_tree")]
337    PartitionTree(PartitionTreeOptions),
338}
339
340with_prefix!(prefix_partition_tree "memtable.partition_tree.");
341
342impl MemtableOptions {
343    /// Returns the primary key encoding mode.
344    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
345        match self {
346            MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
347            _ => PrimaryKeyEncoding::Dense,
348        }
349    }
350}
351
352/// Partition tree memtable options.
353#[serde_as]
354#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
355#[serde(default)]
356pub struct PartitionTreeOptions {
357    /// Max keys in an index shard.
358    #[serde_as(as = "DisplayFromStr")]
359    pub index_max_keys_per_shard: usize,
360    /// Number of rows to freeze a data part.
361    #[serde_as(as = "DisplayFromStr")]
362    pub data_freeze_threshold: usize,
363    /// Total bytes of dictionary to keep in fork.
364    pub fork_dictionary_bytes: ReadableSize,
365    /// Primary key encoding mode.
366    pub primary_key_encoding: PrimaryKeyEncoding,
367}
368
369impl Default for PartitionTreeOptions {
370    fn default() -> Self {
371        let mut fork_dictionary_bytes = ReadableSize::mb(512);
372        if let Some(total_memory) = get_total_memory_readable() {
373            let adjust_dictionary_bytes = std::cmp::min(
374                total_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
375                fork_dictionary_bytes,
376            );
377            if adjust_dictionary_bytes.0 > 0 {
378                fork_dictionary_bytes = adjust_dictionary_bytes;
379            }
380        }
381        Self {
382            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
383            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
384            fork_dictionary_bytes,
385            primary_key_encoding: PrimaryKeyEncoding::Dense,
386        }
387    }
388}
389
390fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
391where
392    D: Deserializer<'de>,
393{
394    let s: String = Deserialize::deserialize(deserializer)?;
395    let mut column_ids = Vec::new();
396    if s.is_empty() {
397        return Ok(column_ids);
398    }
399    for item in s.split(',') {
400        let column_id = item.parse().map_err(D::Error::custom)?;
401        column_ids.push(column_id);
402    }
403    Ok(column_ids)
404}
405
406fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
407where
408    S: serde::Serializer,
409{
410    let s = column_ids
411        .iter()
412        .map(|id| id.to_string())
413        .collect::<Vec<_>>()
414        .join(",");
415    serializer.serialize_str(&s)
416}
417
418/// Converts the `options` map to a json object.
419///
420/// Replaces "null" strings by `null` json values.
421fn options_map_to_value(options: &HashMap<String, String>) -> Value {
422    let map = options
423        .iter()
424        .map(|(key, value)| {
425            // Only convert the key to lowercase.
426            if value.eq_ignore_ascii_case("null") {
427                (key.clone(), Value::Null)
428            } else {
429                (key.clone(), Value::from(value.clone()))
430            }
431        })
432        .collect();
433    Value::Object(map)
434}
435
436// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
437// check the type key first.
438/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
439/// and returns `true` if the map contains enum options.
440fn validate_enum_options(
441    options_map: &HashMap<String, String>,
442    enum_tag_key: &str,
443) -> Result<bool> {
444    let enum_type = enum_tag_key.split('.').next().unwrap();
445    let mut has_other_options = false;
446    let mut has_tag = false;
447    for key in options_map.keys() {
448        if key == enum_tag_key {
449            has_tag = true;
450        } else if key.starts_with(enum_type) {
451            has_other_options = true;
452        }
453    }
454
455    // If tag is not provided, then other options for the enum should not exist.
456    ensure!(
457        has_tag || !has_other_options,
458        InvalidRegionOptionsSnafu {
459            reason: format!("missing key {} in options", enum_tag_key),
460        }
461    );
462
463    Ok(has_tag)
464}
465
466#[cfg(test)]
467mod tests {
468    use common_error::ext::ErrorExt;
469    use common_error::status_code::StatusCode;
470    use common_wal::options::KafkaWalOptions;
471
472    use super::*;
473
474    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
475        options
476            .iter()
477            .map(|(k, v)| (k.to_string(), v.to_string()))
478            .collect()
479    }
480
481    #[test]
482    fn test_empty_region_options() {
483        let map = make_map(&[]);
484        let options = RegionOptions::try_from(&map).unwrap();
485        assert_eq!(RegionOptions::default(), options);
486    }
487
488    #[test]
489    fn test_with_ttl() {
490        let map = make_map(&[("ttl", "7d")]);
491        let options = RegionOptions::try_from(&map).unwrap();
492        let expect = RegionOptions {
493            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
494            ..Default::default()
495        };
496        assert_eq!(expect, options);
497    }
498
499    #[test]
500    fn test_with_storage() {
501        let map = make_map(&[("storage", "S3")]);
502        let options = RegionOptions::try_from(&map).unwrap();
503        let expect = RegionOptions {
504            storage: Some("S3".to_string()),
505            ..Default::default()
506        };
507        assert_eq!(expect, options);
508    }
509
510    #[test]
511    fn test_without_compaction_type() {
512        let map = make_map(&[
513            ("compaction.twcs.trigger_file_num", "8"),
514            ("compaction.twcs.time_window", "2h"),
515        ]);
516        let err = RegionOptions::try_from(&map).unwrap_err();
517        assert_eq!(StatusCode::InvalidArguments, err.status_code());
518    }
519
520    #[test]
521    fn test_with_compaction_type() {
522        let map = make_map(&[
523            ("compaction.twcs.trigger_file_num", "8"),
524            ("compaction.twcs.time_window", "2h"),
525            ("compaction.type", "twcs"),
526        ]);
527        let options = RegionOptions::try_from(&map).unwrap();
528        let expect = RegionOptions {
529            compaction: CompactionOptions::Twcs(TwcsOptions {
530                trigger_file_num: 8,
531                time_window: Some(Duration::from_secs(3600 * 2)),
532                ..Default::default()
533            }),
534            compaction_override: true,
535            ..Default::default()
536        };
537        assert_eq!(expect, options);
538    }
539
540    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
541        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
542        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
543        let got = RegionOptions::try_from(&map).unwrap();
544        let expect = RegionOptions {
545            wal_options: wal_options.clone(),
546            ..Default::default()
547        };
548        expect == got
549    }
550
551    #[test]
552    fn test_with_index() {
553        let map = make_map(&[
554            ("index.inverted_index.ignore_column_ids", "1,2,3"),
555            ("index.inverted_index.segment_row_count", "512"),
556        ]);
557        let options = RegionOptions::try_from(&map).unwrap();
558        let expect = RegionOptions {
559            index_options: IndexOptions {
560                inverted_index: InvertedIndexOptions {
561                    ignore_column_ids: vec![1, 2, 3],
562                    segment_row_count: 512,
563                },
564            },
565            ..Default::default()
566        };
567        assert_eq!(expect, options);
568    }
569
570    // No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
571    #[test]
572    fn test_with_any_wal_options() {
573        let all_wal_options = [
574            WalOptions::RaftEngine,
575            WalOptions::Kafka(KafkaWalOptions {
576                topic: "test_topic".to_string(),
577            }),
578        ];
579        all_wal_options.iter().all(test_with_wal_options);
580    }
581
582    #[test]
583    fn test_with_memtable() {
584        let map = make_map(&[("memtable.type", "time_series")]);
585        let options = RegionOptions::try_from(&map).unwrap();
586        let expect = RegionOptions {
587            memtable: Some(MemtableOptions::TimeSeries),
588            ..Default::default()
589        };
590        assert_eq!(expect, options);
591
592        let map = make_map(&[("memtable.type", "partition_tree")]);
593        let options = RegionOptions::try_from(&map).unwrap();
594        let expect = RegionOptions {
595            memtable: Some(MemtableOptions::PartitionTree(
596                PartitionTreeOptions::default(),
597            )),
598            ..Default::default()
599        };
600        assert_eq!(expect, options);
601    }
602
603    #[test]
604    fn test_unknown_memtable_type() {
605        let map = make_map(&[("memtable.type", "no_such_memtable")]);
606        let err = RegionOptions::try_from(&map).unwrap_err();
607        assert_eq!(StatusCode::InvalidArguments, err.status_code());
608    }
609
610    #[test]
611    fn test_with_merge_mode() {
612        let map = make_map(&[("merge_mode", "last_row")]);
613        let options = RegionOptions::try_from(&map).unwrap();
614        assert_eq!(MergeMode::LastRow, options.merge_mode());
615
616        let map = make_map(&[("merge_mode", "last_non_null")]);
617        let options = RegionOptions::try_from(&map).unwrap();
618        assert_eq!(MergeMode::LastNonNull, options.merge_mode());
619
620        let map = make_map(&[("merge_mode", "unknown")]);
621        let err = RegionOptions::try_from(&map).unwrap_err();
622        assert_eq!(StatusCode::InvalidArguments, err.status_code());
623    }
624
625    #[test]
626    fn test_with_all() {
627        let wal_options = WalOptions::Kafka(KafkaWalOptions {
628            topic: "test_topic".to_string(),
629        });
630        let map = make_map(&[
631            ("ttl", "7d"),
632            ("compaction.twcs.trigger_file_num", "8"),
633            ("compaction.twcs.max_output_file_size", "1GB"),
634            ("compaction.twcs.time_window", "2h"),
635            ("compaction.type", "twcs"),
636            ("compaction.twcs.remote_compaction", "false"),
637            ("compaction.twcs.fallback_to_local", "true"),
638            ("storage", "S3"),
639            ("append_mode", "false"),
640            ("index.inverted_index.ignore_column_ids", "1,2,3"),
641            ("index.inverted_index.segment_row_count", "512"),
642            (
643                WAL_OPTIONS_KEY,
644                &serde_json::to_string(&wal_options).unwrap(),
645            ),
646            ("memtable.type", "partition_tree"),
647            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
648            ("memtable.partition_tree.data_freeze_threshold", "2048"),
649            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
650            ("merge_mode", "last_non_null"),
651        ]);
652        let options = RegionOptions::try_from(&map).unwrap();
653        let expect = RegionOptions {
654            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
655            compaction: CompactionOptions::Twcs(TwcsOptions {
656                trigger_file_num: 8,
657                time_window: Some(Duration::from_secs(3600 * 2)),
658                max_output_file_size: Some(ReadableSize::gb(1)),
659                remote_compaction: false,
660                fallback_to_local: true,
661            }),
662            compaction_override: true,
663            storage: Some("S3".to_string()),
664            append_mode: false,
665            wal_options,
666            index_options: IndexOptions {
667                inverted_index: InvertedIndexOptions {
668                    ignore_column_ids: vec![1, 2, 3],
669                    segment_row_count: 512,
670                },
671            },
672            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
673                index_max_keys_per_shard: 2048,
674                data_freeze_threshold: 2048,
675                fork_dictionary_bytes: ReadableSize::mb(128),
676                primary_key_encoding: PrimaryKeyEncoding::Dense,
677            })),
678            merge_mode: Some(MergeMode::LastNonNull),
679            sst_format: None,
680        };
681        assert_eq!(expect, options);
682    }
683
684    #[test]
685    fn test_region_options_serde() {
686        let options = RegionOptions {
687            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
688            compaction: CompactionOptions::Twcs(TwcsOptions {
689                trigger_file_num: 8,
690                time_window: Some(Duration::from_secs(3600 * 2)),
691                max_output_file_size: None,
692                remote_compaction: false,
693                fallback_to_local: true,
694            }),
695            compaction_override: false,
696            storage: Some("S3".to_string()),
697            append_mode: false,
698            wal_options: WalOptions::Kafka(KafkaWalOptions {
699                topic: "test_topic".to_string(),
700            }),
701            index_options: IndexOptions {
702                inverted_index: InvertedIndexOptions {
703                    ignore_column_ids: vec![1, 2, 3],
704                    segment_row_count: 512,
705                },
706            },
707            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
708                index_max_keys_per_shard: 2048,
709                data_freeze_threshold: 2048,
710                fork_dictionary_bytes: ReadableSize::mb(128),
711                primary_key_encoding: PrimaryKeyEncoding::Dense,
712            })),
713            merge_mode: Some(MergeMode::LastNonNull),
714            sst_format: None,
715        };
716        let region_options_json_str = serde_json::to_string(&options).unwrap();
717        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
718        assert_eq!(options, got);
719    }
720
721    #[test]
722    fn test_region_options_str_serde() {
723        // Notes: use empty string for `ignore_column_ids` to test the empty string case.
724        let region_options_json_str = r#"{
725  "ttl": "7days",
726  "compaction": {
727    "compaction.type": "twcs",
728    "compaction.twcs.trigger_file_num": "8",
729    "compaction.twcs.max_output_file_size": "7MB",
730    "compaction.twcs.time_window": "2h"
731  },
732  "storage": "S3",
733  "append_mode": false,
734  "wal_options": {
735    "wal.provider": "kafka",
736    "wal.kafka.topic": "test_topic"
737  },
738  "index_options": {
739    "index.inverted_index.ignore_column_ids": "",
740    "index.inverted_index.segment_row_count": "512"
741  },
742  "memtable": {
743    "memtable.type": "partition_tree",
744    "memtable.partition_tree.index_max_keys_per_shard": "2048",
745    "memtable.partition_tree.data_freeze_threshold": "2048",
746    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
747  },
748  "merge_mode": "last_non_null"
749}"#;
750        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
751        let options = RegionOptions {
752            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
753            compaction: CompactionOptions::Twcs(TwcsOptions {
754                trigger_file_num: 8,
755                time_window: Some(Duration::from_secs(3600 * 2)),
756                max_output_file_size: Some(ReadableSize::mb(7)),
757                remote_compaction: false,
758                fallback_to_local: true,
759            }),
760            compaction_override: false,
761            storage: Some("S3".to_string()),
762            append_mode: false,
763            wal_options: WalOptions::Kafka(KafkaWalOptions {
764                topic: "test_topic".to_string(),
765            }),
766            index_options: IndexOptions {
767                inverted_index: InvertedIndexOptions {
768                    ignore_column_ids: vec![],
769                    segment_row_count: 512,
770                },
771            },
772            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
773                index_max_keys_per_shard: 2048,
774                data_freeze_threshold: 2048,
775                fork_dictionary_bytes: ReadableSize::mb(128),
776                primary_key_encoding: PrimaryKeyEncoding::Dense,
777            })),
778            merge_mode: Some(MergeMode::LastNonNull),
779            sst_format: None,
780        };
781        assert_eq!(options, got);
782    }
783}