mito2/region/
options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Options for a region.
16//!
17//! If we add options in this mod, we also need to modify [store_api::mito_engine_options].
18
19use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_stat::get_total_memory_readable;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::storage::ColumnId;
33use strum::EnumString;
34
35use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
36use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
37use crate::sst::FormatType;
38
/// Default number of rows in an inverted index segment, used by
/// [`InvertedIndexOptions::default`].
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
40
/// Mode to handle duplicate rows while merging.
///
/// Both serde and strum use `snake_case` names for the variants, i.e.
/// `last_row` and `last_non_null`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {
    /// Keeps the last row. This is the default mode.
    #[default]
    LastRow,
    /// Keeps the last non-null field for each row.
    LastNonNull,
}
52
// Note: We need to update [store_api::mito_engine_options::is_mito_engine_option_key()]
// if we want to expose the option to table options.
/// Options that affect the entire region.
///
/// Users need to specify the options while creating/opening a region.
///
/// Because of `#[serde(default)]`, fields that are missing while deserializing
/// take their values from [`RegionOptions::default`].
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionOptions {
    /// Region SST files TTL.
    pub ttl: Option<TimeToLive>,
    /// Compaction options.
    pub compaction: CompactionOptions,
    /// Custom storage. Uses default storage if it is `None`.
    pub storage: Option<String>,
    /// If append mode is enabled, the region keeps duplicate rows.
    pub append_mode: bool,
    /// Wal options.
    pub wal_options: WalOptions,
    /// Index options.
    pub index_options: IndexOptions,
    /// Memtable options. `None` if no memtable type is specified.
    pub memtable: Option<MemtableOptions>,
    /// The mode to merge duplicate rows.
    /// Only takes effect when `append_mode` is `false`;
    /// [`RegionOptions::validate`] rejects it when `append_mode` is `true`.
    pub merge_mode: Option<MergeMode>,
    /// SST format type.
    pub sst_format: Option<FormatType>,
}
81
82impl RegionOptions {
83    /// Validates options.
84    pub fn validate(&self) -> Result<()> {
85        if self.append_mode {
86            ensure!(
87                self.merge_mode.is_none(),
88                InvalidRegionOptionsSnafu {
89                    reason: "merge_mode is not allowed when append_mode is enabled",
90                }
91            );
92        }
93        Ok(())
94    }
95
96    /// Returns `true` if deduplication is needed.
97    pub fn need_dedup(&self) -> bool {
98        !self.append_mode
99    }
100
101    /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
102    pub fn merge_mode(&self) -> MergeMode {
103        self.merge_mode.unwrap_or_default()
104    }
105
106    /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
107    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
108        self.memtable
109            .as_ref()
110            .map_or(PrimaryKeyEncoding::default(), |memtable| {
111                memtable.primary_key_encoding()
112            })
113    }
114}
115
116impl TryFrom<&HashMap<String, String>> for RegionOptions {
117    type Error = Error;
118
119    fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
120        let value = options_map_to_value(options_map);
121        let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
122
123        // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
124        // each field manually instead of using #[serde(flatten)] for `compaction`.
125        // See https://github.com/serde-rs/serde/issues/1626
126        let options: RegionOptionsWithoutEnum =
127            serde_json::from_str(&json).context(JsonOptionsSnafu)?;
128        let compaction = if validate_enum_options(options_map, "compaction.type")? {
129            serde_json::from_str(&json).context(JsonOptionsSnafu)?
130        } else {
131            CompactionOptions::default()
132        };
133
134        // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map.
135        let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
136            || Ok(WalOptions::default()),
137            |encoded_wal_options| {
138                serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
139            },
140        )?;
141
142        let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
143        let memtable = if validate_enum_options(options_map, "memtable.type")? {
144            Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
145        } else {
146            None
147        };
148
149        let opts = RegionOptions {
150            ttl: options.ttl,
151            compaction,
152            storage: options.storage,
153            append_mode: options.append_mode,
154            wal_options,
155            index_options,
156            memtable,
157            merge_mode: options.merge_mode,
158            sst_format: options.sst_format,
159        };
160        opts.validate()?;
161
162        Ok(opts)
163    }
164}
165
/// Options for compactions.
///
/// Serialized as an internally tagged enum: the `compaction.type` key selects the
/// variant (in `snake_case`, e.g. `twcs`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "compaction.type")]
#[serde(rename_all = "snake_case")]
pub enum CompactionOptions {
    /// Time window compaction strategy.
    /// Its fields are (de)serialized with the `compaction.twcs.` key prefix.
    #[serde(with = "prefix_twcs")]
    Twcs(TwcsOptions),
}
175
176impl CompactionOptions {
177    pub(crate) fn time_window(&self) -> Option<Duration> {
178        match self {
179            CompactionOptions::Twcs(opts) => opts.time_window,
180        }
181    }
182
183    pub(crate) fn remote_compaction(&self) -> bool {
184        match self {
185            CompactionOptions::Twcs(opts) => opts.remote_compaction,
186        }
187    }
188
189    pub(crate) fn fallback_to_local(&self) -> bool {
190        match self {
191            CompactionOptions::Twcs(opts) => opts.fallback_to_local,
192        }
193    }
194}
195
196impl Default for CompactionOptions {
197    fn default() -> Self {
198        Self::Twcs(TwcsOptions::default())
199    }
200}
201
/// Time window compaction options.
///
/// Fields annotated with `DisplayFromStr` are (de)serialized through their
/// string representation, e.g. `"8"` or `"false"`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
    /// Minimum file num in every time window to trigger a compaction.
    #[serde_as(as = "DisplayFromStr")]
    pub trigger_file_num: usize,
    /// Compaction time window defined when creating tables.
    /// (De)serialized as a human readable duration, e.g. `2h`.
    #[serde(with = "humantime_serde")]
    pub time_window: Option<Duration>,
    /// Max output file size of a compaction.
    pub max_output_file_size: Option<ReadableSize>,
    /// Whether to use remote compaction.
    #[serde_as(as = "DisplayFromStr")]
    pub remote_compaction: bool,
    /// Whether to fall back to local compaction if remote compaction fails.
    #[serde_as(as = "DisplayFromStr")]
    pub fallback_to_local: bool,
}
222
// Serde helper module that adds/strips the `compaction.twcs.` key prefix of
// the [`TwcsOptions`] fields.
with_prefix!(prefix_twcs "compaction.twcs.");
224
225impl TwcsOptions {
226    /// Returns time window in second resolution.
227    pub fn time_window_seconds(&self) -> Option<i64> {
228        self.time_window.and_then(|window| {
229            let window_secs = window.as_secs();
230            if window_secs == 0 {
231                None
232            } else {
233                window_secs.try_into().ok()
234            }
235        })
236    }
237}
238
impl Default for TwcsOptions {
    /// Default options: trigger at 4 files, no explicit time window, 512MB max
    /// output file size, local compaction only (with fallback to local enabled).
    fn default() -> Self {
        Self {
            trigger_file_num: 4,
            time_window: None,
            max_output_file_size: Some(ReadableSize::mb(512)),
            remote_compaction: false,
            fallback_to_local: true,
        }
    }
}
250
/// We need to define a new struct without enum fields as `#[serde(default)]` does not
/// support external tagging.
///
/// It holds the non-enum fields of [`RegionOptions`] and is only used while
/// converting an options map into [`RegionOptions`].
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
    /// Region SST files TTL.
    ttl: Option<TimeToLive>,
    /// Custom storage name.
    storage: Option<String>,
    /// Append mode flag, parsed from its string form (e.g. `"false"`).
    #[serde_as(as = "DisplayFromStr")]
    append_mode: bool,
    /// Merge mode; an empty string is treated as `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    merge_mode: Option<MergeMode>,
    /// SST format type; an empty string is treated as `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    sst_format: Option<FormatType>,
}
267
268impl Default for RegionOptionsWithoutEnum {
269    fn default() -> Self {
270        let options = RegionOptions::default();
271        RegionOptionsWithoutEnum {
272            ttl: options.ttl,
273            storage: options.storage,
274            append_mode: options.append_mode,
275            merge_mode: options.merge_mode,
276            sst_format: options.sst_format,
277        }
278    }
279}
280
// Serde helper module that adds/strips the `index.inverted_index.` key prefix
// of the [`InvertedIndexOptions`] fields.
with_prefix!(prefix_inverted_index "index.inverted_index.");
282
/// Options for index.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
    /// Options for the inverted index.
    ///
    /// Flattened into this struct; each key carries the `index.inverted_index.` prefix.
    #[serde(flatten, with = "prefix_inverted_index")]
    pub inverted_index: InvertedIndexOptions,
}
291
/// Options for the inverted index.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
    /// The column ids that should be ignored when building the inverted index.
    /// The column ids are separated by commas. For example, "1,2,3".
    /// An empty string stands for an empty list.
    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
    #[serde(serialize_with = "serialize_ignore_column_ids")]
    pub ignore_column_ids: Vec<ColumnId>,

    /// The number of rows in a segment.
    /// (De)serialized through its string representation, e.g. `"512"`.
    #[serde_as(as = "DisplayFromStr")]
    pub segment_row_count: usize,
}
307
308impl Default for InvertedIndexOptions {
309    fn default() -> Self {
310        Self {
311            ignore_column_ids: Vec::new(),
312            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
313        }
314    }
315}
316
/// Options for region level memtable.
///
/// Serialized as an internally tagged enum: the `memtable.type` key selects the
/// variant (in `snake_case`, e.g. `time_series` or `partition_tree`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
pub enum MemtableOptions {
    /// Time series memtable. It has no region level options.
    TimeSeries,
    /// Partition tree memtable.
    /// Its fields are (de)serialized with the `memtable.partition_tree.` key prefix.
    #[serde(with = "prefix_partition_tree")]
    PartitionTree(PartitionTreeOptions),
}
325
// Serde helper module that adds/strips the `memtable.partition_tree.` key
// prefix of the [`PartitionTreeOptions`] fields.
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
327
328impl MemtableOptions {
329    /// Returns the primary key encoding mode.
330    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
331        match self {
332            MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
333            _ => PrimaryKeyEncoding::Dense,
334        }
335    }
336}
337
/// Partition tree memtable options.
///
/// Fields annotated with `DisplayFromStr` are (de)serialized through their
/// string representation, e.g. `"2048"`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct PartitionTreeOptions {
    /// Max keys in an index shard.
    #[serde_as(as = "DisplayFromStr")]
    pub index_max_keys_per_shard: usize,
    /// Number of rows to freeze a data part.
    #[serde_as(as = "DisplayFromStr")]
    pub data_freeze_threshold: usize,
    /// Total bytes of dictionary to keep in fork.
    pub fork_dictionary_bytes: ReadableSize,
    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
