1use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_time::TimeToLive;
24use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
25use serde::de::Error as _;
26use serde::{Deserialize, Deserializer, Serialize};
27use serde_json::Value;
28use serde_with::{serde_as, with_prefix, DisplayFromStr, NoneAsEmptyString};
29use snafu::{ensure, ResultExt};
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::storage::ColumnId;
32use strum::EnumString;
33
34use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
35use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
36
/// Default value of [`InvertedIndexOptions::segment_row_count`], used when the
/// `index.inverted_index.segment_row_count` option is absent.
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
38
/// Mode selected by the `merge_mode` region option.
///
/// Both serde and `strum::EnumString` use snake_case names
/// (`"last_row"`, `"last_non_null"`).
/// NOTE(review): presumably controls how rows sharing the same key are merged — confirm.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {
    /// The default mode; also returned by `RegionOptions::merge_mode()` when no
    /// mode is configured. Presumably keeps the last row — confirm.
    #[default]
    LastRow,
    /// Presumably keeps the last non-null value of each field — confirm.
    LastNonNull,
}
50
/// Options of a region.
///
/// Can be built from a flat string map through `TryFrom<&HashMap<String, String>>`.
/// Fields missing during deserialization take their `Default` values
/// (`#[serde(default)]`).
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionOptions {
    /// Region TTL (`ttl` option, e.g. `"7d"`); `None` when not set.
    pub ttl: Option<TimeToLive>,
    /// Compaction options (`compaction.*` keys).
    pub compaction: CompactionOptions,
    /// Storage name (`storage` option, e.g. `"S3"`); `None` when not set.
    pub storage: Option<String>,
    /// Append mode (`append_mode` option). When `true`, `need_dedup()` returns
    /// `false` and `validate()` rejects a configured `merge_mode`.
    pub append_mode: bool,
    /// WAL options, decoded from the JSON stored under `WAL_OPTIONS_KEY`.
    pub wal_options: WalOptions,
    /// Index options (`index.*` keys).
    pub index_options: IndexOptions,
    /// Memtable options; `None` when `memtable.type` is not given.
    pub memtable: Option<MemtableOptions>,
    /// Merge mode (`merge_mode` option); `None` means `MergeMode::default()`.
    pub merge_mode: Option<MergeMode>,
}
77
78impl RegionOptions {
79 pub fn validate(&self) -> Result<()> {
81 if self.append_mode {
82 ensure!(
83 self.merge_mode.is_none(),
84 InvalidRegionOptionsSnafu {
85 reason: "merge_mode is not allowed when append_mode is enabled",
86 }
87 );
88 }
89 Ok(())
90 }
91
92 pub fn need_dedup(&self) -> bool {
94 !self.append_mode
95 }
96
97 pub fn merge_mode(&self) -> MergeMode {
99 self.merge_mode.unwrap_or_default()
100 }
101
102 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
104 self.memtable
105 .as_ref()
106 .map_or(PrimaryKeyEncoding::default(), |memtable| {
107 memtable.primary_key_encoding()
108 })
109 }
110}
111
112impl TryFrom<&HashMap<String, String>> for RegionOptions {
113 type Error = Error;
114
115 fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
116 let value = options_map_to_value(options_map);
117 let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
118
119 let options: RegionOptionsWithoutEnum =
123 serde_json::from_str(&json).context(JsonOptionsSnafu)?;
124 let compaction = if validate_enum_options(options_map, "compaction.type")? {
125 serde_json::from_str(&json).context(JsonOptionsSnafu)?
126 } else {
127 CompactionOptions::default()
128 };
129
130 let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else(
132 || Ok(WalOptions::default()),
133 |encoded_wal_options| {
134 serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu)
135 },
136 )?;
137
138 let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
139 let memtable = if validate_enum_options(options_map, "memtable.type")? {
140 Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
141 } else {
142 None
143 };
144
145 let opts = RegionOptions {
146 ttl: options.ttl,
147 compaction,
148 storage: options.storage,
149 append_mode: options.append_mode,
150 wal_options,
151 index_options,
152 memtable,
153 merge_mode: options.merge_mode,
154 };
155 opts.validate()?;
156
157 Ok(opts)
158 }
159}
160
/// Compaction options, tagged by the `compaction.type` key.
///
/// Variant names are snake_case, so `"twcs"` selects [`CompactionOptions::Twcs`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "compaction.type")]
#[serde(rename_all = "snake_case")]
pub enum CompactionOptions {
    /// TWCS options, read from keys prefixed with `compaction.twcs.`.
    #[serde(with = "prefix_twcs")]
    Twcs(TwcsOptions),
}
170
171impl CompactionOptions {
172 pub(crate) fn time_window(&self) -> Option<Duration> {
173 match self {
174 CompactionOptions::Twcs(opts) => opts.time_window,
175 }
176 }
177
178 pub(crate) fn remote_compaction(&self) -> bool {
179 match self {
180 CompactionOptions::Twcs(opts) => opts.remote_compaction,
181 }
182 }
183
184 pub(crate) fn fallback_to_local(&self) -> bool {
185 match self {
186 CompactionOptions::Twcs(opts) => opts.fallback_to_local,
187 }
188 }
189}
190
191impl Default for CompactionOptions {
192 fn default() -> Self {
193 Self::Twcs(TwcsOptions::default())
194 }
195}
196
/// TWCS compaction options, read from `compaction.twcs.*` keys.
///
/// Numeric and boolean fields are parsed from their string form
/// (`DisplayFromStr`). `time_window` uses humantime syntax such as `"2h"`.
/// Missing fields take their values from `TwcsOptions::default()`.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct TwcsOptions {
    /// Presumably the file count that triggers a compaction — confirm (default `4`).
    #[serde_as(as = "DisplayFromStr")]
    pub trigger_file_num: usize,
    /// Compaction time window; `None` when not configured.
    #[serde(with = "humantime_serde")]
    pub time_window: Option<Duration>,
    /// Maximum output file size, e.g. `"1GB"` (default `ReadableSize::mb(512)`).
    pub max_output_file_size: Option<ReadableSize>,
    /// Whether remote compaction is enabled (default `false`).
    #[serde_as(as = "DisplayFromStr")]
    pub remote_compaction: bool,
    /// Whether to fall back to local compaction (default `true`); presumably
    /// used when remote compaction cannot run — confirm.
    #[serde_as(as = "DisplayFromStr")]
    pub fallback_to_local: bool,
}
217
// Generates the `prefix_twcs` serde helper module, used by
// `CompactionOptions::Twcs` to map fields to keys prefixed with "compaction.twcs.".
with_prefix!(prefix_twcs "compaction.twcs.");
219
220impl TwcsOptions {
221 pub fn time_window_seconds(&self) -> Option<i64> {
223 self.time_window.and_then(|window| {
224 let window_secs = window.as_secs();
225 if window_secs == 0 {
226 None
227 } else {
228 window_secs.try_into().ok()
229 }
230 })
231 }
232}
233
234impl Default for TwcsOptions {
235 fn default() -> Self {
236 Self {
237 trigger_file_num: 4,
238 time_window: None,
239 max_output_file_size: Some(ReadableSize::mb(512)),
240 remote_compaction: false,
241 fallback_to_local: true,
242 }
243 }
244}
245
/// Helper holding the non-enum fields of [`RegionOptions`].
///
/// `RegionOptions::try_from` deserializes this from the options JSON and
/// decodes the compaction, WAL, index and memtable options separately.
/// NOTE(review): presumably needed because `#[serde(flatten)]` does not work
/// together with `#[serde(default)]` — confirm.
#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(default)]
struct RegionOptionsWithoutEnum {
    /// Same meaning as `RegionOptions::ttl`.
    ttl: Option<TimeToLive>,
    /// Same meaning as `RegionOptions::storage`.
    storage: Option<String>,
    /// Parsed from its string form, e.g. `"false"`.
    #[serde_as(as = "DisplayFromStr")]
    append_mode: bool,
    /// An empty string is read as `None`.
    #[serde_as(as = "NoneAsEmptyString")]
    merge_mode: Option<MergeMode>,
}
260
261impl Default for RegionOptionsWithoutEnum {
262 fn default() -> Self {
263 let options = RegionOptions::default();
264 RegionOptionsWithoutEnum {
265 ttl: options.ttl,
266 storage: options.storage,
267 append_mode: options.append_mode,
268 merge_mode: options.merge_mode,
269 }
270 }
271}
272
// Generates the `prefix_inverted_index` serde helper module, used by
// `IndexOptions::inverted_index` to map fields to keys prefixed with "index.inverted_index.".
with_prefix!(prefix_inverted_index "index.inverted_index.");
274
/// Index options of a region.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
    /// Inverted index options, flattened into keys prefixed with `index.inverted_index.`.
    #[serde(flatten, with = "prefix_inverted_index")]
    pub inverted_index: InvertedIndexOptions,
}
283
/// Options of the inverted index.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
    /// Column ids, encoded as a comma-separated string such as `"1,2,3"`.
    /// An empty string means an empty list.
    /// NOTE(review): presumably these columns are excluded from the index — confirm.
    #[serde(deserialize_with = "deserialize_ignore_column_ids")]
    #[serde(serialize_with = "serialize_ignore_column_ids")]
    pub ignore_column_ids: Vec<ColumnId>,

    /// Segment row count, parsed from its string form
    /// (default `DEFAULT_INDEX_SEGMENT_ROW_COUNT`).
    #[serde_as(as = "DisplayFromStr")]
    pub segment_row_count: usize,
}
299
300impl Default for InvertedIndexOptions {
301 fn default() -> Self {
302 Self {
303 ignore_column_ids: Vec::new(),
304 segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
305 }
306 }
307}
308
/// Memtable options, tagged by the `memtable.type` key
/// (`"time_series"` or `"partition_tree"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "memtable.type", rename_all = "snake_case")]
pub enum MemtableOptions {
    /// Time series memtable; takes no extra options.
    TimeSeries,
    /// Partition tree memtable, configured by keys prefixed with `memtable.partition_tree.`.
    #[serde(with = "prefix_partition_tree")]
    PartitionTree(PartitionTreeOptions),
}
317
// Generates the `prefix_partition_tree` serde helper module, used by
// `MemtableOptions::PartitionTree` to map fields to keys prefixed with "memtable.partition_tree.".
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
319
320impl MemtableOptions {
321 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
323 match self {
324 MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
325 _ => PrimaryKeyEncoding::Dense,
326 }
327 }
328}
329
/// Options of the partition tree memtable.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct PartitionTreeOptions {
    /// Maximum keys per index shard, parsed from its string form
    /// (default `DEFAULT_MAX_KEYS_PER_SHARD`).
    #[serde_as(as = "DisplayFromStr")]
    pub index_max_keys_per_shard: usize,
    /// Data freeze threshold, parsed from its string form
    /// (default `DEFAULT_FREEZE_THRESHOLD`).
    #[serde_as(as = "DisplayFromStr")]
    pub data_freeze_threshold: usize,
    /// Fork dictionary size, e.g. `"128M"`. The default is derived from system
    /// memory and capped at `ReadableSize::mb(512)`.
    pub fork_dictionary_bytes: ReadableSize,
    /// Primary key encoding (default [`PrimaryKeyEncoding::Dense`]).
    pub primary_key_encoding: PrimaryKeyEncoding,
}
346
347impl Default for PartitionTreeOptions {
348 fn default() -> Self {
349 let mut fork_dictionary_bytes = ReadableSize::mb(512);
350 if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
351 let adjust_dictionary_bytes = std::cmp::min(
352 sys_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
353 fork_dictionary_bytes,
354 );
355 if adjust_dictionary_bytes.0 > 0 {
356 fork_dictionary_bytes = adjust_dictionary_bytes;
357 }
358 }
359 Self {
360 index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
361 data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
362 fork_dictionary_bytes,
363 primary_key_encoding: PrimaryKeyEncoding::Dense,
364 }
365 }
366}
367
368fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
369where
370 D: Deserializer<'de>,
371{
372 let s: String = Deserialize::deserialize(deserializer)?;
373 let mut column_ids = Vec::new();
374 if s.is_empty() {
375 return Ok(column_ids);
376 }
377 for item in s.split(',') {
378 let column_id = item.parse().map_err(D::Error::custom)?;
379 column_ids.push(column_id);
380 }
381 Ok(column_ids)
382}
383
384fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
385where
386 S: serde::Serializer,
387{
388 let s = column_ids
389 .iter()
390 .map(|id| id.to_string())
391 .collect::<Vec<_>>()
392 .join(",");
393 serializer.serialize_str(&s)
394}
395
396fn options_map_to_value(options: &HashMap<String, String>) -> Value {
400 let map = options
401 .iter()
402 .map(|(key, value)| {
403 if value.eq_ignore_ascii_case("null") {
405 (key.to_string(), Value::Null)
406 } else {
407 (key.to_string(), Value::from(value.to_string()))
408 }
409 })
410 .collect();
411 Value::Object(map)
412}
413
414fn validate_enum_options(
419 options_map: &HashMap<String, String>,
420 enum_tag_key: &str,
421) -> Result<bool> {
422 let enum_type = enum_tag_key.split('.').next().unwrap();
423 let mut has_other_options = false;
424 let mut has_tag = false;
425 for key in options_map.keys() {
426 if key == enum_tag_key {
427 has_tag = true;
428 } else if key.starts_with(enum_type) {
429 has_other_options = true;
430 }
431 }
432
433 ensure!(
435 has_tag || !has_other_options,
436 InvalidRegionOptionsSnafu {
437 reason: format!("missing key {} in options", enum_tag_key),
438 }
439 );
440
441 Ok(has_tag)
442}
443
#[cfg(test)]
mod tests {
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_wal::options::KafkaWalOptions;

