1use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_time::TimeToLive;
24use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
25use serde::de::Error as _;
26use serde::{Deserialize, Deserializer, Serialize};
27use serde_json::Value;
28use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
29use snafu::{ResultExt, ensure};
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::storage::ColumnId;
32use strum::EnumString;
33
34use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
35use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
36use crate::sst::FormatType;
37
/// Default value of [`InvertedIndexOptions::segment_row_count`].
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
39
/// Merge mode selectable through the `merge_mode` region option.
///
/// Variants are (de)serialized and parsed from strings in `snake_case`
/// (`last_row`, `last_non_null`). [`MergeMode::LastRow`] is the default.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {
    /// Default mode.
    /// NOTE(review): presumably keeps the last row among duplicates — confirm against the
    /// merge implementation, which is not part of this file.
    #[default]
    LastRow,
    /// NOTE(review): presumably keeps the last non-null value of each field — confirm against
    /// the merge implementation, which is not part of this file.
    LastNonNull,
}
51
/// Options of a region.
///
/// Because of `#[serde(default)]`, every field missing from the serialized form takes the
/// value from `RegionOptions::default()`. The struct can also be built from a flat
/// string-to-string options map through its `TryFrom<&HashMap<String, String>>` impl.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionOptions {
    /// Time-to-live of the region's data; `None` when not configured.
    pub ttl: Option<TimeToLive>,
    /// Compaction options.
    pub compaction: CompactionOptions,
    /// Name of the custom storage (e.g. `S3`); `None` when not configured.
    pub storage: Option<String>,
    /// Whether the region is in append mode. `need_dedup()` returns the negation of this flag,
    /// and `validate()` rejects a `merge_mode` when this flag is set.
    pub append_mode: bool,
    /// WAL options of the region.
    pub wal_options: WalOptions,
    /// Index options.
    pub index_options: IndexOptions,
    /// Memtable options; `None` when no memtable type was specified.
    pub memtable: Option<MemtableOptions>,
    /// Explicit merge mode; `merge_mode()` falls back to `MergeMode::default()` when `None`.
    pub merge_mode: Option<MergeMode>,
    /// SST format of the region; `None` when not configured.
    pub sst_format: Option<FormatType>,
}
80
81impl RegionOptions {
82 pub fn validate(&self) -> Result<()> {
84 if self.append_mode {
85 ensure!(
86 self.merge_mode.is_none(),
87 InvalidRegionOptionsSnafu {
88 reason: "merge_mode is not allowed when append_mode is enabled",
89 }
90 );
91 }
92 Ok(())
93 }
94
95 pub fn need_dedup(&self) -> bool {
97 !self.append_mode
98 }
99
100 pub fn merge_mode(&self) -> MergeMode {
102 self.merge_mode.unwrap_or_default()
103 }
104
105 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
107 self.memtable
108 .as_ref()
109 .map_or(PrimaryKeyEncoding::default(), |memtable| {
110 memtable.primary_key_encoding()
111 })
112 }
113}
114
115impl TryFrom<&HashMap<String, String>> for RegionOptions {
116 type Error = Error;
117
118 fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
119 let value = options_map_to_value(options_map);
120 let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
121
122 let options: RegionOptionsWithoutEnum =
126 serde_json::from_str(&json).context(JsonOptionsSnafu)?;
127 let compaction = if validate_enum_options(options_map, "compaction.type")? {
128 serde_json::from_str(&json).context(JsonOptionsSnafu)?
129 } else {
130 CompactionOptions::default()
131 };
132
133 let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
135 || Ok(WalOptions::default()),
136 |encoded_wal_options| {
137 serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
138 },
139 )?;
140
141 let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
142 let memtable = if validate_enum_options(options_map, "memtable.type")? {
143 Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
144 } else {
145 None
146 };
147
148 let opts = RegionOptions {
149 ttl: options.ttl,
150 compaction,
151 storage: options.storage,
152 append_mode: options.append_mode,
153 wal_options,
154 index_options,
155 memtable,
156 merge_mode: options.merge_mode,
157 sst_format: options.sst_format,
158 };
159 opts.validate()?;
160
161 Ok(opts)
162 }
163}
164
/// Options for compactions.
///
/// Internally tagged by the `compaction.type` key; variant names are written in
/// `snake_case` (e.g. `twcs`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "compaction.type")]
#[serde(rename_all = "snake_case")]
pub enum CompactionOptions {
    /// The `twcs` compaction type; its fields are (de)serialized with the
    /// `compaction.twcs.` key prefix.
    #[serde(with = "prefix_twcs")]
    Twcs(TwcsOptions),
}
174
175impl CompactionOptions {
176 pub(crate) fn time_window(&self) -> Option<Duration> {
177 match self {
178 CompactionOptions::Twcs(opts) => opts.time_window,
179 }
180 }
181
182 pub(crate) fn remote_compaction(&self) -> bool {
183 match self {
184 CompactionOptions::Twcs(opts) => opts.remote_compaction,
185 }
186 }
187
188 pub(crate) fn fallback_to_local(&self) -> bool {
189 match self {
190 CompactionOptions::Twcs(opts) => opts.fallback_to_local,
191 }
192 }
193}
194
195impl Default for CompactionOptions {
196 fn default() -> Self {
197 Self::Twcs(TwcsOptions::default())
198 }
199}
200
/// Settings carried by the [`CompactionOptions::Twcs`] variant.
///
/// Because of `#[serde(default)]`, fields missing from the input take the values from the
/// `Default` implementation of this struct.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
    /// File-count trigger of the compaction; (de)serialized as a string via `DisplayFromStr`.
    /// NOTE(review): the exact trigger semantics live in the compaction code — confirm there.
    #[serde_as(as = "DisplayFromStr")]
    pub trigger_file_num: usize,
    /// Compaction time window, written in human-readable form (e.g. `2h`); `None` when unset.
    #[serde(with = "humantime_serde")]
    pub time_window: Option<Duration>,
    /// Size setting for compaction output files; `None` when unset.
    /// NOTE(review): presumably an upper bound, judging by the name — confirm against usage.
    pub max_output_file_size: Option<ReadableSize>,
    /// Value returned by `CompactionOptions::remote_compaction`; (de)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub remote_compaction: bool,
    /// Value returned by `CompactionOptions::fallback_to_local`; (de)serialized as a string.
    #[serde_as(as = "DisplayFromStr")]
    pub fallback_to_local: bool,
}

