1use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_stat::get_total_memory_readable;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::storage::ColumnId;
33use strum::EnumString;
34
35use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
36use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
37use crate::sst::FormatType;
38
/// Default value of `InvertedIndexOptions::segment_row_count`, used by its `Default` impl.
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
40
/// Value of the `merge_mode` region option.
///
/// Both serde and strum use snake_case names, so the accepted strings are
/// `"last_row"` and `"last_non_null"`. `LastRow` is the `Default` variant.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {
    /// "Last row" merging; this is the default variant
    /// (returned by `RegionOptions::merge_mode` when the option is unset).
    #[default]
    LastRow,
    /// "Last non-null" merging. NOTE(review): the exact per-field semantics
    /// are implemented by the merge/dedup code, not in this file.
    LastNonNull,
}
52
/// Options of a region.
///
/// Built either from the flat string option map (see the
/// `TryFrom<&HashMap<String, String>>` impl) or by deserializing this struct
/// directly. `#[serde(default)]` fills missing fields from `RegionOptions::default()`.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionOptions {
    /// Time-to-live setting (option key `ttl`, e.g. `"7d"`); `None` when unset.
    pub ttl: Option<TimeToLive>,
    /// Compaction options (keys under `compaction.`, tagged by `compaction.type`).
    pub compaction: CompactionOptions,
    /// Storage name (option key `storage`, e.g. `"S3"`); `None` when unset.
    pub storage: Option<String>,
    /// Whether append mode is enabled. When `true`, `need_dedup` returns
    /// `false` and `validate` rejects any `merge_mode`.
    pub append_mode: bool,
    /// WAL options, decoded from the JSON string stored under `WAL_OPTIONS_KEY`.
    pub wal_options: WalOptions,
    /// Index options (keys under `index.`).
    pub index_options: IndexOptions,
    /// Memtable options; `None` when `memtable.type` is not given.
    pub memtable: Option<MemtableOptions>,
    /// Merge mode; `None` means `MergeMode::default()` (see `merge_mode()`).
    pub merge_mode: Option<MergeMode>,
    /// SST format (option key `sst_format`); `None` when unset.
    pub sst_format: Option<FormatType>,
}
81
82impl RegionOptions {
83 pub fn validate(&self) -> Result<()> {
85 if self.append_mode {
86 ensure!(
87 self.merge_mode.is_none(),
88 InvalidRegionOptionsSnafu {
89 reason: "merge_mode is not allowed when append_mode is enabled",
90 }
91 );
92 }
93 Ok(())
94 }
95
96 pub fn need_dedup(&self) -> bool {
98 !self.append_mode
99 }
100
101 pub fn merge_mode(&self) -> MergeMode {
103 self.merge_mode.unwrap_or_default()
104 }
105
106 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
108 self.memtable
109 .as_ref()
110 .map_or(PrimaryKeyEncoding::default(), |memtable| {
111 memtable.primary_key_encoding()
112 })
113 }
114}
115
116impl TryFrom<&HashMap<String, String>> for RegionOptions {
117 type Error = Error;
118
119 fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
120 let value = options_map_to_value(options_map);
121 let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
122
123 let options: RegionOptionsWithoutEnum =
127 serde_json::from_str(&json).context(JsonOptionsSnafu)?;
128 let compaction = if validate_enum_options(options_map, "compaction.type")? {
129 serde_json::from_str(&json).context(JsonOptionsSnafu)?
130 } else {
131 CompactionOptions::default()
132 };
133
134 let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
136 || Ok(WalOptions::default()),
137 |encoded_wal_options| {
138 serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
139 },
140 )?;
141
142 let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
143 let memtable = if validate_enum_options(options_map, "memtable.type")? {
144 Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
145 } else {
146 None
147 };
148
149 let opts = RegionOptions {
150 ttl: options.ttl,
151 compaction,
152 storage: options.storage,
153 append_mode: options.append_mode,
154 wal_options,
155 index_options,
156 memtable,
157 merge_mode: options.merge_mode,
158 sst_format: options.sst_format,
159 };
160 opts.validate()?;
161
162 Ok(opts)
163 }
164}
165
/// Compaction options of a region.
///
/// Deserialized as an internally tagged enum: the variant is chosen by the
/// `compaction.type` key, using the snake_case variant name (e.g. `"twcs"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "compaction.type")]
#[serde(rename_all = "snake_case")]
pub enum CompactionOptions {
    /// TWCS compaction; its fields are keyed with the `compaction.twcs.` prefix.
    #[serde(with = "prefix_twcs")]
    Twcs(TwcsOptions),
}
175
176impl CompactionOptions {
177 pub(crate) fn time_window(&self) -> Option<Duration> {
178 match self {
179 CompactionOptions::Twcs(opts) => opts.time_window,
180 }
181 }
182
183 pub(crate) fn remote_compaction(&self) -> bool {
184 match self {
185 CompactionOptions::Twcs(opts) => opts.remote_compaction,
186 }
187 }
188
189 pub(crate) fn fallback_to_local(&self) -> bool {
190 match self {
191 CompactionOptions::Twcs(opts) => opts.fallback_to_local,
192 }
193 }
194}
195
196impl Default for CompactionOptions {
197 fn default() -> Self {
198 Self::Twcs(TwcsOptions::default())
199 }
200}
201
/// Options of the TWCS compaction strategy.
///
/// Option-map values arrive as JSON strings, so numeric and boolean fields
/// are parsed with `DisplayFromStr`. Missing fields fall back to
/// `TwcsOptions::default()`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
    /// File-count threshold for triggering compaction (default 4).
    /// NOTE(review): the exact trigger rule lives in the compaction picker; confirm there.
    #[serde_as(as = "DisplayFromStr")]
    pub trigger_file_num: usize,
    /// Compaction time window, written as a humantime duration such as `"2h"`.
    #[serde(with = "humantime_serde")]
    pub time_window: Option<Duration>,
    /// Size limit for compaction output files, e.g. `"1GB"`
    /// (default `ReadableSize::mb(512)`).
    pub max_output_file_size: Option<ReadableSize>,
    /// Whether remote compaction is enabled (default `false`).
    #[serde_as(as = "DisplayFromStr")]
    pub remote_compaction: bool,
    /// Whether falling back to local compaction is allowed (default `true`).
    /// Presumably consulted when remote compaction cannot run; confirm with callers.
    #[serde_as(as = "DisplayFromStr")]
    pub fallback_to_local: bool,
}