    use super::*;

    /// Builds an owned options map from borrowed `(key, value)` pairs.
    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
        options
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn test_empty_region_options() {
        let map = make_map(&[]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(RegionOptions::default(), options);
    }

    #[test]
    fn test_with_ttl() {
        let map = make_map(&[("ttl", "7d")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_storage() {
        let map = make_map(&[("storage", "S3")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            storage: Some("S3".to_string()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_without_compaction_type() {
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
        ]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_compaction_type() {
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                ..Default::default()
            }),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    /// Encodes `wal_options` into an options map, parses it back, and returns
    /// whether the result equals default options carrying `wal_options`.
    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
        let got = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            wal_options: wal_options.clone(),
            ..Default::default()
        };
        expect == got
    }

    #[test]
    fn test_with_index() {
        let map = make_map(&[
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_any_wal_options() {
        let all_wal_options = [
            WalOptions::RaftEngine,
            WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
        ];
        // `Iterator::all` only returns a bool. Without this assertion a
        // mismatch would be discarded and the test could never fail.
        assert!(all_wal_options.iter().all(test_with_wal_options));
    }

    #[test]
    fn test_with_memtable() {
        let map = make_map(&[("memtable.type", "time_series")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::TimeSeries),
            ..Default::default()
        };
        assert_eq!(expect, options);

        let map = make_map(&[("memtable.type", "partition_tree")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::PartitionTree(
                PartitionTreeOptions::default(),
            )),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_unknown_memtable_type() {
        let map = make_map(&[("memtable.type", "no_such_memtable")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_merge_mode() {
        let map = make_map(&[("merge_mode", "last_row")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastRow, options.merge_mode());

        let map = make_map(&[("merge_mode", "last_non_null")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastNonNull, options.merge_mode());

        let map = make_map(&[("merge_mode", "unknown")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_all() {
        let wal_options = WalOptions::Kafka(KafkaWalOptions {
            topic: "test_topic".to_string(),
        });
        let map = make_map(&[
            ("ttl", "7d"),
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.max_output_file_size", "1GB"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
            ("compaction.twcs.remote_compaction", "false"),
            ("compaction.twcs.fallback_to_local", "true"),
            ("storage", "S3"),
            ("append_mode", "false"),
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
            (
                WAL_OPTIONS_KEY,
                &serde_json::to_string(&wal_options).unwrap(),
            ),
            ("memtable.type", "partition_tree"),
            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
            ("memtable.partition_tree.data_freeze_threshold", "2048"),
            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
            ("merge_mode", "last_non_null"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::gb(1)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_region_options_serde() {
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: None,
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
        };
        let region_options_json_str = serde_json::to_string(&options).unwrap();
        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
        assert_eq!(options, got);
    }

    #[test]
    fn test_region_options_str_serde() {
        let region_options_json_str = r#"{
  "ttl": "7days",
  "compaction": {
    "compaction.type": "twcs",
    "compaction.twcs.trigger_file_num": "8",
    "compaction.twcs.max_output_file_size": "7MB",
    "compaction.twcs.time_window": "2h"
  },
  "storage": "S3",
  "append_mode": false,
  "wal_options": {
    "wal.provider": "kafka",
    "wal.kafka.topic": "test_topic"
  },
  "index_options": {
    "index.inverted_index.ignore_column_ids": "",
    "index.inverted_index.segment_row_count": "512"
  },
  "memtable": {
    "memtable.type": "partition_tree",
    "memtable.partition_tree.index_max_keys_per_shard": "2048",
    "memtable.partition_tree.data_freeze_threshold": "2048",
    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
  },
  "merge_mode": "last_non_null"
}"#;
        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::mb(7)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
        };
        assert_eq!(options, got);
    }
}