354
355impl Default for PartitionTreeOptions {
356    fn default() -> Self {
357        let mut fork_dictionary_bytes = ReadableSize::mb(512);
358        if let Some(total_memory) = get_total_memory_readable() {
359            let adjust_dictionary_bytes = std::cmp::min(
360                total_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
361                fork_dictionary_bytes,
362            );
363            if adjust_dictionary_bytes.0 > 0 {
364                fork_dictionary_bytes = adjust_dictionary_bytes;
365            }
366        }
367        Self {
368            index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
369            data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
370            fork_dictionary_bytes,
371            primary_key_encoding: PrimaryKeyEncoding::Dense,
372        }
373    }
374}
375
376fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
377where
378    D: Deserializer<'de>,
379{
380    let s: String = Deserialize::deserialize(deserializer)?;
381    let mut column_ids = Vec::new();
382    if s.is_empty() {
383        return Ok(column_ids);
384    }
385    for item in s.split(',') {
386        let column_id = item.parse().map_err(D::Error::custom)?;
387        column_ids.push(column_id);
388    }
389    Ok(column_ids)
390}
391
392fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
393where
394    S: serde::Serializer,
395{
396    let s = column_ids
397        .iter()
398        .map(|id| id.to_string())
399        .collect::<Vec<_>>()
400        .join(",");
401    serializer.serialize_str(&s)
402}
403
404/// Converts the `options` map to a json object.
405///
406/// Replaces "null" strings by `null` json values.
407fn options_map_to_value(options: &HashMap<String, String>) -> Value {
408    let map = options
409        .iter()
410        .map(|(key, value)| {
411            // Only convert the key to lowercase.
412            if value.eq_ignore_ascii_case("null") {
413                (key.clone(), Value::Null)
414            } else {
415                (key.clone(), Value::from(value.clone()))
416            }
417        })
418        .collect();
419    Value::Object(map)
420}
421
422// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
423// check the type key first.
424/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
425/// and returns `true` if the map contains enum options.
426fn validate_enum_options(
427    options_map: &HashMap<String, String>,
428    enum_tag_key: &str,
429) -> Result<bool> {
430    let enum_type = enum_tag_key.split('.').next().unwrap();
431    let mut has_other_options = false;
432    let mut has_tag = false;
433    for key in options_map.keys() {
434        if key == enum_tag_key {
435            has_tag = true;
436        } else if key.starts_with(enum_type) {
437            has_other_options = true;
438        }
439    }
440
441    // If tag is not provided, then other options for the enum should not exist.
442    ensure!(
443        has_tag || !has_other_options,
444        InvalidRegionOptionsSnafu {
445            reason: format!("missing key {} in options", enum_tag_key),
446        }
447    );
448
449    Ok(has_tag)
450}
451
452#[cfg(test)]
453mod tests {
454    use common_error::ext::ErrorExt;
455    use common_error::status_code::StatusCode;
456    use common_wal::options::KafkaWalOptions;
457
458    use super::*;
459
460    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
461        options
462            .iter()
463            .map(|(k, v)| (k.to_string(), v.to_string()))
464            .collect()
465    }
466
467    #[test]
468    fn test_empty_region_options() {
469        let map = make_map(&[]);
470        let options = RegionOptions::try_from(&map).unwrap();
471        assert_eq!(RegionOptions::default(), options);
472    }
473
474    #[test]
475    fn test_with_ttl() {
476        let map = make_map(&[("ttl", "7d")]);
477        let options = RegionOptions::try_from(&map).unwrap();
478        let expect = RegionOptions {
479            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
480            ..Default::default()
481        };
482        assert_eq!(expect, options);
483    }
484
485    #[test]
486    fn test_with_storage() {
487        let map = make_map(&[("storage", "S3")]);
488        let options = RegionOptions::try_from(&map).unwrap();
489        let expect = RegionOptions {
490            storage: Some("S3".to_string()),
491            ..Default::default()
492        };
493        assert_eq!(expect, options);
494    }
495
496    #[test]
497    fn test_without_compaction_type() {
498        let map = make_map(&[
499            ("compaction.twcs.trigger_file_num", "8"),
500            ("compaction.twcs.time_window", "2h"),
501        ]);
502        let err = RegionOptions::try_from(&map).unwrap_err();
503        assert_eq!(StatusCode::InvalidArguments, err.status_code());
504    }
505
506    #[test]
507    fn test_with_compaction_type() {
508        let map = make_map(&[
509            ("compaction.twcs.trigger_file_num", "8"),
510            ("compaction.twcs.time_window", "2h"),
511            ("compaction.type", "twcs"),
512        ]);
513        let options = RegionOptions::try_from(&map).unwrap();
514        let expect = RegionOptions {
515            compaction: CompactionOptions::Twcs(TwcsOptions {
516                trigger_file_num: 8,
517                time_window: Some(Duration::from_secs(3600 * 2)),
518                ..Default::default()
519            }),
520            ..Default::default()
521        };
522        assert_eq!(expect, options);
523    }
524
525    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
526        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
527        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
528        let got = RegionOptions::try_from(&map).unwrap();
529        let expect = RegionOptions {
530            wal_options: wal_options.clone(),
531            ..Default::default()
532        };
533        expect == got
534    }
535
536    #[test]
537    fn test_with_index() {
538        let map = make_map(&[
539            ("index.inverted_index.ignore_column_ids", "1,2,3"),
540            ("index.inverted_index.segment_row_count", "512"),
541        ]);
542        let options = RegionOptions::try_from(&map).unwrap();
543        let expect = RegionOptions {
544            index_options: IndexOptions {
545                inverted_index: InvertedIndexOptions {
546                    ignore_column_ids: vec![1, 2, 3],
547                    segment_row_count: 512,
548                },
549            },
550            ..Default::default()
551        };
552        assert_eq!(expect, options);
553    }
554
555    // No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
556    #[test]
557    fn test_with_any_wal_options() {
558        let all_wal_options = [
559            WalOptions::RaftEngine,
560            WalOptions::Kafka(KafkaWalOptions {
561                topic: "test_topic".to_string(),
562            }),
563        ];
564        all_wal_options.iter().all(test_with_wal_options);
565    }
566
567    #[test]
568    fn test_with_memtable() {
569        let map = make_map(&[("memtable.type", "time_series")]);
570        let options = RegionOptions::try_from(&map).unwrap();
571        let expect = RegionOptions {
572            memtable: Some(MemtableOptions::TimeSeries),
573            ..Default::default()
574        };
575        assert_eq!(expect, options);
576
577        let map = make_map(&[("memtable.type", "partition_tree")]);
578        let options = RegionOptions::try_from(&map).unwrap();
579        let expect = RegionOptions {
580            memtable: Some(MemtableOptions::PartitionTree(
581                PartitionTreeOptions::default(),
582            )),
583            ..Default::default()
584        };
585        assert_eq!(expect, options);
586    }
587
588    #[test]
589    fn test_unknown_memtable_type() {
590        let map = make_map(&[("memtable.type", "no_such_memtable")]);
591        let err = RegionOptions::try_from(&map).unwrap_err();
592        assert_eq!(StatusCode::InvalidArguments, err.status_code());
593    }
594
595    #[test]
596    fn test_with_merge_mode() {
597        let map = make_map(&[("merge_mode", "last_row")]);
598        let options = RegionOptions::try_from(&map).unwrap();
599        assert_eq!(MergeMode::LastRow, options.merge_mode());
600
601        let map = make_map(&[("merge_mode", "last_non_null")]);
602        let options = RegionOptions::try_from(&map).unwrap();
603        assert_eq!(MergeMode::LastNonNull, options.merge_mode());
604
605        let map = make_map(&[("merge_mode", "unknown")]);
606        let err = RegionOptions::try_from(&map).unwrap_err();
607        assert_eq!(StatusCode::InvalidArguments, err.status_code());
608    }
609
    /// Parses an options map that sets every group of options at once and checks
    /// each decoded field.
    #[test]
    fn test_with_all() {
        let wal_options = WalOptions::Kafka(KafkaWalOptions {
            topic: "test_topic".to_string(),
        });
        let map = make_map(&[
            ("ttl", "7d"),
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.max_output_file_size", "1GB"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
            ("compaction.twcs.remote_compaction", "false"),
            ("compaction.twcs.fallback_to_local", "true"),
            ("storage", "S3"),
            ("append_mode", "false"),
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
            // The wal options are passed JSON-encoded under their own key.
            (
                WAL_OPTIONS_KEY,
                &serde_json::to_string(&wal_options).unwrap(),
            ),
            ("memtable.type", "partition_tree"),
            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
            ("memtable.partition_tree.data_freeze_threshold", "2048"),
            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
            ("merge_mode", "last_non_null"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::gb(1)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            // `sst_format` is not present in the map.
            sst_format: None,
        };
        assert_eq!(expect, options);
    }
667
    /// Serializes fully populated options to JSON and checks that deserializing
    /// the result gives back equal options.
    #[test]
    fn test_region_options_serde() {
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: None,
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        // Round trip: struct -> JSON string -> struct.
        let region_options_json_str = serde_json::to_string(&options).unwrap();
        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
        assert_eq!(options, got);
    }
703
    /// Deserializes a hand-written JSON document (nested form, one object per
    /// option group) and checks every decoded field.
    #[test]
    fn test_region_options_str_serde() {
        // Notes: use empty string for `ignore_column_ids` to test the empty string case.
        let region_options_json_str = r#"{
  "ttl": "7days",
  "compaction": {
    "compaction.type": "twcs",
    "compaction.twcs.trigger_file_num": "8",
    "compaction.twcs.max_output_file_size": "7MB",
    "compaction.twcs.time_window": "2h"
  },
  "storage": "S3",
  "append_mode": false,
  "wal_options": {
    "wal.provider": "kafka",
    "wal.kafka.topic": "test_topic"
  },
  "index_options": {
    "index.inverted_index.ignore_column_ids": "",
    "index.inverted_index.segment_row_count": "512"
  },
  "memtable": {
    "memtable.type": "partition_tree",
    "memtable.partition_tree.index_max_keys_per_shard": "2048",
    "memtable.partition_tree.data_freeze_threshold": "2048",
    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
  },
  "merge_mode": "last_non_null"
}"#;
        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::mb(7)),
                // Not present in the JSON, so the defaults are expected.
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    // The empty string decodes to an empty list.
                    ignore_column_ids: vec![],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(options, got);
    }
765}