// Prefixes every `TwcsOptions` field name with `compaction.twcs.` when it is
// (de)serialized through `CompactionOptions::Twcs`.
with_prefix!(prefix_twcs "compaction.twcs.");
224
225impl TwcsOptions {
226 pub fn time_window_seconds(&self) -> Option<i64> {
228 self.time_window.and_then(|window| {
229 let window_secs = window.as_secs();
230 if window_secs == 0 {
231 None
232 } else {
233 window_secs.try_into().ok()
234 }
235 })
236 }
237}
238
239impl Default for TwcsOptions {
240 fn default() -> Self {
241 Self {
242 trigger_file_num: 4,
243 time_window: None,
244 max_output_file_size: Some(ReadableSize::mb(512)),
245 remote_compaction: false,
246 fallback_to_local: true,
247 }
248 }
249}
250
/// The subset of `RegionOptions` that can be deserialized directly from the
/// flattened option JSON object: everything except the tagged-enum and nested groups.
///
/// Used as the first parsing step of
/// `TryFrom<&HashMap<String, String>> for RegionOptions`.
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
    /// Same as `RegionOptions::ttl`.
    ttl: Option<TimeToLive>,
    /// Same as `RegionOptions::storage`.
    storage: Option<String>,
    /// Parsed from its string form (`"true"` / `"false"`).
    #[serde_as(as = "DisplayFromStr")]
    append_mode: bool,
    /// An empty string deserializes to `None`; other strings are parsed via `FromStr`.
    #[serde_as(as = "NoneAsEmptyString")]
    merge_mode: Option<MergeMode>,
    /// An empty string deserializes to `None`; other strings are parsed via `FromStr`.
    #[serde_as(as = "NoneAsEmptyString")]
    sst_format: Option<FormatType>,
}
267
268impl Default for RegionOptionsWithoutEnum {
269 fn default() -> Self {
270 let options = RegionOptions::default();
271 RegionOptionsWithoutEnum {
272 ttl: options.ttl,
273 storage: options.storage,
274 append_mode: options.append_mode,
275 merge_mode: options.merge_mode,
276 sst_format: options.sst_format,
277 }
278 }
279}
280
// Prefixes every `InvertedIndexOptions` field name with `index.inverted_index.`
// when it is (de)serialized through `IndexOptions::inverted_index`.
with_prefix!(prefix_inverted_index "index.inverted_index.");

