1use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_stat::get_total_memory_readable;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::mito_engine_options::COMPACTION_OVERRIDE;
33use store_api::storage::ColumnId;
34use strum::EnumString;
35
36use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
37use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
38use crate::sst::FormatType;
39
/// Default value of [`InvertedIndexOptions::segment_row_count`].
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
41
/// Merge mode selected through the `merge_mode` region option.
///
/// Variants are (de)serialized and parsed from strings in `snake_case`
/// (`last_row`, `last_non_null`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {
    /// The default mode, used when no `merge_mode` option is given.
    // NOTE(review): presumably keeps the last written row for duplicates — confirm.
    #[default]
    LastRow,
    /// Alternative mode, spelled `last_non_null` in options.
    // NOTE(review): presumably keeps the last non-null value per field — confirm.
    LastNonNull,
}
53
/// Options of a region.
///
/// Can be built from a flat `HashMap<String, String>` through `TryFrom`, or
/// (de)serialized directly with serde. Fields missing during deserialization
/// fall back to their `Default` values because of `#[serde(default)]`.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionOptions {
    /// Value of the `ttl` option, if any.
    pub ttl: Option<TimeToLive>,
    /// Compaction options of the region.
    pub compaction: CompactionOptions,
    /// When built from an options map: true if `compaction.type` is present or the
    /// `COMPACTION_OVERRIDE` option is `true`/`1` (case-insensitive).
    pub compaction_override: bool,
    /// Value of the `storage` option, if any.
    pub storage: Option<String>,
    /// Append mode flag. When set, `merge_mode` must be `None` (see `validate`)
    /// and `need_dedup` returns false.
    pub append_mode: bool,
    /// WAL options; decoded from the JSON stored under `WAL_OPTIONS_KEY` when
    /// built from an options map.
    pub wal_options: WalOptions,
    /// Index options of the region.
    pub index_options: IndexOptions,
    /// Memtable options; `None` when no `memtable.type` option is given.
    pub memtable: Option<MemtableOptions>,
    /// Explicitly configured merge mode; see `merge_mode()` for the effective value.
    pub merge_mode: Option<MergeMode>,
    /// Value of the `sst_format` option, if any.
    pub sst_format: Option<FormatType>,
}
83
84impl RegionOptions {
85 pub fn validate(&self) -> Result<()> {
87 if self.append_mode {
88 ensure!(
89 self.merge_mode.is_none(),
90 InvalidRegionOptionsSnafu {
91 reason: "merge_mode is not allowed when append_mode is enabled",
92 }
93 );
94 }
95 Ok(())
96 }
97
98 pub fn need_dedup(&self) -> bool {
100 !self.append_mode
101 }
102
103 pub fn merge_mode(&self) -> MergeMode {
105 self.merge_mode.unwrap_or_default()
106 }
107
108 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
110 self.memtable
111 .as_ref()
112 .map_or(PrimaryKeyEncoding::default(), |memtable| {
113 memtable.primary_key_encoding()
114 })
115 }
116}
117
118impl TryFrom<&HashMap<String, String>> for RegionOptions {
119 type Error = Error;
120
121 fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
122 let value = options_map_to_value(options_map);
123 let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
124
125 let options: RegionOptionsWithoutEnum =
129 serde_json::from_str(&json).context(JsonOptionsSnafu)?;
130 let has_compaction_type = validate_enum_options(options_map, "compaction.type")?;
131 let compaction = if has_compaction_type {
132 serde_json::from_str(&json).context(JsonOptionsSnafu)?
133 } else {
134 CompactionOptions::default()
135 };
136
137 let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
139 || Ok(WalOptions::default()),
140 |encoded_wal_options| {
141 serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
142 },
143 )?;
144
145 let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
146 let memtable = if validate_enum_options(options_map, "memtable.type")? {
147 Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
148 } else {
149 None
150 };
151
152 let compaction_override_flag = options_map
153 .get(COMPACTION_OVERRIDE)
154 .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
155 .unwrap_or(false);
156 let compaction_override = has_compaction_type || compaction_override_flag;
157
158 let opts = RegionOptions {
159 ttl: options.ttl,
160 compaction,
161 compaction_override,
162 storage: options.storage,
163 append_mode: options.append_mode,
164 wal_options,
165 index_options,
166 memtable,
167 merge_mode: options.merge_mode,
168 sst_format: options.sst_format,
169 };
170 opts.validate()?;
171
172 Ok(opts)
173 }
174}
175
/// Compaction options, tagged by the `compaction.type` key.
///
/// Variant names are spelled in `snake_case` (e.g. `twcs`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "compaction.type")]
#[serde(rename_all = "snake_case")]
pub enum CompactionOptions {
    /// TWCS compaction options; its fields use the `compaction.twcs.` key prefix.
    #[serde(with = "prefix_twcs")]
    Twcs(TwcsOptions),
}
185
186impl CompactionOptions {
187 pub(crate) fn time_window(&self) -> Option<Duration> {
188 match self {
189 CompactionOptions::Twcs(opts) => opts.time_window,
190 }
191 }
192
193 pub(crate) fn remote_compaction(&self) -> bool {
194 match self {
195 CompactionOptions::Twcs(opts) => opts.remote_compaction,
196 }
197 }
198
199 pub(crate) fn fallback_to_local(&self) -> bool {
200 match self {
201 CompactionOptions::Twcs(opts) => opts.fallback_to_local,
202 }
203 }
204}
205
206impl Default for CompactionOptions {
207 fn default() -> Self {
208 Self::Twcs(TwcsOptions::default())
209 }
210}
211
/// Options of the TWCS compaction.
///
/// Fields annotated with `DisplayFromStr` are (de)serialized through their
/// string representation. Missing fields take the values from `Default`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
    /// Value of the `trigger_file_num` option (default 4).
    // NOTE(review): presumably the file count that triggers a compaction — confirm.
    #[serde_as(as = "DisplayFromStr")]
    pub trigger_file_num: usize,
    /// Optional time window, written in humantime form (e.g. `2h`).
    #[serde(with = "humantime_serde")]
    pub time_window: Option<Duration>,
    /// Optional `max_output_file_size` option (default 512MB).
    pub max_output_file_size: Option<ReadableSize>,
    /// Value of the `remote_compaction` option (default false).
    #[serde_as(as = "DisplayFromStr")]
    pub remote_compaction: bool,
    /// Value of the `fallback_to_local` option (default true).
    // NOTE(review): presumably only meaningful with remote compaction — confirm.
    #[serde_as(as = "DisplayFromStr")]
    pub fallback_to_local: bool,
}
232
// Generates the `prefix_twcs` serde helper used by `CompactionOptions::Twcs`,
// which prefixes the field names of `TwcsOptions` with `compaction.twcs.`.
with_prefix!(prefix_twcs "compaction.twcs.");
234
235impl TwcsOptions {
236 pub fn time_window_seconds(&self) -> Option<i64> {
238 self.time_window.and_then(|window| {
239 let window_secs = window.as_secs();
240 if window_secs == 0 {
241 None
242 } else {
243 window_secs.try_into().ok()
244 }
245 })
246 }
247}
248
249impl Default for TwcsOptions {
250 fn default() -> Self {
251 Self {
252 trigger_file_num: 4,
253 time_window: None,
254 max_output_file_size: Some(ReadableSize::mb(512)),
255 remote_compaction: false,
256 fallback_to_local: true,
257 }
258 }
259}
260
/// The subset of [`RegionOptions`] that is not a tagged enum.
///
/// Used by `TryFrom<&HashMap<String, String>>` to deserialize the plain options
/// from the flat JSON object, where every value is a string.
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
    /// See [`RegionOptions::ttl`].
    ttl: Option<TimeToLive>,
    /// See [`RegionOptions::storage`].
    storage: Option<String>,
    /// Parsed from its string form (e.g. `"false"`).
    #[serde_as(as = "DisplayFromStr")]
    append_mode: bool,
    /// An empty string is treated as `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    merge_mode: Option<MergeMode>,
    /// An empty string is treated as `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    sst_format: Option<FormatType>,
}
277
278impl Default for RegionOptionsWithoutEnum {
279 fn default() -> Self {
280 let options = RegionOptions::default();
281 RegionOptionsWithoutEnum {
282 ttl: options.ttl,
283 storage: options.storage,
284 append_mode: options.append_mode,
285 merge_mode: options.merge_mode,
286 sst_format: options.sst_format,
287 }
288 }
289}
290
// Generates the `prefix_inverted_index` serde helper used by `IndexOptions`,
// which prefixes the inverted index field names with `index.inverted_index.`.
with_prefix!(prefix_inverted_index "index.inverted_index.");
292
/// Index options of a region.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
    /// Inverted index options; flattened into this struct's keys with the
    /// `index.inverted_index.` prefix.
    #[serde(flatten, with = "prefix_inverted_index")]
    pub inverted_index: InvertedIndexOptions,
}
301
/// Options of the inverted index.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
    /// Column ids listed in the `ignore_column_ids` option.
    ///
    /// (De)serialized as a comma-separated string such as `"1,2,3"`; an empty
    /// string is an empty list.
    // NOTE(review): presumably these columns are skipped when building the index — confirm.
    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
    #[serde(serialize_with = "serialize_ignore_column_ids")]
    pub ignore_column_ids: Vec<ColumnId>,

    /// Value of the `segment_row_count` option, (de)serialized through its string form.
    #[serde_as(as = "DisplayFromStr")]
    pub segment_row_count: usize,
}
317
318impl Default for InvertedIndexOptions {
319 fn default() -> Self {
320 Self {
321 ignore_column_ids: Vec::new(),
322 segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
323 }
324 }
325}
326
/// Memtable options, tagged by the `memtable.type` key.
///
/// Variant names are spelled in `snake_case` (`time_series`, `partition_tree`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
pub enum MemtableOptions {
    /// Time series memtable; carries no further options.
    TimeSeries,
    /// Partition tree memtable; its fields use the `memtable.partition_tree.` key prefix.
    #[serde(with = "prefix_partition_tree")]
    PartitionTree(PartitionTreeOptions),
}
335
// Generates the `prefix_partition_tree` serde helper used by
// `MemtableOptions::PartitionTree`, which prefixes the field names of
// `PartitionTreeOptions` with `memtable.partition_tree.`.
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
337
338impl MemtableOptions {
339 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
341 match self {
342 MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
343 _ => PrimaryKeyEncoding::Dense,
344 }
345 }
346}
347
/// Options of the partition tree memtable.
///
/// Missing fields take the values from `Default`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct PartitionTreeOptions {
    /// Value of the `index_max_keys_per_shard` option, (de)serialized through
    /// its string form. Defaults to `DEFAULT_MAX_KEYS_PER_SHARD`.
    #[serde_as(as = "DisplayFromStr")]
    pub index_max_keys_per_shard: usize,
    /// Value of the `data_freeze_threshold` option, (de)serialized through its
    /// string form. Defaults to `DEFAULT_FREEZE_THRESHOLD`.
    #[serde_as(as = "DisplayFromStr")]
    pub data_freeze_threshold: usize,
    /// Value of the `fork_dictionary_bytes` option. The default is derived from
    /// the total memory and capped at 512MB (see the `Default` impl).
    pub fork_dictionary_bytes: ReadableSize,
    /// Primary key encoding reported by `MemtableOptions::primary_key_encoding`.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
