1use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_telemetry::info;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::metric_engine_consts::{
33 MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, PRIMARY_KEY_ENCODING,
34};
35use store_api::mito_engine_options::COMPACTION_OVERRIDE;
36use store_api::storage::{ColumnId, RegionId};
37use strum::EnumString;
38
39use crate::error::{InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
40use crate::memtable::bulk::BulkMemtableConfig;
41use crate::sst::FormatType;
42
43const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
44const COMPACTION_TWCS_PREFIX: &str = "compaction.twcs.";
45const MEMTABLE_PARTITION_TREE_PREFIX: &str = "memtable.partition_tree.";
46const MEMTABLE_BULK_PREFIX: &str = "memtable.bulk.";
47
48const LEGACY_PARTITION_TREE_MEMTABLE_TYPE: &str = "partition_tree";
52
53pub(crate) fn parse_wal_options(
54 options_map: &HashMap<String, String>,
55) -> std::result::Result<WalOptions, serde_json::Error> {
56 options_map
57 .get(WAL_OPTIONS_KEY)
58 .map_or(Ok(WalOptions::default()), |encoded_wal_options| {
59 serde_json::from_str(encoded_wal_options)
60 })
61}
62
63#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumString)]
65#[serde(rename_all = "snake_case")]
66#[strum(serialize_all = "snake_case")]
67pub enum MergeMode {
68 #[default]
70 LastRow,
71 LastNonNull,
73}
74
75#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(default)]
82pub struct RegionOptions {
83 pub ttl: Option<TimeToLive>,
85 pub compaction: CompactionOptions,
87 pub compaction_override: bool,
88 pub storage: Option<String>,
90 pub append_mode: bool,
92 pub wal_options: WalOptions,
94 pub index_options: IndexOptions,
96 pub memtable: Option<MemtableOptions>,
98 pub merge_mode: Option<MergeMode>,
101 pub sst_format: Option<FormatType>,
103 #[serde(skip_serializing_if = "Option::is_none")]
105 pub primary_key_encoding: Option<PrimaryKeyEncoding>,
106}
107
108impl RegionOptions {
109 pub fn validate(&self) -> Result<()> {
111 if self.append_mode {
112 ensure!(
113 self.merge_mode
114 .is_none_or(|mode| mode == MergeMode::LastRow),
115 InvalidRegionOptionsSnafu {
116 reason: "only last_row merge_mode is allowed when append_mode is enabled",
117 }
118 );
119 }
120 Ok(())
121 }
122
123 pub fn need_dedup(&self) -> bool {
125 !self.append_mode
126 }
127
128 pub fn merge_mode(&self) -> MergeMode {
130 self.merge_mode.unwrap_or_default()
131 }
132
133 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
135 self.primary_key_encoding.unwrap_or_default()
136 }
137}
138
139impl RegionOptions {
140 pub fn try_from_options(
142 region_id: RegionId,
143 options_map: &HashMap<String, String>,
144 ) -> Result<Self> {
145 let value = options_map_to_value(options_map);
146 let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
147
148 let options: RegionOptionsWithoutEnum =
152 serde_json::from_str(&json).context(JsonOptionsSnafu)?;
153 let has_compaction_type =
154 validate_enum_options(options_map, "compaction.type", &[COMPACTION_TWCS_PREFIX])?;
155 let compaction = if has_compaction_type {
156 serde_json::from_str(&json).context(JsonOptionsSnafu)?
157 } else {
158 CompactionOptions::default()
159 };
160
161 let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?;
162
163 let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
164 let is_legacy_partition_tree = options_map
165 .get("memtable.type")
166 .map(|s| s.eq_ignore_ascii_case(LEGACY_PARTITION_TREE_MEMTABLE_TYPE))
167 .unwrap_or(false);
168 let memtable = if validate_enum_options(
169 options_map,
170 "memtable.type",
171 &[MEMTABLE_PARTITION_TREE_PREFIX, MEMTABLE_BULK_PREFIX],
172 )? {
173 if is_legacy_partition_tree {
174 None
178 } else {
179 Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
180 }
181 } else {
182 None
183 };
184
185 let mut sst_format = options.sst_format;
188 if is_legacy_partition_tree {
189 info!(
190 "Region {} specified the removed partition_tree memtable; \
191 overriding memtable to the default and SST format to flat",
192 region_id
193 );
194 sst_format = Some(FormatType::Flat);
195 }
196
197 if matches!(memtable, Some(MemtableOptions::Bulk(_))) {
200 if let Some(format) = sst_format
201 && format != FormatType::Flat
202 {
203 info!(
204 "Region {} uses bulk memtable; overriding sst_format from {:?} to flat",
205 region_id, format
206 );
207 }
208 sst_format = Some(FormatType::Flat);
209 }
210
211 let compaction_override_flag = options_map
212 .get(COMPACTION_OVERRIDE)
213 .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
214 .unwrap_or(false);
215 let compaction_override = has_compaction_type || compaction_override_flag;
216 let primary_key_encoding = options_map
217 .get(PRIMARY_KEY_ENCODING)
218 .or_else(|| options_map.get(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING))
219 .map(|v| match v.to_lowercase().as_str() {
220 "dense" => Ok(PrimaryKeyEncoding::Dense),
221 "sparse" => Ok(PrimaryKeyEncoding::Sparse),
222 _ => Err(InvalidRegionOptionsSnafu {
223 reason: format!("Invalid primary key encoding: {v}"),
224 }
225 .build()),
226 })
227 .transpose()?;
228
229 let opts = RegionOptions {
230 ttl: options.ttl,
231 compaction,
232 compaction_override,
233 storage: options.storage,
234 append_mode: options.append_mode,
235 wal_options,
236 index_options,
237 memtable,
238 merge_mode: options.merge_mode,
239 sst_format,
240 primary_key_encoding,
241 };
242 opts.validate()?;
243
244 Ok(opts)
245 }
246}
247
248#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(tag = "compaction.type")]
251#[serde(rename_all = "snake_case")]
252pub enum CompactionOptions {
253 #[serde(with = "prefix_twcs")]
255 Twcs(TwcsOptions),
256}
257
258impl CompactionOptions {
259 pub(crate) fn time_window(&self) -> Option<Duration> {
260 match self {
261 CompactionOptions::Twcs(opts) => opts.time_window,
262 }
263 }
264
265 pub(crate) fn remote_compaction(&self) -> bool {
266 match self {
267 CompactionOptions::Twcs(opts) => opts.remote_compaction,
268 }
269 }
270
271 pub(crate) fn fallback_to_local(&self) -> bool {
272 match self {
273 CompactionOptions::Twcs(opts) => opts.fallback_to_local,
274 }
275 }
276}
277
278impl Default for CompactionOptions {
279 fn default() -> Self {
280 Self::Twcs(TwcsOptions::default())
281 }
282}
283
284#[serde_as]
286#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
287#[serde(default)]
288pub struct TwcsOptions {
289 #[serde_as(as = "DisplayFromStr")]
291 pub trigger_file_num: usize,
292 #[serde(with = "humantime_serde")]
294 pub time_window: Option<Duration>,
295 pub max_output_file_size: Option<ReadableSize>,
297 #[serde_as(as = "DisplayFromStr")]
299 pub remote_compaction: bool,
300 #[serde_as(as = "DisplayFromStr")]
302 pub fallback_to_local: bool,
303}
304
305with_prefix!(prefix_twcs "compaction.twcs.");
306
307impl TwcsOptions {
308 pub fn time_window_seconds(&self) -> Option<i64> {
310 self.time_window.and_then(|window| {
311 let window_secs = window.as_secs();
312 if window_secs == 0 {
313 None
314 } else {
315 window_secs.try_into().ok()
316 }
317 })
318 }
319}
320
321impl Default for TwcsOptions {
322 fn default() -> Self {
323 Self {
324 trigger_file_num: 4,
325 time_window: None,
326 max_output_file_size: Some(ReadableSize::mb(512)),
327 remote_compaction: false,
328 fallback_to_local: true,
329 }
330 }
331}
332
333#[serde_as]
336#[derive(Debug, Deserialize)]
337#[serde(default)]
338struct RegionOptionsWithoutEnum {
339 ttl: Option<TimeToLive>,
341 storage: Option<String>,
342 #[serde_as(as = "DisplayFromStr")]
343 append_mode: bool,
344 #[serde_as(as = "NoneAsEmptyString")]
345 merge_mode: Option<MergeMode>,
346 #[serde_as(as = "NoneAsEmptyString")]
347 sst_format: Option<FormatType>,
348}
349
350impl Default for RegionOptionsWithoutEnum {
351 fn default() -> Self {
352 let options = RegionOptions::default();
353 RegionOptionsWithoutEnum {
354 ttl: options.ttl,
355 storage: options.storage,
356 append_mode: options.append_mode,
357 merge_mode: options.merge_mode,
358 sst_format: options.sst_format,
359 }
360 }
361}
362
363with_prefix!(prefix_inverted_index "index.inverted_index.");
364
365#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
367#[serde(default)]
368pub struct IndexOptions {
369 #[serde(flatten, with = "prefix_inverted_index")]
371 pub inverted_index: InvertedIndexOptions,
372}
373
374#[serde_as]
376#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
377#[serde(default)]
378pub struct InvertedIndexOptions {
379 #[serde(deserialize_with = "deserialize_ignore_column_ids")]
382 #[serde(serialize_with = "serialize_ignore_column_ids")]
383 pub ignore_column_ids: Vec<ColumnId>,
384
385 #[serde_as(as = "DisplayFromStr")]
387 pub segment_row_count: usize,
388}
389
390impl Default for InvertedIndexOptions {
391 fn default() -> Self {
392 Self {
393 ignore_column_ids: Vec::new(),
394 segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
395 }
396 }
397}
398
399#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
401#[serde(tag = "memtable.type", rename_all = "snake_case")]
402pub enum MemtableOptions {
403 TimeSeries,
404 #[serde(with = "prefix_bulk")]
405 Bulk(BulkMemtableConfig),
406}
407
408with_prefix!(prefix_bulk "memtable.bulk.");
409
410fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
411where
412 D: Deserializer<'de>,
413{
414 let s: String = Deserialize::deserialize(deserializer)?;
415 let mut column_ids = Vec::new();
416 if s.is_empty() {
417 return Ok(column_ids);
418 }
419 for item in s.split(',') {
420 let column_id = item.parse().map_err(D::Error::custom)?;
421 column_ids.push(column_id);
422 }
423 Ok(column_ids)
424}
425
426fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
427where
428 S: serde::Serializer,
429{
430 let s = column_ids
431 .iter()
432 .map(|id| id.to_string())
433 .collect::<Vec<_>>()
434 .join(",");
435 serializer.serialize_str(&s)
436}
437
438fn options_map_to_value(options: &HashMap<String, String>) -> Value {
442 let map = options
443 .iter()
444 .map(|(key, value)| {
445 if value.eq_ignore_ascii_case("null") {
447 (key.clone(), Value::Null)
448 } else {
449 (key.clone(), Value::from(value.clone()))
450 }
451 })
452 .collect();
453 Value::Object(map)
454}
455
456fn validate_enum_options(
464 options_map: &HashMap<String, String>,
465 enum_tag_key: &str,
466 enum_option_prefixes: &[&str],
467) -> Result<bool> {
468 let mut has_enum_options = false;
469 let mut has_tag = false;
470 for key in options_map.keys() {
471 if key == enum_tag_key {
472 has_tag = true;
473 } else if !has_enum_options
474 && enum_option_prefixes
475 .iter()
476 .any(|prefix| key.starts_with(prefix))
477 {
478 has_enum_options = true;
479 }
480
481 if has_tag && has_enum_options {
482 break;
483 }
484 }
485
486 ensure!(
488 has_tag || !has_enum_options,
489 InvalidRegionOptionsSnafu {
490 reason: format!("missing key {} in options", enum_tag_key),
491 }
492 );
493
494 Ok(has_tag)
495}
496
497#[cfg(test)]
498mod tests {
499 use common_error::ext::ErrorExt;
500 use common_error::status_code::StatusCode;
501 use common_wal::options::KafkaWalOptions;
502
503 use super::*;
504
505 fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
506 options
507 .iter()
508 .map(|(k, v)| (k.to_string(), v.to_string()))
509 .collect()
510 }
511
512 #[test]
513 fn test_empty_region_options() {
514 let map = make_map(&[]);
515 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
516 assert_eq!(RegionOptions::default(), options);
517 }
518
519 #[test]
520 fn test_with_ttl() {
521 let map = make_map(&[("ttl", "7d")]);
522 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
523 let expect = RegionOptions {
524 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
525 ..Default::default()
526 };
527 assert_eq!(expect, options);
528 }
529
530 #[test]
531 fn test_with_storage() {
532 let map = make_map(&[("storage", "S3")]);
533 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
534 let expect = RegionOptions {
535 storage: Some("S3".to_string()),
536 ..Default::default()
537 };
538 assert_eq!(expect, options);
539 }
540
541 #[test]
542 fn test_without_compaction_type() {
543 let map = make_map(&[
544 ("compaction.twcs.trigger_file_num", "8"),
545 ("compaction.twcs.time_window", "2h"),
546 ]);
547 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
548 assert_eq!(StatusCode::InvalidArguments, err.status_code());
549 }
550
551 #[test]
552 fn test_with_compaction_type() {
553 let map = make_map(&[
554 ("compaction.twcs.trigger_file_num", "8"),
555 ("compaction.twcs.time_window", "2h"),
556 ("compaction.type", "twcs"),
557 ]);
558 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
559 let expect = RegionOptions {
560 compaction: CompactionOptions::Twcs(TwcsOptions {
561 trigger_file_num: 8,
562 time_window: Some(Duration::from_secs(3600 * 2)),
563 ..Default::default()
564 }),
565 compaction_override: true,
566 ..Default::default()
567 };
568 assert_eq!(expect, options);
569 }
570
571 #[test]
572 fn test_with_compaction_override_true_without_compaction_type() {
573 let map = make_map(&[(COMPACTION_OVERRIDE, "true")]);
574 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
575 let expect = RegionOptions {
576 compaction_override: true,
577 ..Default::default()
578 };
579 assert_eq!(expect, options);
580 }
581
582 #[test]
583 fn test_with_compaction_override_false_without_compaction_type() {
584 let map = make_map(&[(COMPACTION_OVERRIDE, "false")]);
585 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
586 assert_eq!(RegionOptions::default(), options);
587 }
588
589 #[test]
590 fn test_compaction_twcs_options_still_require_compaction_type_with_override() {
591 let map = make_map(&[
592 (COMPACTION_OVERRIDE, "true"),
593 ("compaction.twcs.time_window", "2h"),
594 ]);
595 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
596 assert_eq!(StatusCode::InvalidArguments, err.status_code());
597 }
598
599 fn test_with_wal_options(wal_options: &WalOptions) -> bool {
600 let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
601 let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
602 let got = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
603 let expect = RegionOptions {
604 wal_options: wal_options.clone(),
605 ..Default::default()
606 };
607 expect == got
608 }
609
610 #[test]
611 fn test_with_index() {
612 let map = make_map(&[
613 ("index.inverted_index.ignore_column_ids", "1,2,3"),
614 ("index.inverted_index.segment_row_count", "512"),
615 ]);
616 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
617 let expect = RegionOptions {
618 index_options: IndexOptions {
619 inverted_index: InvertedIndexOptions {
620 ignore_column_ids: vec![1, 2, 3],
621 segment_row_count: 512,
622 },
623 },
624 ..Default::default()
625 };
626 assert_eq!(expect, options);
627 }
628
629 #[test]
631 fn test_with_any_wal_options() {
632 let all_wal_options = [
633 WalOptions::RaftEngine,
634 WalOptions::Kafka(KafkaWalOptions {
635 topic: "test_topic".to_string(),
636 }),
637 ];
638 all_wal_options.iter().all(test_with_wal_options);
639 }
640
641 #[test]
642 fn test_with_memtable() {
643 let map = make_map(&[("memtable.type", "time_series")]);
644 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
645 let expect = RegionOptions {
646 memtable: Some(MemtableOptions::TimeSeries),
647 ..Default::default()
648 };
649 assert_eq!(expect, options);
650
651 let map = make_map(&[("memtable.type", "bulk")]);
652 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
653 let expect = RegionOptions {
654 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
655 sst_format: Some(FormatType::Flat),
656 ..Default::default()
657 };
658 assert_eq!(expect, options);
659
660 let map = make_map(&[
661 ("memtable.type", "bulk"),
662 ("memtable.bulk.merge_threshold", "7"),
663 ("memtable.bulk.encode_row_threshold", "11"),
664 ("memtable.bulk.encode_bytes_threshold", "13"),
665 ("memtable.bulk.max_merge_groups", "17"),
666 ]);
667 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
668 let expect = RegionOptions {
669 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig {
670 merge_threshold: 7,
671 encode_row_threshold: 11,
672 encode_bytes_threshold: 13,
673 max_merge_groups: 17,
674 })),
675 sst_format: Some(FormatType::Flat),
676 ..Default::default()
677 };
678 assert_eq!(expect, options);
679
680 let map = make_map(&[("memtable.type", "partition_tree")]);
683 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
684 let expect = RegionOptions {
685 memtable: None,
686 sst_format: Some(FormatType::Flat),
687 ..Default::default()
688 };
689 assert_eq!(expect, options);
690
691 let map = make_map(&[
693 ("memtable.type", "partition_tree"),
694 ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
695 ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
696 ]);
697 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
698 let expect = RegionOptions {
699 memtable: None,
700 sst_format: Some(FormatType::Flat),
701 ..Default::default()
702 };
703 assert_eq!(expect, options);
704 }
705
706 #[test]
707 fn test_primary_key_encoding() {
708 let map = make_map(&[("primary_key_encoding", "sparse")]);
710 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
711 assert_eq!(options.primary_key_encoding(), PrimaryKeyEncoding::Sparse);
712 assert_eq!(
713 options.primary_key_encoding,
714 Some(PrimaryKeyEncoding::Sparse)
715 );
716
717 let map = make_map(&[
719 ("memtable.type", "partition_tree"),
720 ("memtable.partition_tree.primary_key_encoding", "sparse"),
721 ]);
722 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
723 assert_eq!(options.memtable, None);
724 assert_eq!(options.sst_format, Some(FormatType::Flat));
725 assert_eq!(options.primary_key_encoding(), PrimaryKeyEncoding::Sparse);
726
727 let map = make_map(&[("primary_key_encoding", "bogus")]);
729 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
730 assert_eq!(StatusCode::InvalidArguments, err.status_code());
731 }
732
733 #[test]
734 fn test_legacy_partition_tree_overrides_sst_format() {
735 let map = make_map(&[
738 ("memtable.type", "partition_tree"),
739 ("sst_format", "primary_key"),
740 ]);
741 let options = RegionOptions::try_from_options(RegionId::new(1, 1), &map).unwrap();
742 assert_eq!(options.memtable, None);
743 assert_eq!(options.sst_format, Some(FormatType::Flat));
744 }
745
746 #[test]
747 fn test_bulk_memtable_overrides_sst_format() {
748 let map = make_map(&[("memtable.type", "bulk"), ("sst_format", "primary_key")]);
752 let options = RegionOptions::try_from_options(RegionId::new(1, 1), &map).unwrap();
753 assert_eq!(
754 options.memtable,
755 Some(MemtableOptions::Bulk(BulkMemtableConfig::default()))
756 );
757 assert_eq!(options.sst_format, Some(FormatType::Flat));
758 }
759
760 #[test]
761 fn test_unknown_memtable_type() {
762 let map = make_map(&[("memtable.type", "no_such_memtable")]);
763 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
764 assert_eq!(StatusCode::InvalidArguments, err.status_code());
765 }
766
767 #[test]
768 fn test_without_memtable_type() {
769 let map = make_map(&[("memtable.partition_tree.index_max_keys_per_shard", "2048")]);
770 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
771 assert_eq!(StatusCode::InvalidArguments, err.status_code());
772
773 let map = make_map(&[("memtable.bulk.merge_threshold", "7")]);
774 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
775 assert_eq!(StatusCode::InvalidArguments, err.status_code());
776 }
777
778 #[test]
779 fn test_with_merge_mode() {
780 let map = make_map(&[("merge_mode", "last_row")]);
781 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
782 assert_eq!(MergeMode::LastRow, options.merge_mode());
783
784 let map = make_map(&[("merge_mode", "last_non_null")]);
785 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
786 assert_eq!(MergeMode::LastNonNull, options.merge_mode());
787
788 let map = make_map(&[("merge_mode", "unknown")]);
789 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
790 assert_eq!(StatusCode::InvalidArguments, err.status_code());
791 }
792
793 #[test]
794 fn test_append_mode_allows_last_row_merge_mode() {
795 let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_row")]);
796 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
797 assert!(options.append_mode);
798 assert_eq!(MergeMode::LastRow, options.merge_mode());
799
800 let map = make_map(&[("append_mode", "true"), ("merge_mode", "last_non_null")]);
801 let err = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap_err();
802 assert_eq!(StatusCode::InvalidArguments, err.status_code());
803 }
804
805 #[test]
806 fn test_with_all() {
807 let wal_options = WalOptions::Kafka(KafkaWalOptions {
808 topic: "test_topic".to_string(),
809 });
810 let map = make_map(&[
811 ("ttl", "7d"),
812 ("compaction.twcs.trigger_file_num", "8"),
813 ("compaction.twcs.max_output_file_size", "1GB"),
814 ("compaction.twcs.time_window", "2h"),
815 ("compaction.type", "twcs"),
816 ("compaction.twcs.remote_compaction", "false"),
817 ("compaction.twcs.fallback_to_local", "true"),
818 ("storage", "S3"),
819 ("append_mode", "false"),
820 ("index.inverted_index.ignore_column_ids", "1,2,3"),
821 ("index.inverted_index.segment_row_count", "512"),
822 (
823 WAL_OPTIONS_KEY,
824 &serde_json::to_string(&wal_options).unwrap(),
825 ),
826 ("memtable.type", "bulk"),
827 ("memtable.bulk.merge_threshold", "7"),
828 ("memtable.bulk.encode_row_threshold", "11"),
829 ("memtable.bulk.encode_bytes_threshold", "13"),
830 ("memtable.bulk.max_merge_groups", "17"),
831 ("merge_mode", "last_non_null"),
832 ]);
833 let options = RegionOptions::try_from_options(RegionId::new(0, 0), &map).unwrap();
834 let expect = RegionOptions {
835 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
836 compaction: CompactionOptions::Twcs(TwcsOptions {
837 trigger_file_num: 8,
838 time_window: Some(Duration::from_secs(3600 * 2)),
839 max_output_file_size: Some(ReadableSize::gb(1)),
840 remote_compaction: false,
841 fallback_to_local: true,
842 }),
843 compaction_override: true,
844 storage: Some("S3".to_string()),
845 append_mode: false,
846 wal_options,
847 index_options: IndexOptions {
848 inverted_index: InvertedIndexOptions {
849 ignore_column_ids: vec![1, 2, 3],
850 segment_row_count: 512,
851 },
852 },
853 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig {
854 merge_threshold: 7,
855 encode_row_threshold: 11,
856 encode_bytes_threshold: 13,
857 max_merge_groups: 17,
858 })),
859 merge_mode: Some(MergeMode::LastNonNull),
860 sst_format: Some(FormatType::Flat),
861 primary_key_encoding: None,
862 };
863 assert_eq!(expect, options);
864 }
865
866 #[test]
867 fn test_region_options_serde() {
868 let options = RegionOptions {
869 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
870 compaction: CompactionOptions::Twcs(TwcsOptions {
871 trigger_file_num: 8,
872 time_window: Some(Duration::from_secs(3600 * 2)),
873 max_output_file_size: None,
874 remote_compaction: false,
875 fallback_to_local: true,
876 }),
877 compaction_override: false,
878 storage: Some("S3".to_string()),
879 append_mode: false,
880 wal_options: WalOptions::Kafka(KafkaWalOptions {
881 topic: "test_topic".to_string(),
882 }),
883 index_options: IndexOptions {
884 inverted_index: InvertedIndexOptions {
885 ignore_column_ids: vec![1, 2, 3],
886 segment_row_count: 512,
887 },
888 },
889 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
890 merge_mode: Some(MergeMode::LastNonNull),
891 sst_format: None,
892 primary_key_encoding: None,
893 };
894 let region_options_json_str = serde_json::to_string(&options).unwrap();
895 let got: RegionOptions = serde_json::from_str(®ion_options_json_str).unwrap();
896 assert_eq!(options, got);
897 }
898
899 #[test]
900 fn test_region_options_str_serde() {
901 let region_options_json_str = r#"{
903 "ttl": "7days",
904 "compaction": {
905 "compaction.type": "twcs",
906 "compaction.twcs.trigger_file_num": "8",
907 "compaction.twcs.max_output_file_size": "7MB",
908 "compaction.twcs.time_window": "2h"
909 },
910 "storage": "S3",
911 "append_mode": false,
912 "wal_options": {
913 "wal.provider": "kafka",
914 "wal.kafka.topic": "test_topic"
915 },
916 "index_options": {
917 "index.inverted_index.ignore_column_ids": "",
918 "index.inverted_index.segment_row_count": "512"
919 },
920 "memtable": {
921 "memtable.type": "bulk"
922 },
923 "merge_mode": "last_non_null"
924}"#;
925 let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
926 let options = RegionOptions {
927 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
928 compaction: CompactionOptions::Twcs(TwcsOptions {
929 trigger_file_num: 8,
930 time_window: Some(Duration::from_secs(3600 * 2)),
931 max_output_file_size: Some(ReadableSize::mb(7)),
932 remote_compaction: false,
933 fallback_to_local: true,
934 }),
935 compaction_override: false,
936 storage: Some("S3".to_string()),
937 append_mode: false,
938 wal_options: WalOptions::Kafka(KafkaWalOptions {
939 topic: "test_topic".to_string(),
940 }),
941 index_options: IndexOptions {
942 inverted_index: InvertedIndexOptions {
943 ignore_column_ids: vec![],
944 segment_row_count: 512,
945 },
946 },
947 memtable: Some(MemtableOptions::Bulk(BulkMemtableConfig::default())),
948 merge_mode: Some(MergeMode::LastNonNull),
949 sst_format: None,
950 primary_key_encoding: None,
951 };
952 assert_eq!(options, got);
953 }
954}