/// Index options of a region.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
    /// Inverted index options, flattened into this object under the
    /// `index.inverted_index.` key prefix.
    #[serde(flatten, with = "prefix_inverted_index")]
    pub inverted_index: InvertedIndexOptions,
}
291
/// Options of the inverted index.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
    /// Column ids the inverted index should ignore, written as a
    /// comma-separated string such as `"1,2,3"`. An empty string means none.
    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
    #[serde(serialize_with = "serialize_ignore_column_ids")]
    pub ignore_column_ids: Vec<ColumnId>,

    /// Segment row count (default `DEFAULT_INDEX_SEGMENT_ROW_COUNT`),
    /// parsed from a decimal string.
    #[serde_as(as = "DisplayFromStr")]
    pub segment_row_count: usize,
}
307
308impl Default for InvertedIndexOptions {
309 fn default() -> Self {
310 Self {
311 ignore_column_ids: Vec::new(),
312 segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
313 }
314 }
315}
316
/// Memtable options of a region.
///
/// The variant is selected by the `memtable.type` key, using the snake_case
/// variant name (`"time_series"` or `"partition_tree"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
pub enum MemtableOptions {
    /// Time-series memtable; it takes no extra options.
    TimeSeries,
    /// Partition tree memtable; its fields are keyed with the
    /// `memtable.partition_tree.` prefix.
    #[serde(with = "prefix_partition_tree")]
    PartitionTree(PartitionTreeOptions),
}

// Prefixes every `PartitionTreeOptions` field name with `memtable.partition_tree.`
// when it is (de)serialized through `MemtableOptions::PartitionTree`.
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
327
328impl MemtableOptions {
329 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
331 match self {
332 MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
333 _ => PrimaryKeyEncoding::Dense,
334 }
335 }
336}
337
/// Options of the partition tree memtable.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct PartitionTreeOptions {
    /// Maximum keys per index shard (default `DEFAULT_MAX_KEYS_PER_SHARD`),
    /// parsed from a decimal string.
    #[serde_as(as = "DisplayFromStr")]
    pub index_max_keys_per_shard: usize,
    /// Data freeze threshold (default `DEFAULT_FREEZE_THRESHOLD`), parsed from
    /// a decimal string.
    #[serde_as(as = "DisplayFromStr")]
    pub data_freeze_threshold: usize,
    /// Fork dictionary size setting, e.g. `"128M"`. The default depends on
    /// total system memory (see the `Default` impl). NOTE(review): how the
    /// size is applied lives in the partition tree memtable, not here.
    pub fork_dictionary_bytes: ReadableSize,
    /// Primary key encoding (default `PrimaryKeyEncoding::Dense`).
    pub primary_key_encoding: PrimaryKeyEncoding,
}
354
355impl Default for PartitionTreeOptions {
356 fn default() -> Self {
357 let mut fork_dictionary_bytes = ReadableSize::mb(512);
358 if let Some(total_memory) = get_total_memory_readable() {
359 let adjust_dictionary_bytes = std::cmp::min(
360 total_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
361 fork_dictionary_bytes,
362 );
363 if adjust_dictionary_bytes.0 > 0 {
364 fork_dictionary_bytes = adjust_dictionary_bytes;
365 }
366 }
367 Self {
368 index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
369 data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
370 fork_dictionary_bytes,
371 primary_key_encoding: PrimaryKeyEncoding::Dense,
372 }
373 }
374}
375
376fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
377where
378 D: Deserializer<'de>,
379{
380 let s: String = Deserialize::deserialize(deserializer)?;
381 let mut column_ids = Vec::new();
382 if s.is_empty() {
383 return Ok(column_ids);
384 }
385 for item in s.split(',') {
386 let column_id = item.parse().map_err(D::Error::custom)?;
387 column_ids.push(column_id);
388 }
389 Ok(column_ids)
390}
391
392fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
393where
394 S: serde::Serializer,
395{
396 let s = column_ids
397 .iter()
398 .map(|id| id.to_string())
399 .collect::<Vec<_>>()
400 .join(",");
401 serializer.serialize_str(&s)
402}
403
404fn options_map_to_value(options: &HashMap<String, String>) -> Value {
408 let map = options
409 .iter()
410 .map(|(key, value)| {
411 if value.eq_ignore_ascii_case("null") {
413 (key.clone(), Value::Null)
414 } else {
415 (key.clone(), Value::from(value.clone()))
416 }
417 })
418 .collect();
419 Value::Object(map)
420}
421
422fn validate_enum_options(
427 options_map: &HashMap<String, String>,
428 enum_tag_key: &str,
429) -> Result<bool> {
430 let enum_type = enum_tag_key.split('.').next().unwrap();
431 let mut has_other_options = false;
432 let mut has_tag = false;
433 for key in options_map.keys() {
434 if key == enum_tag_key {
435 has_tag = true;
436 } else if key.starts_with(enum_type) {
437 has_other_options = true;
438 }
439 }
440
441 ensure!(
443 has_tag || !has_other_options,
444 InvalidRegionOptionsSnafu {
445 reason: format!("missing key {} in options", enum_tag_key),
446 }
447 );
448
449 Ok(has_tag)
450}
451
#[cfg(test)]
mod tests {
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_wal::options::KafkaWalOptions;

