Skip to main content

mito2/region/
options.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Options for a region.
16//!
//! If we add options to this module, we also need to update [store_api::mito_engine_options].
18
19use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_telemetry::info;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metric_engine_consts::{
33    MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, PRIMARY_KEY_ENCODING,
34};
35use store_api::mito_engine_options::COMPACTION_OVERRIDE;
36use store_api::storage::{ColumnId, RegionId};
37use strum::EnumString;
38
39use crate::error::{InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
40use crate::memtable::bulk::BulkMemtableConfig;
41use crate::sst::FormatType;
42
43const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
44const COMPACTION_TWCS_PREFIX: &str = "compaction.twcs.";
45const MEMTABLE_PARTITION_TREE_PREFIX: &str = "memtable.partition_tree.";
46const MEMTABLE_BULK_PREFIX: &str = "memtable.bulk.";
47
48/// Legacy memtable type identifier accepted for backward compatibility.
49/// The partition tree memtable has been removed; parsing this value falls
50/// back to the default (bulk) memtable at runtime.
51const LEGACY_PARTITION_TREE_MEMTABLE_TYPE: &str = "partition_tree";
52
53pub(crate) fn parse_wal_options(
54    options_map: &HashMap<String, String>,
55) -> std::result::Result<WalOptions, serde_json::Error> {
56    options_map
57        .get(WAL_OPTIONS_KEY)
58        .map_or(Ok(WalOptions::default()), |encoded_wal_options| {
59            serde_json::from_str(encoded_wal_options)
60        })
61}
62
63/// Mode to handle duplicate rows while merging.
64#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumString)]
65#[serde(rename_all = "snake_case")]
66#[strum(serialize_all = "snake_case")]
67pub enum MergeMode {
68    /// Keeps the last row.
69    #[default]
70    LastRow,
71    /// Keeps the last non-null field for each row.
72    LastNonNull,
73}
74
75// Note: We need to update [store_api::mito_engine_options::is_mito_engine_option_key()]
76// if we want expose the option to table options.
77/// Options that affect the entire region.
78///
79/// Users need to specify the options while creating/opening a region.
80#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(default)]
82pub struct RegionOptions {
83    /// Region SST files TTL.
84    pub ttl: Option<TimeToLive>,
85    /// Compaction options.
86    pub compaction: CompactionOptions,
87    pub compaction_override: bool,
88    /// Custom storage. Uses default storage if it is `None`.
89    pub storage: Option<String>,
90    /// If append mode is enabled, the region keeps duplicate rows.
91    pub append_mode: bool,
92    /// Wal options.
93    pub wal_options: WalOptions,
94    /// Index options.
95    pub index_options: IndexOptions,
96    /// Memtable options.
97    pub memtable: Option<MemtableOptions>,
98    /// The mode to merge duplicate rows.
99    /// Only takes effect when `append_mode` is `false`.
100    pub merge_mode: Option<MergeMode>,
101    /// SST format type.
102    pub sst_format: Option<FormatType>,
103    /// Internal primary key encoding override used by metric-engine.
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub primary_key_encoding: Option<PrimaryKeyEncoding>,
106}
107
108impl RegionOptions {
109    /// Validates options.
110    pub fn validate(&self) -> Result<()> {
111        if self.append_mode {
112            ensure!(
113                self.merge_mode
114                    .is_none_or(|mode| mode == MergeMode::LastRow),
115                InvalidRegionOptionsSnafu {
116                    reason: "only last_row merge_mode is allowed when append_mode is enabled",
117                }
118            );
119        }
120        Ok(())
121    }
122
123    /// Returns `true` if deduplication is needed.
124    pub fn need_dedup(&self) -> bool {
125        !self.append_mode
126    }
127
128    /// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
129    pub fn merge_mode(&self) -> MergeMode {
130        self.merge_mode.unwrap_or_default()
131    }
132
133    /// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
134    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
135        self.primary_key_encoding.unwrap_or_default()
136    }
137}
138
139impl RegionOptions {
140    /// Parses [RegionOptions] from the raw `options_map`.
141    pub fn try_from_options(
142        region_id: RegionId,
143        options_map: &HashMap<String, String>,
144    ) -> Result<Self> {
145        let value = options_map_to_value(options_map);
146        let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
147
148        // #[serde(flatten)] doesn't work with #[serde(default)] so we need to parse
149        // each field manually instead of using #[serde(flatten)] for `compaction`.
150        // See https://github.com/serde-rs/serde/issues/1626
151        let options: RegionOptionsWithoutEnum =
152            serde_json::from_str(&json).context(JsonOptionsSnafu)?;
153        let has_compaction_type =
154            validate_enum_options(options_map, "compaction.type", &[COMPACTION_TWCS_PREFIX])?;
155        let compaction = if has_compaction_type {
156            serde_json::from_str(&json).context(JsonOptionsSnafu)?
157        } else {
158            CompactionOptions::default()
159        };
160
161        let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?;
162
163        let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
164        let is_legacy_partition_tree = options_map
165            .get("memtable.type")
166            .map(|s| s.eq_ignore_ascii_case(LEGACY_PARTITION_TREE_MEMTABLE_TYPE))
167            .unwrap_or(false);
168        let memtable = if validate_enum_options(
169            options_map,
170            "memtable.type",
171            &[MEMTABLE_PARTITION_TREE_PREFIX, MEMTABLE_BULK_PREFIX],
172        )? {
173            if is_legacy_partition_tree {
174                // The partition tree memtable has been removed. Fall back to the
175                // default memtable; the primary key encoding (if any) is still
176                // read separately below from the legacy nested key.
177                None
178            } else {
179                Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
180            }
181        } else {
182            None
183        };
184
185        // The partition tree memtable has been removed. Besides falling back to
186        // the default memtable, also override the SST format to flat.
187        let mut sst_format = options.sst_format;
188        if is_legacy_partition_tree {
189            info!(
190                "Region {} specified the removed partition_tree memtable; \
191                 overriding memtable to the default and SST format to flat",
192                region_id
193            );
194            sst_format = Some(FormatType::Flat);
195        }
196
197        // Bulk memtable produces flat-encoded ranges and flushes them through
198        // `put_sst()`, so the SST format must be flat to match.
199        if matches!(memtable, Some(MemtableOptions::Bulk(_))) {
200            if let Some(format) = sst_format
201                && format != FormatType::Flat
202            {
203                info!(
204                    "Region {} uses bulk memtable; overriding sst_format from {:?} to flat",
205                    region_id, format
206                );
207            }
208            sst_format = Some(FormatType::Flat);
209        }
210
211        let compaction_override_flag = options_map
212            .get(COMPACTION_OVERRIDE)
213            .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
214            .unwrap_or(false);
215        let compaction_override = has_compaction_type || compaction_override_flag;
216        let primary_key_encoding = options_map
217            .get(PRIMARY_KEY_ENCODING)
218            .or_else(|| options_map.get(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING))
219            .map(|v| match v.to_lowercase().as_str() {
220                "dense" => Ok(PrimaryKeyEncoding::Dense),
221                "sparse" => Ok(PrimaryKeyEncoding::Sparse),
222                _ => Err(InvalidRegionOptionsSnafu {
223                    reason: format!("Invalid primary key encoding: {v}"),
224                }
225                .build()),
226            })
227            .transpose()?;
228
229        let opts = RegionOptions {
230            ttl: options.ttl,
231            compaction,
232            compaction_override,
233            storage: options.storage,
234            append_mode: options.append_mode,
235            wal_options,
236            index_options,
237            memtable,
238            merge_mode: options.merge_mode,
239            sst_format,
240            primary_key_encoding,
241        };
242        opts.validate()?;
243
244        Ok(opts)
245    }
246}
247
248/// Options for compactions
249#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "compaction.type")]
251#[serde(rename_all = "snake_case")]
252pub enum CompactionOptions {
253    /// Time window compaction strategy.
254    #[serde(with = "prefix_twcs")]
255    Twcs(TwcsOptions),
256}
257
258impl CompactionOptions {
259    pub(crate) fn time_window(&self) -> Option<Duration> {
260        match self {
261            CompactionOptions::Twcs(opts) => opts.time_window,
262        }
263    }
264
265    pub(crate) fn remote_compaction(&self) -> bool {
266        match self {
267            CompactionOptions::Twcs(opts) => opts.remote_compaction,
268        }
269    }
270
271    pub(crate) fn fallback_to_local(&self) -> bool {
272        match self {
273            CompactionOptions::Twcs(opts) => opts.fallback_to_local,
274        }
275    }
276}
277
278impl Default for CompactionOptions {
279    fn default() -> Self {
280        Self::Twcs(TwcsOptions::default())
281    }
282}
283
284/// Time window compaction options.
285#[serde_as]
286#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
287#[serde(default)]
288pub struct TwcsOptions {
289    /// Minimum file num in every time window to trigger a compaction.
290    #[serde_as(as = "DisplayFromStr")]
291    pub trigger_file_num: usize,
292    /// Compaction time window defined when creating tables.
293    #[serde(with = "humantime_serde")]
294    pub time_window: Option<Duration>,
295    /// Compaction time window defined when creating tables.
296    pub max_output_file_size: Option<ReadableSize>,
297    /// Whether to use remote compaction.
298    #[serde_as(as = "DisplayFromStr")]
299    pub remote_compaction: bool,
300    /// Whether to fall back to local compaction if remote compaction fails.
301    #[serde_as(as = "DisplayFromStr")]
302    pub fallback_to_local: bool,
303}
304
305with_prefix!(prefix_twcs "compaction.twcs.");
306
307impl TwcsOptions {
308    /// Returns time window in second resolution.
309    pub fn time_window_seconds(&self) -> Option<i64> {
310        self.time_window.and_then(|window| {
311            let window_secs = window.as_secs();
312            if window_secs == 0 {
313                None
314            } else {
315                window_secs.try_into().ok()
316            }
317        })
318    }
319}
320
321impl Default for TwcsOptions {
322    fn default() -> Self {
323        Self {
324            trigger_file_num: 4,
325            time_window: None,
326            max_output_file_size: Some(ReadableSize::mb(512)),
327            remote_compaction: false,
328            fallback_to_local: true,
329        }
330    }
331}
332
333/// We need to define a new struct without enum fields as `#[serde(default)]` does not
334/// support external tagging.
335#[serde_as]
336#[derive(Debug, Deserialize)]
337#[serde(default)]
338struct RegionOptionsWithoutEnum {
339    /// Region SST files TTL.
340    ttl: Option<TimeToLive>,
341    storage: Option<String>,
342    #[serde_as(as = "DisplayFromStr")]
343    append_mode: bool,
344    #[serde_as(as = "NoneAsEmptyString")]
345    merge_mode: Option<MergeMode>,
346    #[serde_as(as = "NoneAsEmptyString")]
347    sst_format: Option<FormatType>,
348}
349
350impl Default for RegionOptionsWithoutEnum {
351    fn default() -> Self {
352        let options = RegionOptions::default();
353        RegionOptionsWithoutEnum {
354            ttl: options.ttl,
355            storage: options.storage,
356            append_mode: options.append_mode,
357            merge_mode: options.merge_mode,
358            sst_format: options.sst_format,
359        }
360    }
361}
362
363with_prefix!(prefix_inverted_index "index.inverted_index.");
364
365/// Options for index.
366#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
367#[serde(default)]
368pub struct IndexOptions {
369    /// Options for the inverted index.
370    #[serde(flatten, with = "prefix_inverted_index")]
371    pub inverted_index: InvertedIndexOptions,
372}
373
374/// Options for the inverted index.
375#[serde_as]
376#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
377#[serde(default)]
378pub struct InvertedIndexOptions {
379    /// The column ids that should be ignored when building the inverted index.
380    /// The column ids are separated by commas. For example, "1,2,3".
381    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
382    #[serde(serialize_with = "serialize_ignore_column_ids")]
383    pub ignore_column_ids: Vec<ColumnId>,
384
385    /// The number of rows in a segment.
386    #[serde_as(as = "DisplayFromStr")]
387    pub segment_row_count: usize,
388}
389
390impl Default for InvertedIndexOptions {
391    fn default() -> Self {
392        Self {
393            ignore_column_ids: Vec::new(),
394            segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
395        }
396    }
397}
398
399/// Options for region level memtable.
400#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
401#[serde(tag = "memtable.type", rename_all = "snake_case")]
402pub enum MemtableOptions {
403    TimeSeries,
404    #[serde(with = "prefix_bulk")]
405    Bulk(BulkMemtableConfig),
406}
407
408with_prefix!(prefix_bulk "memtable.bulk.");
409
410fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
411where
412    D: Deserializer<'de>,
413{
414    let s: String = Deserialize::deserialize(deserializer)?;
415    let mut column_ids = Vec::new();
416    if s.is_empty() {
417        return Ok(column_ids);
418    }
419    for item in s.split(',') {
420        let column_id = item.parse().map_err(D::Error::custom)?;
421        column_ids.push(column_id);
422    }
423    Ok(column_ids)
424}
425
426fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
427where
428    S: serde::Serializer,
429{
430    let s = column_ids
431        .iter()
432        .map(|id| id.to_string())
433        .collect::<Vec<_>>()
434        .join(",");
435    serializer.serialize_str(&s)
436}
437
438/// Converts the `options` map to a json object.
439///
440/// Replaces "null" strings by `null` json values.
441fn options_map_to_value(options: &HashMap<String, String>) -> Value {
442    let map = options
443        .iter()
444        .map(|(key, value)| {
445            // Only convert the key to lowercase.
446            if value.eq_ignore_ascii_case("null") {
447                (key.clone(), Value::Null)
448            } else {
449                (key.clone(), Value::from(value.clone()))
450            }
451        })
452        .collect();
453    Value::Object(map)
454}
455
456// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we
457// check the type key first.
458/// Validates whether the `options_map` has valid options for specific `enum_tag_key`
459/// and returns `true` if the map contains the enum tag.
460///
461/// Variant options must start with one of `enum_option_prefixes`. If variant options
462/// are provided, the tagged enum type key must also be provided.
463fn validate_enum_options(
464    options_map: &HashMap<String, String>,
465    enum_tag_key: &str,
466    enum_option_prefixes: &[&str],
467) -> Result<bool> {
468    let mut has_enum_options = false;
469    let mut has_tag = false;
470    for key in options_map.keys() {
471        if key == enum_tag_key {
472            has_tag = true;
473        } else if !has_enum_options
474            && enum_option_prefixes
475                .iter()
476                .any(|prefix| key.starts_with(prefix))
477        {
478            has_enum_options = true;
479        }
480
481        if has_tag && has_enum_options {
482            break;
483        }
484    }
485
486    // If tag is not provided, then other options for the enum should not exist.
487    ensure!(
488        has_tag || !has_enum_options,
489        InvalidRegionOptionsSnafu {
490            reason: format!("missing key {} in options", enum_tag_key),
491        }
492    );
493
494    Ok(has_tag)
495}
496
497#[cfg(test)]
498mod tests {
499    use common_error::ext::ErrorExt;
500    use common_error::status_code::StatusCode;
501    use common_wal::options::KafkaWalOptions;
502
503    use super::*;
504
505    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
506        options
507            .iter()
508            .map(|(k, v)| (k.to_string(), v.to_string()))
509            .collect()
510    }
511
512    #[test]
513    fn test_empty_region_options() {
514        let map = make_map(&[]);
515        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
516        assert_eq!(RegionOptions::default(), options);
517    }
518
519    #[test]
520    fn test_with_ttl() {
521        let map = make_map(&[("ttl", "7d")]);
522        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
523        let expect = RegionOptions {
524            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
525            ..Default::default()
526        };
527        assert_eq!(expect, options);
528    }
529
530    #[test]
531    fn test_with_storage() {
532        let map = make_map(&[("storage", "S3")]);
533        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
534        let expect = RegionOptions {
535            storage: Some("S3".to_string()),
536            ..Default::default()
537        };
538        assert_eq!(expect, options);
539    }
540
541    #[test]
542    fn test_without_compaction_type() {
543        let map = make_map(&[
544            ("compaction.twcs.trigger_file_num", "8"),
545            ("compaction.twcs.time_window", "2h"),
546        ]);
547        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
548        assert_eq!(StatusCode::InvalidArguments, err.status_code());
549    }
550
551    #[test]
552    fn test_with_compaction_type() {
553        let map = make_map(&[
554            ("compaction.twcs.trigger_file_num", "8"),
555            ("compaction.twcs.time_window", "2h"),
556            ("compaction.type", "twcs"),
557        ]);
558        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
559        let expect = RegionOptions {
560            compaction: CompactionOptions::Twcs(TwcsOptions {
561                trigger_file_num: 8,
562                time_window: Some(Duration::from_secs(3600 * 2)),
563                ..Default::default()
564            }),
565            compaction_override: true,
566            ..Default::default()
567        };
568        assert_eq!(expect, options);
569    }
570
571    #[test]
572    fn test_with_compaction_override_true_without_compaction_type() {
573        let map = make_map(&[(COMPACTION_OVERRIDE, "true")]);
574        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
575        let expect = RegionOptions {
576            compaction_override: true,
577            ..Default::default()
578        };
579        assert_eq!(expect, options);
580    }
581
582    #[test]
583    fn test_with_compaction_override_false_without_compaction_type() {
584        let map = make_map(&[(COMPACTION_OVERRIDE, "false")]);
585        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
586        assert_eq!(RegionOptions::default(), options);
587    }
588
589    #[test]
590    fn test_compaction_twcs_options_still_require_compaction_type_with_override() {
591        let map = make_map(&[
592            (COMPACTION_OVERRIDE, "true"),
593            ("compaction.twcs.time_window", "2h"),
594        ]);
595        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
596        assert_eq!(StatusCode::InvalidArguments, err.status_code());
597    }
598
599    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
600        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
601        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
602        let got = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
603        let expect = RegionOptions {
604            wal_options: wal_options.clone(),
605            ..Default::default()
606        };
607        expect == got
608    }
609
610    #[test]
611    fn test_with_index() {
612        let map = make_map(&[
613            ("index.inverted_index.ignore_column_ids", "1,2,3"),
614            ("index.inverted_index.segment_row_count", "512"),
615        ]);
616        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
617        let expect = RegionOptions {
618            index_options: IndexOptions {
619                inverted_index: InvertedIndexOptions {
620                    ignore_column_ids: vec![1, 2, 3],
621                    segment_row_count: 512,
622                },
623            },
624            ..Default::default()
625        };
626        assert_eq!(expect, options);
627    }
628
629    // No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
630    #[test]
631    fn test_with_any_wal_options() {
632        let all_wal_options = [
633            WalOptions::RaftEngine,
634            WalOptions::Kafka(KafkaWalOptions {
635                topic: "test_topic".to_string(),
636            }),
637        ];
638        all_wal_options.iter().all(test_with_wal_options);
639    }
640
641    #[test]
642    fn test_with_memtable() {
643        let map = make_map(&[("memtable.type", "time_series")]);
644        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
645        let expect = RegionOptions {
646            memtable: Some(MemtableOptions::TimeSeries),
647            ..Default::default()
648        };
649        assert_eq!(expect, options);
650
651        let map = make_map(&[("memtable.type", "bulk")]);
652        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
653        let expect = RegionOptions {
654            memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
655            sst_format: Some(FormatType::Flat),
656            ..Default::default()
657        };
658        assert_eq!(expect, options);
659
660        let map = make_map(&[
661            ("memtable.type", "bulk"),
662            ("memtable.bulk.merge_threshold", "7"),
663            ("memtable.bulk.encode_row_threshold", "11"),
664            ("memtable.bulk.encode_bytes_threshold", "13"),
665            ("memtable.bulk.max_merge_groups", "17"),
666        ]);
667        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
668        let expect = RegionOptions {
669            memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig {
670                merge_threshold: 7,
671                encode_row_threshold: 11,
672                encode_bytes_threshold: 13,
673                max_merge_groups: 17,
674            })),
675            sst_format: Some(FormatType::Flat),
676            ..Default::default()
677        };
678        assert_eq!(expect, options);
679
680        // Legacy partition_tree memtable falls back to the default memtable and
681        // overrides the SST format to flat.
682        let map = make_map(&[("memtable.type", "partition_tree")]);
683        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
684        let expect = RegionOptions {
685            memtable: None,
686            sst_format: Some(FormatType::Flat),
687            ..Default::default()
688        };
689        assert_eq!(expect, options);
690
691        // Legacy partition_tree options are tolerated alongside the type tag.
692        let map = make_map(&[
693            ("memtable.type", "partition_tree"),
694            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
695            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
696        ]);
697        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
698        let expect = RegionOptions {
699            memtable: None,
700            sst_format: Some(FormatType::Flat),
701            ..Default::default()
702        };
703        assert_eq!(expect, options);
704    }
705
706    #[test]
707    fn test_primary_key_encoding() {
708        // New top-level key.
709        let map = make_map(&[("primary_key_encoding", "sparse")]);
710        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
711        assert_eq!(options.primary_key_encoding(), PrimaryKeyEncoding::Sparse);
712        assert_eq!(
713            options.primary_key_encoding,
714            Some(PrimaryKeyEncoding::Sparse)
715        );
716
717        // Legacy memtable.type=partition_tree + legacy encoding.
718        let map = make_map(&[
719            ("memtable.type", "partition_tree"),
720            ("memtable.partition_tree.primary_key_encoding", "sparse"),
721        ]);
722        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
723        assert_eq!(options.memtable, None);
724        assert_eq!(options.sst_format, Some(FormatType::Flat));
725        assert_eq!(options.primary_key_encoding(), PrimaryKeyEncoding::Sparse);
726
727        // Invalid value rejected.
728        let map = make_map(&[("primary_key_encoding", "bogus")]);
729        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
730        assert_eq!(StatusCode::InvalidArguments, err.status_code());
731    }
732
733    #[test]
734    fn test_legacy_partition_tree_overrides_sst_format() {
735        // Legacy partition_tree memtable falls back to the default memtable and
736        // overrides the SST format to flat, even when a different format was set.
737        let map = make_map(&[
738            ("memtable.type", "partition_tree"),
739            ("sst_format", "primary_key"),
740        ]);
741        let options = RegionOptions::try_from_options(RegionId::new(1, 1), &map).unwrap();
742        assert_eq!(options.memtable, None);
743        assert_eq!(options.sst_format, Some(FormatType::Flat));
744    }
745
746    #[test]
747    fn test_bulk_memtable_overrides_sst_format() {
748        // Bulk memtable produces flat-encoded ranges, so an explicit
749        // `sst_format=primary_key` must be overridden to flat to keep the
750        // in-memory and on-disk encodings in sync.
751        let map = make_map(&[("memtable.type", "bulk"), ("sst_format", "primary_key")]);
752        let options = RegionOptions::try_from_options(RegionId::new(1, 1), &map).unwrap();
753        assert_eq!(
754            options.memtable,
755            Some(MemtableOptions::Bulk(BulkMemtableConfig::default()))
756        );
757        assert_eq!(options.sst_format, Some(FormatType::Flat));
758    }
759
760    #[test]
761    fn test_unknown_memtable_type() {
762        let map = make_map(&[("memtable.type", "no_such_memtable")]);
763        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
764        assert_eq!(StatusCode::InvalidArguments, err.status_code());
765    }
766
767    #[test]
768    fn test_without_memtable_type() {
769        let map = make_map(&[("memtable.partition_tree.index_max_keys_per_shard", "2048")]);
770        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
771        assert_eq!(StatusCode::InvalidArguments, err.status_code());
772
773        let map = make_map(&[("memtable.bulk.merge_threshold", "7")]);
774        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
775        assert_eq!(StatusCode::InvalidArguments, err.status_code());
776    }
777
778    #[test]
779    fn test_with_merge_mode() {
780        let map = make_map(&[("merge_mode", "last_row")]);
781        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
782        assert_eq!(MergeMode::LastRow, options.merge_mode());
783
784        let map = make_map(&[("merge_mode", "last_non_null")]);
785        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
786        assert_eq!(MergeMode::LastNonNull, options.merge_mode());
787
788        let map = make_map(&[("merge_mode", "unknown")]);
789        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
790        assert_eq!(StatusCode::InvalidArguments, err.status_code());
791    }
792
793    #[test]
794    fn test_append_mode_allows_last_row_merge_mode() {
795        let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_row")]);
796        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
797        assert!(options.append_mode);
798        assert_eq!(MergeMode::LastRow, options.merge_mode());
799
800        let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_non_null")]);
801        let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
802        assert_eq!(StatusCode::InvalidArguments, err.status_code());
803    }
804
805    #[test]
806    fn test_with_all() {
807        let wal_options = WalOptions::Kafka(KafkaWalOptions {
808            topic: "test_topic".to_string(),
809        });
810        let map = make_map(&[
811            ("ttl", "7d"),
812            ("compaction.twcs.trigger_file_num", "8"),
813            ("compaction.twcs.max_output_file_size", "1GB"),
814            ("compaction.twcs.time_window", "2h"),
815            ("compaction.type", "twcs"),
816            ("compaction.twcs.remote_compaction", "false"),
817            ("compaction.twcs.fallback_to_local", "true"),
818            ("storage", "S3"),
819            ("append_mode", "false"),
820            ("index.inverted_index.ignore_column_ids", "1,2,3"),
821            ("index.inverted_index.segment_row_count", "512"),
822            (
823                WAL_OPTIONS_KEY,
824                &serde_json::to_string(&wal_options).unwrap(),
825            ),
826            ("memtable.type", "bulk"),
827            ("memtable.bulk.merge_threshold", "7"),
828            ("memtable.bulk.encode_row_threshold", "11"),
829            ("memtable.bulk.encode_bytes_threshold", "13"),
830            ("memtable.bulk.max_merge_groups", "17"),
831            ("merge_mode", "last_non_null"),
832        ]);
833        let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
834        let expect = RegionOptions {
835            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
836            compaction: CompactionOptions::Twcs(TwcsOptions {
837                trigger_file_num: 8,
838                time_window: Some(Duration::from_secs(3600 * 2)),
839                max_output_file_size: Some(ReadableSize::gb(1)),
840                remote_compaction: false,
841                fallback_to_local: true,
842            }),
843            compaction_override: true,
844            storage: Some("S3".to_string()),
845            append_mode: false,
846            wal_options,
847            index_options: IndexOptions {
848                inverted_index: InvertedIndexOptions {
849                    ignore_column_ids: vec![1, 2, 3],
850                    segment_row_count: 512,
851                },
852            },
853            memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig {
854                merge_threshold: 7,
855                encode_row_threshold: 11,
856                encode_bytes_threshold: 13,
857                max_merge_groups: 17,
858            })),
859            merge_mode: Some(MergeMode::LastNonNull),
860            sst_format: Some(FormatType::Flat),
861            primary_key_encoding: None,
862        };
863        assert_eq!(expect, options);
864    }
865
866    #[test]
867    fn test_region_options_serde() {
868        let options = RegionOptions {
869            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
870            compaction: CompactionOptions::Twcs(TwcsOptions {
871                trigger_file_num: 8,
872                time_window: Some(Duration::from_secs(3600 * 2)),
873                max_output_file_size: None,
874                remote_compaction: false,
875                fallback_to_local: true,
876            }),
877            compaction_override: false,
878            storage: Some("S3".to_string()),
879            append_mode: false,
880            wal_options: WalOptions::Kafka(KafkaWalOptions {
881                topic: "test_topic".to_string(),
882            }),
883            index_options: IndexOptions {
884                inverted_index: InvertedIndexOptions {
885                    ignore_column_ids: vec![1, 2, 3],
886                    segment_row_count: 512,
887                },
888            },
889            memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
890            merge_mode: Some(MergeMode::LastNonNull),
891            sst_format: None,
892            primary_key_encoding: None,
893        };
894        let region_options_json_str = serde_json::to_string(&options).unwrap();
895        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
896        assert_eq!(options, got);
897    }
898
899    #[test]
900    fn test_region_options_str_serde() {
901        // Notes: use empty string for `ignore_column_ids` to test the empty string case.
902        let region_options_json_str = r#"{
903  "ttl": "7days",
904  "compaction": {
905    "compaction.type": "twcs",
906    "compaction.twcs.trigger_file_num": "8",
907    "compaction.twcs.max_output_file_size": "7MB",
908    "compaction.twcs.time_window": "2h"
909  },
910  "storage": "S3",
911  "append_mode": false,
912  "wal_options": {
913    "wal.provider": "kafka",
914    "wal.kafka.topic": "test_topic"
915  },
916  "index_options": {
917    "index.inverted_index.ignore_column_ids": "",
918    "index.inverted_index.segment_row_count": "512"
919  },
920  "memtable": {
921    "memtable.type": "bulk"
922  },
923  "merge_mode": "last_non_null"
924}"#;
925        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
926        let options = RegionOptions {
927            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
928            compaction: CompactionOptions::Twcs(TwcsOptions {
929                trigger_file_num: 8,
930                time_window: Some(Duration::from_secs(3600 * 2)),
931                max_output_file_size: Some(ReadableSize::mb(7)),
932                remote_compaction: false,
933                fallback_to_local: true,
934            }),
935            compaction_override: false,
936            storage: Some("S3".to_string()),
937            append_mode: false,
938            wal_options: WalOptions::Kafka(KafkaWalOptions {
939                topic: "test_topic".to_string(),
940            }),
941            index_options: IndexOptions {
942                inverted_index: InvertedIndexOptions {
943                    ignore_column_ids: vec![],
944                    segment_row_count: 512,
945                },
946            },
947            memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
948            merge_mode: Some(MergeMode::LastNonNull),
949            sst_format: None,
950            primary_key_encoding: None,
951        };
952        assert_eq!(options, got);
953    }
954}