// Generates the `prefix_twcs` serde adapter that adds/strips the `compaction.twcs.` key prefix.
with_prefix!(prefix_twcs "compaction.twcs.");
223
224impl TwcsOptions {
225 pub fn time_window_seconds(&self) -> Option<i64> {
227 self.time_window.and_then(|window| {
228 let window_secs = window.as_secs();
229 if window_secs == 0 {
230 None
231 } else {
232 window_secs.try_into().ok()
233 }
234 })
235 }
236}
237
238impl Default for TwcsOptions {
239 fn default() -> Self {
240 Self {
241 trigger_file_num: 4,
242 time_window: None,
243 max_output_file_size: Some(ReadableSize::mb(512)),
244 remote_compaction: false,
245 fallback_to_local: true,
246 }
247 }
248}
249
/// Helper used by `RegionOptions::try_from` to deserialize the options that are not tagged
/// enums from the string-valued options map.
///
/// Values arrive as strings, hence the `DisplayFromStr` / `NoneAsEmptyString` adapters.
/// Missing fields take the values from the `Default` implementation of this struct.
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
    /// Mirrors `RegionOptions::ttl`.
    ttl: Option<TimeToLive>,
    /// Mirrors `RegionOptions::storage`.
    storage: Option<String>,
    /// Mirrors `RegionOptions::append_mode`; parsed from its string form.
    #[serde_as(as = "DisplayFromStr")]
    append_mode: bool,
    /// Mirrors `RegionOptions::merge_mode`; an empty string maps to `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    merge_mode: Option<MergeMode>,
    /// Mirrors `RegionOptions::sst_format`; an empty string maps to `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    sst_format: Option<FormatType>,
}
266
267impl Default for RegionOptionsWithoutEnum {
268 fn default() -> Self {
269 let options = RegionOptions::default();
270 RegionOptionsWithoutEnum {
271 ttl: options.ttl,
272 storage: options.storage,
273 append_mode: options.append_mode,
274 merge_mode: options.merge_mode,
275 sst_format: options.sst_format,
276 }
277 }
278}
279
// Generates the `prefix_inverted_index` serde adapter that adds/strips the
// `index.inverted_index.` key prefix.
with_prefix!(prefix_inverted_index "index.inverted_index.");