364
365impl Default for PartitionTreeOptions {
366 fn default() -> Self {
367 let mut fork_dictionary_bytes = ReadableSize::mb(512);
368 if let Some(total_memory) = get_total_memory_readable() {
369 let adjust_dictionary_bytes = std::cmp::min(
370 total_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
371 fork_dictionary_bytes,
372 );
373 if adjust_dictionary_bytes.0 > 0 {
374 fork_dictionary_bytes = adjust_dictionary_bytes;
375 }
376 }
377 Self {
378 index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
379 data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
380 fork_dictionary_bytes,
381 primary_key_encoding: PrimaryKeyEncoding::Dense,
382 }
383 }
384}
385
386fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
387where
388 D: Deserializer<'de>,
389{
390 let s: String = Deserialize::deserialize(deserializer)?;
391 let mut column_ids = Vec::new();
392 if s.is_empty() {
393 return Ok(column_ids);
394 }
395 for item in s.split(',') {
396 let column_id = item.parse().map_err(D::Error::custom)?;
397 column_ids.push(column_id);
398 }
399 Ok(column_ids)
400}
401
402fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
403where
404 S: serde::Serializer,
405{
406 let s = column_ids
407 .iter()
408 .map(|id| id.to_string())
409 .collect::<Vec<_>>()
410 .join(",");
411 serializer.serialize_str(&s)
412}
413
414fn options_map_to_value(options: &HashMap<String, String>) -> Value {
418 let map = options
419 .iter()
420 .map(|(key, value)| {
421 if value.eq_ignore_ascii_case("null") {
423 (key.clone(), Value::Null)
424 } else {
425 (key.clone(), Value::from(value.clone()))
426 }
427 })
428 .collect();
429 Value::Object(map)
430}
431
432fn validate_enum_options(
437 options_map: &HashMap<String, String>,
438 enum_tag_key: &str,
439) -> Result<bool> {
440 let enum_type = enum_tag_key.split('.').next().unwrap();
441 let mut has_other_options = false;
442 let mut has_tag = false;
443 for key in options_map.keys() {
444 if key == enum_tag_key {
445 has_tag = true;
446 } else if key.starts_with(enum_type) {
447 has_other_options = true;
448 }
449 }
450
451 ensure!(
453 has_tag || !has_other_options,
454 InvalidRegionOptionsSnafu {
455 reason: format!("missing key {} in options", enum_tag_key),
456 }
457 );
458
459 Ok(has_tag)
460}
461
462#[cfg(test)]
463mod tests {
464 use common_error::ext::ErrorExt;
465 use common_error::status_code::StatusCode;
466 use common_wal::options::KafkaWalOptions;
467
468 use super::*;
469
470 fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
471 options
472 .iter()
473 .map(|(k, v)| (k.to_string(), v.to_string()))
474 .collect()
475 }
476
477 #[test]
478 fn test_empty_region_options() {
479 let map = make_map(&[]);
480 let options = RegionOptions::try_from(&map).unwrap();
481 assert_eq!(RegionOptions::default(), options);
482 }
483
484 #[test]
485 fn test_with_ttl() {
486 let map = make_map(&[("ttl", "7d")]);
487 let options = RegionOptions::try_from(&map).unwrap();
488 let expect = RegionOptions {
489 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
490 ..Default::default()
491 };
492 assert_eq!(expect, options);
493 }
494
495 #[test]
496 fn test_with_storage() {
497 let map = make_map(&[("storage", "S3")]);
498 let options = RegionOptions::try_from(&map).unwrap();
499 let expect = RegionOptions {
500 storage: Some("S3".to_string()),
501 ..Default::default()
502 };
503 assert_eq!(expect, options);
504 }
505
506 #[test]
507 fn test_without_compaction_type() {
508 let map = make_map(&[
509 ("compaction.twcs.trigger_file_num", "8"),
510 ("compaction.twcs.time_window", "2h"),
511 ]);
512 let err = RegionOptions::try_from(&map).unwrap_err();
513 assert_eq!(StatusCode::InvalidArguments, err.status_code());
514 }
515
516 #[test]
517 fn test_with_compaction_type() {
518 let map = make_map(&[
519 ("compaction.twcs.trigger_file_num", "8"),
520 ("compaction.twcs.time_window", "2h"),
521 ("compaction.type", "twcs"),
522 ]);
523 let options = RegionOptions::try_from(&map).unwrap();
524 let expect = RegionOptions {
525 compaction: CompactionOptions::Twcs(TwcsOptions {
526 trigger_file_num: 8,
527 time_window: Some(Duration::from_secs(3600 * 2)),
528 ..Default::default()
529 }),
530 compaction_override: true,
531 ..Default::default()
532 };
533 assert_eq!(expect, options);
534 }
535
536 fn test_with_wal_options(wal_options: &WalOptions) -> bool {
537 let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
538 let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
539 let got = RegionOptions::try_from(&map).unwrap();
540 let expect = RegionOptions {
541 wal_options: wal_options.clone(),
542 ..Default::default()
543 };
544 expect == got
545 }
546
547 #[test]
548 fn test_with_index() {
549 let map = make_map(&[
550 ("index.inverted_index.ignore_column_ids", "1,2,3"),
551 ("index.inverted_index.segment_row_count", "512"),
552 ]);
553 let options = RegionOptions::try_from(&map).unwrap();
554 let expect = RegionOptions {
555 index_options: IndexOptions {
556 inverted_index: InvertedIndexOptions {
557 ignore_column_ids: vec![1, 2, 3],
558 segment_row_count: 512,
559 },
560 },
561 ..Default::default()
562 };
563 assert_eq!(expect, options);
564 }
565
566 #[test]
568 fn test_with_any_wal_options() {
569 let all_wal_options = [
570 WalOptions::RaftEngine,
571 WalOptions::Kafka(KafkaWalOptions {
572 topic: "test_topic".to_string(),
573 }),
574 ];
575 all_wal_options.iter().all(test_with_wal_options);
576 }
577
578 #[test]
579 fn test_with_memtable() {
580 let map = make_map(&[("memtable.type", "time_series")]);
581 let options = RegionOptions::try_from(&map).unwrap();
582 let expect = RegionOptions {
583 memtable: Some(MemtableOptions::TimeSeries),
584 ..Default::default()
585 };
586 assert_eq!(expect, options);
587
588 let map = make_map(&[("memtable.type", "partition_tree")]);
589 let options = RegionOptions::try_from(&map).unwrap();
590 let expect = RegionOptions {
591 memtable: Some(MemtableOptions::PartitionTree(
592 PartitionTreeOptions::default(),
593 )),
594 ..Default::default()
595 };
596 assert_eq!(expect, options);
597 }
598
599 #[test]
600 fn test_unknown_memtable_type() {
601 let map = make_map(&[("memtable.type", "no_such_memtable")]);
602 let err = RegionOptions::try_from(&map).unwrap_err();
603 assert_eq!(StatusCode::InvalidArguments, err.status_code());
604 }
605
606 #[test]
607 fn test_with_merge_mode() {
608 let map = make_map(&[("merge_mode", "last_row")]);
609 let options = RegionOptions::try_from(&map).unwrap();
610 assert_eq!(MergeMode::LastRow, options.merge_mode());
611
612 let map = make_map(&[("merge_mode", "last_non_null")]);
613 let options = RegionOptions::try_from(&map).unwrap();
614 assert_eq!(MergeMode::LastNonNull, options.merge_mode());
615
616 let map = make_map(&[("merge_mode", "unknown")]);
617 let err = RegionOptions::try_from(&map).unwrap_err();
618 assert_eq!(StatusCode::InvalidArguments, err.status_code());
619 }
620
    /// Parses a map that sets every option group at once and checks the result
    /// field by field.
    #[test]
    fn test_with_all() {
        let wal_options = WalOptions::Kafka(KafkaWalOptions {
            topic: "test_topic".to_string(),
        });
        let map = make_map(&[
            ("ttl", "7d"),
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.max_output_file_size", "1GB"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
            ("compaction.twcs.remote_compaction", "false"),
            ("compaction.twcs.fallback_to_local", "true"),
            ("storage", "S3"),
            ("append_mode", "false"),
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
            (
                WAL_OPTIONS_KEY,
                &serde_json::to_string(&wal_options).unwrap(),
            ),
            ("memtable.type", "partition_tree"),
            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
            ("memtable.partition_tree.data_freeze_threshold", "2048"),
            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
            ("merge_mode", "last_non_null"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::gb(1)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            // Implied by the presence of `compaction.type` in the map.
            compaction_override: true,
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            // No `sst_format` key is present in the map.
            sst_format: None,
        };
        assert_eq!(expect, options);
    }
679
680 #[test]
681 fn test_region_options_serde() {
682 let options = RegionOptions {
683 ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
684 compaction: CompactionOptions::Twcs(TwcsOptions {
685 trigger_file_num: 8,
686 time_window: Some(Duration::from_secs(3600 * 2)),
687 max_output_file_size: None,
688 remote_compaction: false,
689 fallback_to_local: true,
690 }),
691 compaction_override: false,
692 storage: Some("S3".to_string()),
693 append_mode: false,
694 wal_options: WalOptions::Kafka(KafkaWalOptions {
695 topic: "test_topic".to_string(),
696 }),
697 index_options: IndexOptions {
698 inverted_index: InvertedIndexOptions {
699 ignore_column_ids: vec![1, 2, 3],
700 segment_row_count: 512,
701 },
702 },
703 memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
704 index_max_keys_per_shard: 2048,
705 data_freeze_threshold: 2048,
706 fork_dictionary_bytes: ReadableSize::mb(128),
707 primary_key_encoding: PrimaryKeyEncoding::Dense,
708 })),
709 merge_mode: Some(MergeMode::LastNonNull),
710 sst_format: None,
711 };
712 let region_options_json_str = serde_json::to_string(&options).unwrap();
713 let got: RegionOptions = serde_json::from_str(®ion_options_json_str).unwrap();
714 assert_eq!(options, got);
715 }
716
    /// Deserializes `RegionOptions` from a hand-written JSON document in the
    /// nested serde layout (one object per option group) and compares it with
    /// the expected value.
    #[test]
    fn test_region_options_str_serde() {
        // Omits `compaction_override` and `sst_format`; both are expected to
        // take their default values below.
        let region_options_json_str = r#"{
 "ttl": "7days",
 "compaction": {
 "compaction.type": "twcs",
 "compaction.twcs.trigger_file_num": "8",
 "compaction.twcs.max_output_file_size": "7MB",
 "compaction.twcs.time_window": "2h"
 },
 "storage": "S3",
 "append_mode": false,
 "wal_options": {
 "wal.provider": "kafka",
 "wal.kafka.topic": "test_topic"
 },
 "index_options": {
 "index.inverted_index.ignore_column_ids": "",
 "index.inverted_index.segment_row_count": "512"
 },
 "memtable": {
 "memtable.type": "partition_tree",
 "memtable.partition_tree.index_max_keys_per_shard": "2048",
 "memtable.partition_tree.data_freeze_threshold": "2048",
 "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
 },
 "merge_mode": "last_non_null"
}"#;
        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::mb(7)),
                // Not present in the JSON, so the `TwcsOptions` defaults apply.
                remote_compaction: false,
                fallback_to_local: true,
            }),
            compaction_override: false,
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    // The empty string in the JSON is an empty id list.
                    ignore_column_ids: vec![],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(options, got);
    }
779}