    use super::*;

    /// Builds an owned option map from `(key, value)` pairs.
    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
        options
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn test_empty_region_options() {
        let map = make_map(&[]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(RegionOptions::default(), options);
    }

    #[test]
    fn test_with_ttl() {
        let map = make_map(&[("ttl", "7d")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_storage() {
        let map = make_map(&[("storage", "S3")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            storage: Some("S3".to_string()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_without_compaction_type() {
        // Compaction options without `compaction.type` must be rejected.
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
        ]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_compaction_type() {
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    /// Encodes `wal_options` under `WAL_OPTIONS_KEY`, parses the map, and
    /// returns whether the result equals the default options with only
    /// `wal_options` replaced.
    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
        let got = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            wal_options: wal_options.clone(),
            ..Default::default()
        };
        expect == got
    }

    #[test]
    fn test_with_index() {
        let map = make_map(&[
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_any_wal_options() {
        let all_wal_options = [
            WalOptions::RaftEngine,
            WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
        ];
        // The helper signals a mismatch only through its return value. Assert
        // it, otherwise this test could never fail on a wrong parse.
        for wal_options in &all_wal_options {
            assert!(
                test_with_wal_options(wal_options),
                "unexpected region options for {wal_options:?}"
            );
        }
    }

    #[test]
    fn test_with_memtable() {
        let map = make_map(&[("memtable.type", "time_series")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::TimeSeries),
            ..Default::default()
        };
        assert_eq!(expect, options);

        let map = make_map(&[("memtable.type", "partition_tree")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::PartitionTree(
                PartitionTreeOptions::default(),
            )),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_unknown_memtable_type() {
        let map = make_map(&[("memtable.type", "no_such_memtable")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_merge_mode() {
        let map = make_map(&[("merge_mode", "last_row")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastRow, options.merge_mode());

        let map = make_map(&[("merge_mode", "last_non_null")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastNonNull, options.merge_mode());

        let map = make_map(&[("merge_mode", "unknown")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_all() {
        let wal_options = WalOptions::Kafka(KafkaWalOptions {
            topic: "test_topic".to_string(),
        });
        let map = make_map(&[
            ("ttl", "7d"),
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.max_output_file_size", "1GB"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
            ("compaction.twcs.remote_compaction", "false"),
            ("compaction.twcs.fallback_to_local", "true"),
            ("storage", "S3"),
            ("append_mode", "false"),
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
            (
                WAL_OPTIONS_KEY,
                &serde_json::to_string(&wal_options).unwrap(),
            ),
            ("memtable.type", "partition_tree"),
            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
            ("memtable.partition_tree.data_freeze_threshold", "2048"),
            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
            ("merge_mode", "last_non_null"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::gb(1)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_region_options_serde() {
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: None,
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        let region_options_json_str = serde_json::to_string(&options).unwrap();
        // Fixed: this borrow had been mis-encoded as `®ion_options_json_str`,
        // which does not compile.
        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
        assert_eq!(options, got);
    }

    #[test]
    fn test_region_options_str_serde() {
        let region_options_json_str = r#"{
  "ttl": "7days",
  "compaction": {
    "compaction.type": "twcs",
    "compaction.twcs.trigger_file_num": "8",
    "compaction.twcs.max_output_file_size": "7MB",
    "compaction.twcs.time_window": "2h"
  },
  "storage": "S3",
  "append_mode": false,
  "wal_options": {
    "wal.provider": "kafka",
    "wal.kafka.topic": "test_topic"
  },
  "index_options": {
    "index.inverted_index.ignore_column_ids": "",
    "index.inverted_index.segment_row_count": "512"
  },
  "memtable": {
    "memtable.type": "partition_tree",
    "memtable.partition_tree.index_max_keys_per_shard": "2048",
    "memtable.partition_tree.data_freeze_threshold": "2048",
    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
  },
  "merge_mode": "last_non_null"
}"#;
        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::mb(7)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(options, got);
    }
}