/// Options for index.
///
/// Missing fields take the values from `IndexOptions::default()`.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
    /// Options for the inverted index, flattened into the parent object with every key
    /// prefixed by `index.inverted_index.`.
    #[serde(flatten, with = "prefix_inverted_index")]
    pub inverted_index: InvertedIndexOptions,
}
290
/// Options for the inverted index.
///
/// Missing fields take the values from the `Default` implementation of this struct.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
    /// Column ids listed under `ignore_column_ids`. On the wire this is a single
    /// comma-separated string (e.g. `1,2,3`); an empty string means an empty list.
    /// NOTE(review): presumably these columns are skipped when building the inverted
    /// index — confirm against the index builder.
    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
    #[serde(serialize_with = "serialize_ignore_column_ids")]
    pub ignore_column_ids: Vec<ColumnId>,

    /// Row count of an index segment; (de)serialized as a string via `DisplayFromStr`.
    #[serde_as(as = "DisplayFromStr")]
    pub segment_row_count: usize,
}
306
307impl Default for InvertedIndexOptions {
308 fn default() -> Self {
309 Self {
310 ignore_column_ids: Vec::new(),
311 segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
312 }
313 }
314}
315
/// Options for region level memtable.
///
/// Internally tagged by the `memtable.type` key; variant names are written in
/// `snake_case` (`time_series`, `partition_tree`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
pub enum MemtableOptions {
    /// The `time_series` memtable type; it carries no extra options.
    TimeSeries,
    /// The `partition_tree` memtable type; its fields are (de)serialized with the
    /// `memtable.partition_tree.` key prefix.
    #[serde(with = "prefix_partition_tree")]
    PartitionTree(PartitionTreeOptions),
}

// Generates the `prefix_partition_tree` serde adapter that adds/strips the
// `memtable.partition_tree.` key prefix.
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
326
327impl MemtableOptions {
328 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
330 match self {
331 MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
332 _ => PrimaryKeyEncoding::Dense,
333 }
334 }
335}
336
/// Settings carried by the [`MemtableOptions::PartitionTree`] variant.
///
/// Missing fields take the values from the `Default` implementation of this struct.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct PartitionTreeOptions {
    /// Max keys in an index shard; (de)serialized as a string via `DisplayFromStr`.
    #[serde_as(as = "DisplayFromStr")]
    pub index_max_keys_per_shard: usize,
    /// Freeze threshold of the data part; (de)serialized as a string via `DisplayFromStr`.
    /// NOTE(review): the unit (presumably rows) is not visible here — confirm against the
    /// partition tree implementation.
    #[serde_as(as = "DisplayFromStr")]
    pub data_freeze_threshold: usize,
    /// Byte size setting for the dictionary. The default is derived from the total system
    /// memory and capped at 512 MB (see the `Default` impl).
    /// NOTE(review): the condition this size controls is not visible here — confirm.
    pub fork_dictionary_bytes: ReadableSize,
    /// Primary key encoding reported by `MemtableOptions::primary_key_encoding`.
    pub primary_key_encoding: PrimaryKeyEncoding,
}
353
354impl Default for PartitionTreeOptions {
355 fn default() -> Self {
356 let mut fork_dictionary_bytes = ReadableSize::mb(512);
357 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
358 let adjust_dictionary_bytes = std::cmp::min(
359 sys_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
360 fork_dictionary_bytes,
361 );
362 if adjust_dictionary_bytes.0 > 0 {
363 fork_dictionary_bytes = adjust_dictionary_bytes;
364 }
365 }
366 Self {
367 index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
368 data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
369 fork_dictionary_bytes,
370 primary_key_encoding: PrimaryKeyEncoding::Dense,
371 }
372 }
373}
374
375fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
376where
377 D: Deserializer<'de>,
378{
379 let s: String = Deserialize::deserialize(deserializer)?;
380 let mut column_ids = Vec::new();
381 if s.is_empty() {
382 return Ok(column_ids);
383 }
384 for item in s.split(',') {
385 let column_id = item.parse().map_err(D::Error::custom)?;
386 column_ids.push(column_id);
387 }
388 Ok(column_ids)
389}
390
391fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
392where
393 S: serde::Serializer,
394{
395 let s = column_ids
396 .iter()
397 .map(|id| id.to_string())
398 .collect::<Vec<_>>()
399 .join(",");
400 serializer.serialize_str(&s)
401}
402
403fn options_map_to_value(options: &HashMap<String, String>) -> Value {
407 let map = options
408 .iter()
409 .map(|(key, value)| {
410 if value.eq_ignore_ascii_case("null") {
412 (key.clone(), Value::Null)
413 } else {
414 (key.clone(), Value::from(value.clone()))
415 }
416 })
417 .collect();
418 Value::Object(map)
419}
420
421fn validate_enum_options(
426 options_map: &HashMap<String, String>,
427 enum_tag_key: &str,
428) -> Result<bool> {
429 let enum_type = enum_tag_key.split('.').next().unwrap();
430 let mut has_other_options = false;
431 let mut has_tag = false;
432 for key in options_map.keys() {
433 if key == enum_tag_key {
434 has_tag = true;
435 } else if key.starts_with(enum_type) {
436 has_other_options = true;
437 }
438 }
439
440 ensure!(
442 has_tag || !has_other_options,
443 InvalidRegionOptionsSnafu {
444 reason: format!("missing key {} in options", enum_tag_key),
445 }
446 );
447
448 Ok(has_tag)
449}
450
#[cfg(test)]
mod tests {
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_wal::options::KafkaWalOptions;

