1use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_telemetry::info;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metric_engine_consts::{
33 MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, PRIMARY_KEY_ENCODING,
34};
35use store_api::mito_engine_options::COMPACTION_OVERRIDE;
36use store_api::storage::{ColumnId, RegionId};
37use strum::EnumString;
38
39use crate::error::{InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
40use crate::memtable::bulk::BulkMemtableConfig;
41use crate::sst::FormatType;
42
43const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
44const COMPACTION_TWCS_PREFIX: &str = "compaction.twcs.";
45const MEMTABLE_PARTITION_TREE_PREFIX: &str = "memtable.partition_tree.";
46const MEMTABLE_BULK_PREFIX: &str = "memtable.bulk.";
47
48const LEGACY_PARTITION_TREE_MEMTABLE_TYPE: &str = "partition_tree";
52
53pub(crate) fn parse_wal_options(
54 options_map: &HashMap<String, String>,
55) -> std::result::Result<WalOptions, serde_json::Error> {
56 options_map
57 .get(WAL_OPTIONS_KEY)
58 .map_or(Ok(WalOptions::default()), |encoded_wal_options| {
59 serde_json::from_str(encoded_wal_options)
60 })
61}
62
63#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumString)]
65#[serde(rename_all = "snake_case")]
66#[strum(serialize_all = "snake_case")]
67pub enum MergeMode {
68 #[default]
70 LastRow,
71 LastNonNull,
73}
74
75#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(default)]
82pub struct RegionOptions {
83 pub ttl: Option<TimeToLive>,
85 pub compaction: CompactionOptions,
87 pub compaction_override: bool,
88 pub storage: Option<String>,
90 pub append_mode: bool,
92 pub wal_options: WalOptions,
94 pub index_options: IndexOptions,
96 pub memtable: Option<MemtableOptions>,
98 pub merge_mode: Option<MergeMode>,
101 pub sst_format: Option<FormatType>,
103 #[serde(skip_serializing_if = "Option::is_none")]
105 pub primary_key_encoding: Option<PrimaryKeyEncoding>,
106}
107
108impl RegionOptions {
109 pub fn validate(&self) -> Result<()> {
111 if self.append_mode {
112 ensure!(
113 self.merge_mode
114 .is_none_or(|mode| mode == MergeMode::LastRow),
115 InvalidRegionOptionsSnafu {
116 reason: "only last_row merge_mode is allowed when append_mode is enabled",
117 }
118 );
119 }
120 Ok(())
121 }
122
123 pub fn need_dedup(&self) -> bool {
125 !self.append_mode
126 }
127
128 pub fn merge_mode(&self) -> MergeMode {
130 self.merge_mode.unwrap_or_default()
131 }
132
133 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
135 self.primary_key_encoding.unwrap_or_default()
136 }
137}
138
139impl RegionOptions {
140 pub fn try_from_options(
142 region_id: RegionId,
143 options_map: &HashMap<String, String>,
144 ) -> Result<Self> {
145 let value = options_map_to_value(options_map);
146 let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
147
148 let options: RegionOptionsWithoutEnum =
152 serde_json::from_str(&json).context(JsonOptionsSnafu)?;
153 let has_compaction_type =
154 validate_enum_options(options_map, "compaction.type", &[COMPACTION_TWCS_PREFIX])?;
155 let compaction = if has_compaction_type {
156 serde_json::from_str(&json).context(JsonOptionsSnafu)?
157 } else {
158 CompactionOptions::default()
159 };
160
161 let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?;
162
163 let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
164 let is_legacy_partition_tree = options_map
165 .get("memtable.type")
166 .map(|s| s.eq_ignore_ascii_case(LEGACY_PARTITION_TREE_MEMTABLE_TYPE))
167 .unwrap_or(false);
168 let memtable = if validate_enum_options(
169 options_map,
170 "memtable.type",
171 &[MEMTABLE_PARTITION_TREE_PREFIX, MEMTABLE_BULK_PREFIX],
172 )? {
173 if is_legacy_partition_tree {
174 None
178 } else {
179 Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
180 }
181 } else {
182 None
183 };
184
185 let mut sst_format = options.sst_format;
188 if is_legacy_partition_tree {
189 info!(
190 "Region {} specified the removed partition_tree memtable; \
191 overriding memtable to the default and SST format to flat",
192 region_id
193 );
194 sst_format = Some(FormatType::Flat);
195 }
196
197 if matches!(memtable, Some(MemtableOptions::Bulk(_))) {
200 if let Some(format) = sst_format
201 && format != FormatType::Flat
202 {
203 info!(
204 "Region {} uses bulk memtable; overriding sst_format from {:?} to flat",
205 region_id, format
206 );
207 }
208 sst_format = Some(FormatType::Flat);
209 }
210
211 let compaction_override_flag = options_map
212 .get(COMPACTION_OVERRIDE)
213 .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
214 .unwrap_or(false);
215 let compaction_override = has_compaction_type || compaction_override_flag;
216 let primary_key_encoding = options_map
217 .get(PRIMARY_KEY_ENCODING)
218 .or_else(|| options_map.get(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING))
219 .map(|v| match v.to_lowercase().as_str() {
220 "dense" => Ok(PrimaryKeyEncoding::Dense),
221 "sparse" => Ok(PrimaryKeyEncoding::Sparse),
222 _ => Err(InvalidRegionOptionsSnafu {
223 reason: format!("Invalid primary key encoding: {v}"),
224 }
225 .build()),
226 })
227 .transpose()?;
228
229 let opts = RegionOptions {
230 ttl: options.ttl,
231 compaction,
232 compaction_override,
233 storage: options.storage,
234 append_mode: options.append_mode,
235 wal_options,
236 index_options,
237 memtable,
238 merge_mode: options.merge_mode,
239 sst_format,
240 primary_key_encoding,
241 };
242 opts.validate()?;
243
244 Ok(opts)
245 }
246}
247
248#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "compaction.type")]
251#[serde(rename_all = "snake_case")]
252pub enum CompactionOptions {
253 #[serde(with = "prefix_twcs")]
255 Twcs(TwcsOptions),
256}
257
258impl CompactionOptions {
259 pub(crate) fn time_window(&self) -> Option<Duration> {
260 match self {
261 CompactionOptions::Twcs(opts) => opts.time_window,
262 }
263 }
264
265 pub(crate) fn remote_compaction(&self) -> bool {
266 match self {
267 CompactionOptions::Twcs(opts) => opts.remote_compaction,
268 }
269 }
270
271 pub(crate) fn fallback_to_local(&self) -> bool {
272 match self {
273 CompactionOptions::Twcs(opts) => opts.fallback_to_local,
274 }
275 }
276}
277
278impl Default for CompactionOptions {
279 fn default() -> Self {
280 Self::Twcs(TwcsOptions::default())
281 }
282}
283
284#[serde_as]
286#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
287#[serde(default)]
288pub struct TwcsOptions {
289 #[serde_as(as = "DisplayFromStr")]
291 pub trigger_file_num: usize,
292 #[serde(with = "humantime_serde")]
294 pub time_window: Option<Duration>,
295 pub max_output_file_size: Option<ReadableSize>,
297 #[serde_as(as = "DisplayFromStr")]
299 pub remote_compaction: bool,
300 #[serde_as(as = "DisplayFromStr")]
302 pub fallback_to_local: bool,
303}
304
305with_prefix!(prefix_twcs "compaction.twcs.");
306
307impl TwcsOptions {
308 pub fn time_window_seconds(&self) -> Option<i64> {
310 self.time_window.and_then(|window| {
311 let window_secs = window.as_secs();
312 if window_secs == 0 {
313 None
314 } else {
315 window_secs.try_into().ok()
316 }
317 })
318 }
319}
320
321impl Default for TwcsOptions {
322 fn default() -> Self {
323 Self {
324 trigger_file_num: 4,
325 time_window: None,
326 max_output_file_size: Some(ReadableSize::mb(512)),
327 remote_compaction: false,
328 fallback_to_local: true,
329 }
330 }
331}
332
333#[serde_as]
336#[derive(Debug, Deserialize)]
337#[serde(default)]
338struct RegionOptionsWithoutEnum {
339 ttl: Option<TimeToLive>,
341 storage: Option<String>,
342 #[serde_as(as = "DisplayFromStr")]
343 append_mode: bool,
344 #[serde_as(as = "NoneAsEmptyString")]
345 merge_mode: Option<MergeMode>,
346 #[serde_as(as = "NoneAsEmptyString")]
347 sst_format: Option<FormatType>,
348}
349
350impl Default for RegionOptionsWithoutEnum {
351 fn default() -> Self {
352 let options = RegionOptions::default();
353 RegionOptionsWithoutEnum {
354 ttl: options.ttl,
355 storage: options.storage,
356 append_mode: options.append_mode,
357 merge_mode: options.merge_mode,
358 sst_format: options.sst_format,
359 }
360 }
361}
362
363with_prefix!(prefix_inverted_index "index.inverted_index.");
364
365#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
367#[serde(default)]
368pub struct IndexOptions {
369 #[serde(flatten, with = "prefix_inverted_index")]
371 pub inverted_index: InvertedIndexOptions,
372}
373
374#[serde_as]
376#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
377#[serde(default)]
378pub struct InvertedIndexOptions {
379 #[serde(deserialize_with = "deserialize_ignore_column_ids")]
382 #[serde(serialize_with = "serialize_ignore_column_ids")]
383 pub ignore_column_ids: Vec<ColumnId>,
384
385 #[serde_as(as = "DisplayFromStr")]
387 pub segment_row_count: usize,
388}
389
390impl Default for InvertedIndexOptions {
391 fn default() -> Self {
392 Self {
393 ignore_column_ids: Vec::new(),
394 segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
395 }
396 }
397}
398
399#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
401#[serde(tag = "memtable.type", rename_all = "snake_case")]
402pub enum MemtableOptions {
403 TimeSeries,
404 #[serde(with = "prefix_bulk")]
405 Bulk(BulkMemtableConfig),
406}
407
408with_prefix!(prefix_bulk "memtable.bulk.");
409
410fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
411where
412 D: Deserializer<'de>,
413{
414 let s: String = Deserialize::deserialize(deserializer)?;
415 let mut column_ids = Vec::new();
416 if s.is_empty() {
417 return Ok(column_ids);
418 }
419 for item in s.split(',') {
420 let column_id = item.parse().map_err(D::Error::custom)?;
421 column_ids.push(column_id);
422 }
423 Ok(column_ids)
424}
425
426fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
427where
428 S: serde::Serializer,
429{
430 let s = column_ids
431 .iter()
432 .map(|id| id.to_string())
433 .collect::<Vec<_>>()
434 .join(",");
435 serializer.serialize_str(&s)
436}
437
438fn options_map_to_value(options: &HashMap<String, String>) -> Value {
442 let map = options
443 .iter()
444 .map(|(key, value)| {
445 if value.eq_ignore_ascii_case("null") {
447 (key.clone(), Value::Null)
448 } else {
449 (key.clone(), Value::from(value.clone()))
450 }
451 })
452 .collect();
453 Value::Object(map)
454}
455
456fn validate_enum_options(
464 options_map: &HashMap<String, String>,
465 enum_tag_key: &str,
466 enum_option_prefixes: &[&str],
467) -> Result<bool> {
468 let mut has_enum_options = false;
469 let mut has_tag = false;
470 for key in options_map.keys() {
471 if key == enum_tag_key {
472 has_tag = true;
473 } else if !has_enum_options
474 && enum_option_prefixes
475 .iter()
476 .any(|prefix| key.starts_with(prefix))
477 {
478 has_enum_options = true;
479 }
480
481 if has_tag && has_enum_options {
482 break;
483 }
484 }
485
486 ensure!(
488 has_tag || !has_enum_options,
489 InvalidRegionOptionsSnafu {
490 reason: format!("missing key {} in options", enum_tag_key),
491 }
492 );
493
494 Ok(has_tag)
495}
496
497#[cfg(test)]
498mod tests {
499 use common_error::ext::ErrorExt;
500 use common_error::status_code::StatusCode;
501 use common_wal::options::KafkaWalOptions;
502
503 use super::*;
504
505 fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
506 options
507 .iter()
508 .map(|(k, v)| (k.to_string(), v.to_string()))
509 .collect()
510 }
511
512 #[test]
513 fn test_empty_region_options() {
514 let map = make_map(&[]);
515 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
516 assert_eq!(RegionOptions::default(), options);
517 }
518
519 #[test]
520 fn test_with_ttl() {
521 let map = make_map(&[("ttl", "7d")]);
522 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
523 let expect = RegionOptions {
524 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
525 ..Default::default()
526 };
527 assert_eq!(expect, options);
528 }
529
530 #[test]
531 fn test_with_storage() {
532 let map = make_map(&[("storage", "S3")]);
533 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
534 let expect = RegionOptions {
535 storage: Some("S3".to_string()),
536 ..Default::default()
537 };
538 assert_eq!(expect, options);
539 }
540
541 #[test]
542 fn test_without_compaction_type() {
543 let map = make_map(&[
544 ("compaction.twcs.trigger_file_num", "8"),
545 ("compaction.twcs.time_window", "2h"),
546 ]);
547 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
548 assert_eq!(StatusCode::InvalidArguments, err.status_code());
549 }
550
551 #[test]
552 fn test_with_compaction_type() {
553 let map = make_map(&[
554 ("compaction.twcs.trigger_file_num", "8"),
555 ("compaction.twcs.time_window", "2h"),
556 ("compaction.type", "twcs"),
557 ]);
558 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
559 let expect = RegionOptions {
560 compaction: CompactionOptions::Twcs(TwcsOptions {
561 trigger_file_num: 8,
562 time_window: Some(Duration::from_secs(3600 * 2)),
563 ..Default::default()
564 }),
565 compaction_override: true,
566 ..Default::default()
567 };
568 assert_eq!(expect, options);
569 }
570
571 #[test]
572 fn test_with_compaction_override_true_without_compaction_type() {
573 let map = make_map(&[(COMPACTION_OVERRIDE, "true")]);
574 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
575 let expect = RegionOptions {
576 compaction_override: true,
577 ..Default::default()
578 };
579 assert_eq!(expect, options);
580 }
581
582 #[test]
583 fn test_with_compaction_override_false_without_compaction_type() {
584 let map = make_map(&[(COMPACTION_OVERRIDE, "false")]);
585 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
586 assert_eq!(RegionOptions::default(), options);
587 }
588
589 #[test]
590 fn test_compaction_twcs_options_still_require_compaction_type_with_override() {
591 let map = make_map(&[
592 (COMPACTION_OVERRIDE, "true"),
593 ("compaction.twcs.time_window", "2h"),
594 ]);
595 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
596 assert_eq!(StatusCode::InvalidArguments, err.status_code());
597 }
598
599 fn test_with_wal_options(wal_options: &WalOptions) -> bool {
600 let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
601 let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
602 let got = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
603 let expect = RegionOptions {
604 wal_options: wal_options.clone(),
605 ..Default::default()
606 };
607 expect == got
608 }
609
610 #[test]
611 fn test_with_index() {
612 let map = make_map(&[
613 ("index.inverted_index.ignore_column_ids", "1,2,3"),
614 ("index.inverted_index.segment_row_count", "512"),
615 ]);
616 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
617 let expect = RegionOptions {
618 index_options: IndexOptions {
619 inverted_index: InvertedIndexOptions {
620 ignore_column_ids: vec![1, 2, 3],
621 segment_row_count: 512,
622 },
623 },
624 ..Default::default()
625 };
626 assert_eq!(expect, options);
627 }
628
629 #[test]
631 fn test_with_any_wal_options() {
632 let all_wal_options = [
633 WalOptions::RaftEngine,
634 WalOptions::Kafka(KafkaWalOptions::new("test_topic".to_string())),
635 ];
636 all_wal_options.iter().all(test_with_wal_options);
637 }
638
639 #[test]
640 fn test_with_memtable() {
641 let map = make_map(&[("memtable.type", "time_series")]);
642 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
643 let expect = RegionOptions {
644 memtable: Some(MemtableOptions::TimeSeries),
645 ..Default::default()
646 };
647 assert_eq!(expect, options);
648
649 let map = make_map(&[("memtable.type", "bulk")]);
650 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
651 let expect = RegionOptions {
652 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
653 sst_format: Some(FormatType::Flat),
654 ..Default::default()
655 };
656 assert_eq!(expect, options);
657
658 let map = make_map(&[
659 ("memtable.type", "bulk"),
660 ("memtable.bulk.merge_threshold", "7"),
661 ("memtable.bulk.encode_row_threshold", "11"),
662 ("memtable.bulk.encode_bytes_threshold", "13"),
663 ("memtable.bulk.max_merge_groups", "17"),
664 ]);
665 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
666 let expect = RegionOptions {
667 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig {
668 merge_threshold: 7,
669 encode_row_threshold: 11,
670 encode_bytes_threshold: 13,
671 max_merge_groups: 17,
672 })),
673 sst_format: Some(FormatType::Flat),
674 ..Default::default()
675 };
676 assert_eq!(expect, options);
677
678 let map = make_map(&[("memtable.type", "partition_tree")]);
681 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
682 let expect = RegionOptions {
683 memtable: None,
684 sst_format: Some(FormatType::Flat),
685 ..Default::default()
686 };
687 assert_eq!(expect, options);
688
689 let map = make_map(&[
691 ("memtable.type", "partition_tree"),
692 ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
693 ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
694 ]);
695 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
696 let expect = RegionOptions {
697 memtable: None,
698 sst_format: Some(FormatType::Flat),
699 ..Default::default()
700 };
701 assert_eq!(expect, options);
702 }
703
704 #[test]
705 fn test_primary_key_encoding() {
706 let map = make_map(&[("primary_key_encoding", "sparse")]);
708 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
709 assert_eq!(options.primary_key_encoding(), PrimaryKeyEncoding::Sparse);
710 assert_eq!(
711 options.primary_key_encoding,
712 Some(PrimaryKeyEncoding::Sparse)
713 );
714
715 let map = make_map(&[
717 ("memtable.type", "partition_tree"),
718 ("memtable.partition_tree.primary_key_encoding", "sparse"),
719 ]);
720 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
721 assert_eq!(options.memtable, None);
722 assert_eq!(options.sst_format, Some(FormatType::Flat));
723 assert_eq!(options.primary_key_encoding(), PrimaryKeyEncoding::Sparse);
724
725 let map = make_map(&[("primary_key_encoding", "bogus")]);
727 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
728 assert_eq!(StatusCode::InvalidArguments, err.status_code());
729 }
730
731 #[test]
732 fn test_legacy_partition_tree_overrides_sst_format() {
733 let map = make_map(&[
736 ("memtable.type", "partition_tree"),
737 ("sst_format", "primary_key"),
738 ]);
739 let options = RegionOptions::try_from_options(RegionId::new(1, 1), &map).unwrap();
740 assert_eq!(options.memtable, None);
741 assert_eq!(options.sst_format, Some(FormatType::Flat));
742 }
743
744 #[test]
745 fn test_bulk_memtable_overrides_sst_format() {
746 let map = make_map(&[("memtable.type", "bulk"), ("sst_format", "primary_key")]);
750 let options = RegionOptions::try_from_options(RegionId::new(1, 1), &map).unwrap();
751 assert_eq!(
752 options.memtable,
753 Some(MemtableOptions::Bulk(BulkMemtableConfig::default()))
754 );
755 assert_eq!(options.sst_format, Some(FormatType::Flat));
756 }
757
758 #[test]
759 fn test_unknown_memtable_type() {
760 let map = make_map(&[("memtable.type", "no_such_memtable")]);
761 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
762 assert_eq!(StatusCode::InvalidArguments, err.status_code());
763 }
764
765 #[test]
766 fn test_without_memtable_type() {
767 let map = make_map(&[("memtable.partition_tree.index_max_keys_per_shard", "2048")]);
768 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
769 assert_eq!(StatusCode::InvalidArguments, err.status_code());
770
771 let map = make_map(&[("memtable.bulk.merge_threshold", "7")]);
772 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
773 assert_eq!(StatusCode::InvalidArguments, err.status_code());
774 }
775
776 #[test]
777 fn test_with_merge_mode() {
778 let map = make_map(&[("merge_mode", "last_row")]);
779 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
780 assert_eq!(MergeMode::LastRow, options.merge_mode());
781
782 let map = make_map(&[("merge_mode", "last_non_null")]);
783 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
784 assert_eq!(MergeMode::LastNonNull, options.merge_mode());
785
786 let map = make_map(&[("merge_mode", "unknown")]);
787 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
788 assert_eq!(StatusCode::InvalidArguments, err.status_code());
789 }
790
791 #[test]
792 fn test_append_mode_allows_last_row_merge_mode() {
793 let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_row")]);
794 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
795 assert!(options.append_mode);
796 assert_eq!(MergeMode::LastRow, options.merge_mode());
797
798 let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_non_null")]);
799 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
800 assert_eq!(StatusCode::InvalidArguments, err.status_code());
801 }
802
803 #[test]
804 fn test_with_all() {
805 let wal_options = WalOptions::Kafka(KafkaWalOptions::new("test_topic".to_string()));
806 let map = make_map(&[
807 ("ttl", "7d"),
808 ("compaction.twcs.trigger_file_num", "8"),
809 ("compaction.twcs.max_output_file_size", "1GB"),
810 ("compaction.twcs.time_window", "2h"),
811 ("compaction.type", "twcs"),
812 ("compaction.twcs.remote_compaction", "false"),
813 ("compaction.twcs.fallback_to_local", "true"),
814 ("storage", "S3"),
815 ("append_mode", "false"),
816 ("index.inverted_index.ignore_column_ids", "1,2,3"),
817 ("index.inverted_index.segment_row_count", "512"),
818 (
819 WAL_OPTIONS_KEY,
820 &serde_json::to_string(&wal_options).unwrap(),
821 ),
822 ("memtable.type", "bulk"),
823 ("memtable.bulk.merge_threshold", "7"),
824 ("memtable.bulk.encode_row_threshold", "11"),
825 ("memtable.bulk.encode_bytes_threshold", "13"),
826 ("memtable.bulk.max_merge_groups", "17"),
827 ("merge_mode", "last_non_null"),
828 ]);
829 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
830 let expect = RegionOptions {
831 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
832 compaction: CompactionOptions::Twcs(TwcsOptions {
833 trigger_file_num: 8,
834 time_window: Some(Duration::from_secs(3600 * 2)),
835 max_output_file_size: Some(ReadableSize::gb(1)),
836 remote_compaction: false,
837 fallback_to_local: true,
838 }),
839 compaction_override: true,
840 storage: Some("S3".to_string()),
841 append_mode: false,
842 wal_options,
843 index_options: IndexOptions {
844 inverted_index: InvertedIndexOptions {
845 ignore_column_ids: vec![1, 2, 3],
846 segment_row_count: 512,
847 },
848 },
849 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig {
850 merge_threshold: 7,
851 encode_row_threshold: 11,
852 encode_bytes_threshold: 13,
853 max_merge_groups: 17,
854 })),
855 merge_mode: Some(MergeMode::LastNonNull),
856 sst_format: Some(FormatType::Flat),
857 primary_key_encoding: None,
858 };
859 assert_eq!(expect, options);
860 }
861
862 #[test]
863 fn test_region_options_serde() {
864 let options = RegionOptions {
865 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
866 compaction: CompactionOptions::Twcs(TwcsOptions {
867 trigger_file_num: 8,
868 time_window: Some(Duration::from_secs(3600 * 2)),
869 max_output_file_size: None,
870 remote_compaction: false,
871 fallback_to_local: true,
872 }),
873 compaction_override: false,
874 storage: Some("S3".to_string()),
875 append_mode: false,
876 wal_options: WalOptions::Kafka(KafkaWalOptions::new("test_topic".to_string())),
877 index_options: IndexOptions {
878 inverted_index: InvertedIndexOptions {
879 ignore_column_ids: vec![1, 2, 3],
880 segment_row_count: 512,
881 },
882 },
883 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
884 merge_mode: Some(MergeMode::LastNonNull),
885 sst_format: None,
886 primary_key_encoding: None,
887 };
888 let region_options_json_str = serde_json::to_string(&options).unwrap();
889 let got: RegionOptions = serde_json::from_str(®ion_options_json_str).unwrap();
890 assert_eq!(options, got);
891 }
892
893 #[test]
894 fn test_region_options_str_serde() {
895 let region_options_json_str = r#"{
897 "ttl": "7days",
898 "compaction": {
899 "compaction.type": "twcs",
900 "compaction.twcs.trigger_file_num": "8",
901 "compaction.twcs.max_output_file_size": "7MB",
902 "compaction.twcs.time_window": "2h"
903 },
904 "storage": "S3",
905 "append_mode": false,
906 "wal_options": {
907 "wal.provider": "kafka",
908 "wal.kafka.topic": "test_topic"
909 },
910 "index_options": {
911 "index.inverted_index.ignore_column_ids": "",
912 "index.inverted_index.segment_row_count": "512"
913 },
914 "memtable": {
915 "memtable.type": "bulk"
916 },
917 "merge_mode": "last_non_null"
918}"#;
919 let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
920 let options = RegionOptions {
921 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
922 compaction: CompactionOptions::Twcs(TwcsOptions {
923 trigger_file_num: 8,
924 time_window: Some(Duration::from_secs(3600 * 2)),
925 max_output_file_size: Some(ReadableSize::mb(7)),
926 remote_compaction: false,
927 fallback_to_local: true,
928 }),
929 compaction_override: false,
930 storage: Some("S3".to_string()),
931 append_mode: false,
932 wal_options: WalOptions::Kafka(KafkaWalOptions::new("test_topic".to_string())),
933 index_options: IndexOptions {
934 inverted_index: InvertedIndexOptions {
935 ignore_column_ids: vec![],
936 segment_row_count: 512,
937 },
938 },
939 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
940 merge_mode: Some(MergeMode::LastNonNull),
941 sst_format: None,
942 primary_key_encoding: None,
943 };
944 assert_eq!(options, got);
945 }
946}