    use super::*;

    /// Builds an owned options map from borrowed key/value pairs.
    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
        options
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn test_empty_region_options() {
        let map = make_map(&[]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(RegionOptions::default(), options);
    }

    #[test]
    fn test_with_ttl() {
        let map = make_map(&[("ttl", "7d")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_storage() {
        let map = make_map(&[("storage", "S3")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            storage: Some("S3".to_string()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    /// Options of the compaction enum without its tag key must be rejected.
    #[test]
    fn test_without_compaction_type() {
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
        ]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_compaction_type() {
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    /// Returns whether options parsed from a map that only holds the encoded `wal_options`
    /// equal the default options with those WAL options applied.
    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
        let got = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            wal_options: wal_options.clone(),
            ..Default::default()
        };
        expect == got
    }

    #[test]
    fn test_with_index() {
        let map = make_map(&[
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_any_wal_options() {
        let all_wal_options = [
            WalOptions::RaftEngine,
            WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
        ];
        // Assert on every helper result; discarding it would let a mismatch pass silently.
        for wal_options in &all_wal_options {
            assert!(
                test_with_wal_options(wal_options),
                "region options mismatch for wal options {:?}",
                wal_options
            );
        }
    }

    #[test]
    fn test_with_memtable() {
        let map = make_map(&[("memtable.type", "time_series")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::TimeSeries),
            ..Default::default()
        };
        assert_eq!(expect, options);

        let map = make_map(&[("memtable.type", "partition_tree")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::PartitionTree(
                PartitionTreeOptions::default(),
            )),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_unknown_memtable_type() {
        let map = make_map(&[("memtable.type", "no_such_memtable")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_merge_mode() {
        let map = make_map(&[("merge_mode", "last_row")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastRow, options.merge_mode());

        let map = make_map(&[("merge_mode", "last_non_null")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastNonNull, options.merge_mode());

        let map = make_map(&[("merge_mode", "unknown")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_all() {
        let wal_options = WalOptions::Kafka(KafkaWalOptions {
            topic: "test_topic".to_string(),
        });
        let map = make_map(&[
            ("ttl", "7d"),
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.max_output_file_size", "1GB"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
            ("compaction.twcs.remote_compaction", "false"),
            ("compaction.twcs.fallback_to_local", "true"),
            ("storage", "S3"),
            ("append_mode", "false"),
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
            (
                WAL_OPTIONS_KEY,
                &serde_json::to_string(&wal_options).unwrap(),
            ),
            ("memtable.type", "partition_tree"),
            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
            ("memtable.partition_tree.data_freeze_threshold", "2048"),
            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
            ("merge_mode", "last_non_null"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::gb(1)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(expect, options);
    }

    /// `RegionOptions` must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_region_options_serde() {
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: None,
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        let region_options_json_str = serde_json::to_string(&options).unwrap();
        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
        assert_eq!(options, got);
    }

    /// A hand-written JSON document in the serialized layout must deserialize as expected.
    #[test]
    fn test_region_options_str_serde() {
        let region_options_json_str = r#"{
  "ttl": "7days",
  "compaction": {
    "compaction.type": "twcs",
    "compaction.twcs.trigger_file_num": "8",
    "compaction.twcs.max_output_file_size": "7MB",
    "compaction.twcs.time_window": "2h"
  },
  "storage": "S3",
  "append_mode": false,
  "wal_options": {
    "wal.provider": "kafka",
    "wal.kafka.topic": "test_topic"
  },
  "index_options": {
    "index.inverted_index.ignore_column_ids": "",
    "index.inverted_index.segment_row_count": "512"
  },
  "memtable": {
    "memtable.type": "partition_tree",
    "memtable.partition_tree.index_max_keys_per_shard": "2048",
    "memtable.partition_tree.data_freeze_threshold": "2048",
    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
  },
  "merge_mode": "last_non_null"
}"#;
        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::mb(7)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(options, got);
    }
}