1use std::collections::HashMap;
20use std::time::Duration;
21
22use common_base::readable_size::ReadableSize;
23use common_stat::get_total_memory_readable;
24use common_time::TimeToLive;
25use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
26use serde::de::Error as _;
27use serde::{Deserialize, Deserializer, Serialize};
28use serde_json::Value;
29use serde_with::{DisplayFromStr, NoneAsEmptyString, serde_as, with_prefix};
30use snafu::{ResultExt, ensure};
31use store_api::codec::PrimaryKeyEncoding;
32use store_api::mito_engine_options::COMPACTION_OVERRIDE;
33use store_api::storage::ColumnId;
34use strum::EnumString;
35
36use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
37use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
38use crate::sst::FormatType;
39
/// Default value of `InvertedIndexOptions::segment_row_count`: 1024 rows.
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1 << 10;
41
42pub(crate) fn parse_wal_options(
43 options_map: &HashMap<String, String>,
44) -> std::result::Result<WalOptions, serde_json::Error> {
45 options_map
46 .get(WAL_OPTIONS_KEY)
47 .map_or(Ok(WalOptions::default()), |encoded_wal_options| {
48 serde_json::from_str(encoded_wal_options)
49 })
50}
51
52#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
54#[serde(rename_all = "snake_case")]
55#[strum(serialize_all = "snake_case")]
56pub enum MergeMode {
57 #[default]
59 LastRow,
60 LastNonNull,
62}
63
64#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
70#[serde(default)]
71pub struct RegionOptions {
72 pub ttl: Option<TimeToLive>,
74 pub compaction: CompactionOptions,
76 pub compaction_override: bool,
77 pub storage: Option<String>,
79 pub append_mode: bool,
81 pub wal_options: WalOptions,
83 pub index_options: IndexOptions,
85 pub memtable: Option<MemtableOptions>,
87 pub merge_mode: Option<MergeMode>,
90 pub sst_format: Option<FormatType>,
92}
93
94impl RegionOptions {
95 pub fn validate(&self) -> Result<()> {
97 if self.append_mode {
98 ensure!(
99 self.merge_mode.is_none(),
100 InvalidRegionOptionsSnafu {
101 reason: "merge_mode is not allowed when append_mode is enabled",
102 }
103 );
104 }
105 Ok(())
106 }
107
108 pub fn need_dedup(&self) -> bool {
110 !self.append_mode
111 }
112
113 pub fn merge_mode(&self) -> MergeMode {
115 self.merge_mode.unwrap_or_default()
116 }
117
118 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
120 self.memtable
121 .as_ref()
122 .map_or(PrimaryKeyEncoding::default(), |memtable| {
123 memtable.primary_key_encoding()
124 })
125 }
126}
127
128impl TryFrom<&HashMap<String, String>> for RegionOptions {
129 type Error = Error;
130
131 fn try_from(options_map: &HashMap<String, String>) -> Result<Self> {
132 let value = options_map_to_value(options_map);
133 let json = serde_json::to_string(&value).context(JsonOptionsSnafu)?;
134
135 let options: RegionOptionsWithoutEnum =
139 serde_json::from_str(&json).context(JsonOptionsSnafu)?;
140 let has_compaction_type = validate_enum_options(options_map, "compaction.type")?;
141 let compaction = if has_compaction_type {
142 serde_json::from_str(&json).context(JsonOptionsSnafu)?
143 } else {
144 CompactionOptions::default()
145 };
146
147 let wal_options = parse_wal_options(options_map).context(JsonOptionsSnafu)?;
148
149 let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;
150 let memtable = if validate_enum_options(options_map, "memtable.type")? {
151 Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?)
152 } else {
153 None
154 };
155
156 let compaction_override_flag = options_map
157 .get(COMPACTION_OVERRIDE)
158 .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1"))
159 .unwrap_or(false);
160 let compaction_override = has_compaction_type || compaction_override_flag;
161
162 let opts = RegionOptions {
163 ttl: options.ttl,
164 compaction,
165 compaction_override,
166 storage: options.storage,
167 append_mode: options.append_mode,
168 wal_options,
169 index_options,
170 memtable,
171 merge_mode: options.merge_mode,
172 sst_format: options.sst_format,
173 };
174 opts.validate()?;
175
176 Ok(opts)
177 }
178}
179
180#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
182#[serde(tag = "compaction.type")]
183#[serde(rename_all = "snake_case")]
184pub enum CompactionOptions {
185 #[serde(with = "prefix_twcs")]
187 Twcs(TwcsOptions),
188}
189
190impl CompactionOptions {
191 pub(crate) fn time_window(&self) -> Option<Duration> {
192 match self {
193 CompactionOptions::Twcs(opts) => opts.time_window,
194 }
195 }
196
197 pub(crate) fn remote_compaction(&self) -> bool {
198 match self {
199 CompactionOptions::Twcs(opts) => opts.remote_compaction,
200 }
201 }
202
203 pub(crate) fn fallback_to_local(&self) -> bool {
204 match self {
205 CompactionOptions::Twcs(opts) => opts.fallback_to_local,
206 }
207 }
208}
209
210impl Default for CompactionOptions {
211 fn default() -> Self {
212 Self::Twcs(TwcsOptions::default())
213 }
214}
215
216#[serde_as]
218#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
219#[serde(default)]
220pub struct TwcsOptions {
221 #[serde_as(as = "DisplayFromStr")]
223 pub trigger_file_num: usize,
224 #[serde(with = "humantime_serde")]
226 pub time_window: Option<Duration>,
227 pub max_output_file_size: Option<ReadableSize>,
229 #[serde_as(as = "DisplayFromStr")]
231 pub remote_compaction: bool,
232 #[serde_as(as = "DisplayFromStr")]
234 pub fallback_to_local: bool,
235}
236
237with_prefix!(prefix_twcs "compaction.twcs.");
238
239impl TwcsOptions {
240 pub fn time_window_seconds(&self) -> Option<i64> {
242 self.time_window.and_then(|window| {
243 let window_secs = window.as_secs();
244 if window_secs == 0 {
245 None
246 } else {
247 window_secs.try_into().ok()
248 }
249 })
250 }
251}
252
253impl Default for TwcsOptions {
254 fn default() -> Self {
255 Self {
256 trigger_file_num: 4,
257 time_window: None,
258 max_output_file_size: Some(ReadableSize::mb(512)),
259 remote_compaction: false,
260 fallback_to_local: true,
261 }
262 }
263}
264
265#[serde_as]
268#[derive(Debug, Deserialize)]
269#[serde(default)]
270struct RegionOptionsWithoutEnum {
271 ttl: Option<TimeToLive>,
273 storage: Option<String>,
274 #[serde_as(as = "DisplayFromStr")]
275 append_mode: bool,
276 #[serde_as(as = "NoneAsEmptyString")]
277 merge_mode: Option<MergeMode>,
278 #[serde_as(as = "NoneAsEmptyString")]
279 sst_format: Option<FormatType>,
280}
281
282impl Default for RegionOptionsWithoutEnum {
283 fn default() -> Self {
284 let options = RegionOptions::default();
285 RegionOptionsWithoutEnum {
286 ttl: options.ttl,
287 storage: options.storage,
288 append_mode: options.append_mode,
289 merge_mode: options.merge_mode,
290 sst_format: options.sst_format,
291 }
292 }
293}
294
295with_prefix!(prefix_inverted_index "index.inverted_index.");
296
297#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
299#[serde(default)]
300pub struct IndexOptions {
301 #[serde(flatten, with = "prefix_inverted_index")]
303 pub inverted_index: InvertedIndexOptions,
304}
305
306#[serde_as]
308#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
309#[serde(default)]
310pub struct InvertedIndexOptions {
311 #[serde(deserialize_with = "deserialize_ignore_column_ids")]
314 #[serde(serialize_with = "serialize_ignore_column_ids")]
315 pub ignore_column_ids: Vec<ColumnId>,
316
317 #[serde_as(as = "DisplayFromStr")]
319 pub segment_row_count: usize,
320}
321
322impl Default for InvertedIndexOptions {
323 fn default() -> Self {
324 Self {
325 ignore_column_ids: Vec::new(),
326 segment_row_count: DEFAULT_INDEX_SEGMENT_ROW_COUNT,
327 }
328 }
329}
330
331#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
333#[serde(tag = "memtable.type", rename_all = "snake_case")]
334pub enum MemtableOptions {
335 TimeSeries,
336 #[serde(with = "prefix_partition_tree")]
337 PartitionTree(PartitionTreeOptions),
338}
339
340with_prefix!(prefix_partition_tree "memtable.partition_tree.");
341
342impl MemtableOptions {
343 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
345 match self {
346 MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
347 _ => PrimaryKeyEncoding::Dense,
348 }
349 }
350}
351
352#[serde_as]
354#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
355#[serde(default)]
356pub struct PartitionTreeOptions {
357 #[serde_as(as = "DisplayFromStr")]
359 pub index_max_keys_per_shard: usize,
360 #[serde_as(as = "DisplayFromStr")]
362 pub data_freeze_threshold: usize,
363 pub fork_dictionary_bytes: ReadableSize,
365 pub primary_key_encoding: PrimaryKeyEncoding,
367}
368
369impl Default for PartitionTreeOptions {
370 fn default() -> Self {
371 let mut fork_dictionary_bytes = ReadableSize::mb(512);
372 if let Some(total_memory) = get_total_memory_readable() {
373 let adjust_dictionary_bytes = std::cmp::min(
374 total_memory / crate::memtable::partition_tree::DICTIONARY_SIZE_FACTOR,
375 fork_dictionary_bytes,
376 );
377 if adjust_dictionary_bytes.0 > 0 {
378 fork_dictionary_bytes = adjust_dictionary_bytes;
379 }
380 }
381 Self {
382 index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
383 data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
384 fork_dictionary_bytes,
385 primary_key_encoding: PrimaryKeyEncoding::Dense,
386 }
387 }
388}
389
390fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
391where
392 D: Deserializer<'de>,
393{
394 let s: String = Deserialize::deserialize(deserializer)?;
395 let mut column_ids = Vec::new();
396 if s.is_empty() {
397 return Ok(column_ids);
398 }
399 for item in s.split(',') {
400 let column_id = item.parse().map_err(D::Error::custom)?;
401 column_ids.push(column_id);
402 }
403 Ok(column_ids)
404}
405
406fn serialize_ignore_column_ids<S>(column_ids: &[ColumnId], serializer: S) -> Result<S::Ok, S::Error>
407where
408 S: serde::Serializer,
409{
410 let s = column_ids
411 .iter()
412 .map(|id| id.to_string())
413 .collect::<Vec<_>>()
414 .join(",");
415 serializer.serialize_str(&s)
416}
417
418fn options_map_to_value(options: &HashMap<String, String>) -> Value {
422 let map = options
423 .iter()
424 .map(|(key, value)| {
425 if value.eq_ignore_ascii_case("null") {
427 (key.clone(), Value::Null)
428 } else {
429 (key.clone(), Value::from(value.clone()))
430 }
431 })
432 .collect();
433 Value::Object(map)
434}
435
436fn validate_enum_options(
441 options_map: &HashMap<String, String>,
442 enum_tag_key: &str,
443) -> Result<bool> {
444 let enum_type = enum_tag_key.split('.').next().unwrap();
445 let mut has_other_options = false;
446 let mut has_tag = false;
447 for key in options_map.keys() {
448 if key == enum_tag_key {
449 has_tag = true;
450 } else if key.starts_with(enum_type) {
451 has_other_options = true;
452 }
453 }
454
455 ensure!(
457 has_tag || !has_other_options,
458 InvalidRegionOptionsSnafu {
459 reason: format!("missing key {} in options", enum_tag_key),
460 }
461 );
462
463 Ok(has_tag)
464}
465
#[cfg(test)]
mod tests {
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_wal::options::KafkaWalOptions;

    use super::*;

    /// Builds an owned options map from borrowed key/value pairs.
    fn make_map(options: &[(&str, &str)]) -> HashMap<String, String> {
        options
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// An empty map yields the default options.
    #[test]
    fn test_empty_region_options() {
        let map = make_map(&[]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(RegionOptions::default(), options);
    }

    #[test]
    fn test_with_ttl() {
        let map = make_map(&[("ttl", "7d")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_storage() {
        let map = make_map(&[("storage", "S3")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            storage: Some("S3".to_string()),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    /// Compaction sub-options without `compaction.type` are rejected.
    #[test]
    fn test_without_compaction_type() {
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
        ]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_compaction_type() {
        let map = make_map(&[
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                ..Default::default()
            }),
            compaction_override: true,
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    /// Helper (not a test): returns whether `wal_options` survive a round trip through
    /// an options map.
    fn test_with_wal_options(wal_options: &WalOptions) -> bool {
        let encoded_wal_options = serde_json::to_string(&wal_options).unwrap();
        let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]);
        let got = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            wal_options: wal_options.clone(),
            ..Default::default()
        };
        expect == got
    }

    #[test]
    fn test_with_index() {
        let map = make_map(&[
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_with_any_wal_options() {
        let all_wal_options = [
            WalOptions::RaftEngine,
            WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
        ];
        // The helper only reports the outcome, so it has to be asserted here; dropping
        // the returned `bool` would let a mismatch pass silently.
        for wal_options in &all_wal_options {
            assert!(
                test_with_wal_options(wal_options),
                "WAL options did not round-trip: {wal_options:?}"
            );
        }
    }

    #[test]
    fn test_with_memtable() {
        let map = make_map(&[("memtable.type", "time_series")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::TimeSeries),
            ..Default::default()
        };
        assert_eq!(expect, options);

        let map = make_map(&[("memtable.type", "partition_tree")]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            memtable: Some(MemtableOptions::PartitionTree(
                PartitionTreeOptions::default(),
            )),
            ..Default::default()
        };
        assert_eq!(expect, options);
    }

    #[test]
    fn test_unknown_memtable_type() {
        let map = make_map(&[("memtable.type", "no_such_memtable")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    #[test]
    fn test_with_merge_mode() {
        let map = make_map(&[("merge_mode", "last_row")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastRow, options.merge_mode());

        let map = make_map(&[("merge_mode", "last_non_null")]);
        let options = RegionOptions::try_from(&map).unwrap();
        assert_eq!(MergeMode::LastNonNull, options.merge_mode());

        let map = make_map(&[("merge_mode", "unknown")]);
        let err = RegionOptions::try_from(&map).unwrap_err();
        assert_eq!(StatusCode::InvalidArguments, err.status_code());
    }

    /// Every kind of option set at once.
    #[test]
    fn test_with_all() {
        let wal_options = WalOptions::Kafka(KafkaWalOptions {
            topic: "test_topic".to_string(),
        });
        let map = make_map(&[
            ("ttl", "7d"),
            ("compaction.twcs.trigger_file_num", "8"),
            ("compaction.twcs.max_output_file_size", "1GB"),
            ("compaction.twcs.time_window", "2h"),
            ("compaction.type", "twcs"),
            ("compaction.twcs.remote_compaction", "false"),
            ("compaction.twcs.fallback_to_local", "true"),
            ("storage", "S3"),
            ("append_mode", "false"),
            ("index.inverted_index.ignore_column_ids", "1,2,3"),
            ("index.inverted_index.segment_row_count", "512"),
            (
                WAL_OPTIONS_KEY,
                &serde_json::to_string(&wal_options).unwrap(),
            ),
            ("memtable.type", "partition_tree"),
            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
            ("memtable.partition_tree.data_freeze_threshold", "2048"),
            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
            ("merge_mode", "last_non_null"),
        ]);
        let options = RegionOptions::try_from(&map).unwrap();
        let expect = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::gb(1)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            compaction_override: true,
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options,
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(expect, options);
    }

    /// `RegionOptions` survives a serialize/deserialize round trip through JSON.
    #[test]
    fn test_region_options_serde() {
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: None,
                remote_compaction: false,
                fallback_to_local: true,
            }),
            compaction_override: false,
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![1, 2, 3],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        let region_options_json_str = serde_json::to_string(&options).unwrap();
        let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
        assert_eq!(options, got);
    }

    /// A hand-written JSON document deserializes into the expected options.
    #[test]
    fn test_region_options_str_serde() {
        let region_options_json_str = r#"{
  "ttl": "7days",
  "compaction": {
    "compaction.type": "twcs",
    "compaction.twcs.trigger_file_num": "8",
    "compaction.twcs.max_output_file_size": "7MB",
    "compaction.twcs.time_window": "2h"
  },
  "storage": "S3",
  "append_mode": false,
  "wal_options": {
    "wal.provider": "kafka",
    "wal.kafka.topic": "test_topic"
  },
  "index_options": {
    "index.inverted_index.ignore_column_ids": "",
    "index.inverted_index.segment_row_count": "512"
  },
  "memtable": {
    "memtable.type": "partition_tree",
    "memtable.partition_tree.index_max_keys_per_shard": "2048",
    "memtable.partition_tree.data_freeze_threshold": "2048",
    "memtable.partition_tree.fork_dictionary_bytes": "128MiB"
  },
  "merge_mode": "last_non_null"
}"#;
        let got: RegionOptions = serde_json::from_str(region_options_json_str).unwrap();
        let options = RegionOptions {
            ttl: Some(Duration::from_secs(3600 * 24 * 7).into()),
            compaction: CompactionOptions::Twcs(TwcsOptions {
                trigger_file_num: 8,
                time_window: Some(Duration::from_secs(3600 * 2)),
                max_output_file_size: Some(ReadableSize::mb(7)),
                remote_compaction: false,
                fallback_to_local: true,
            }),
            compaction_override: false,
            storage: Some("S3".to_string()),
            append_mode: false,
            wal_options: WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
            }),
            index_options: IndexOptions {
                inverted_index: InvertedIndexOptions {
                    ignore_column_ids: vec![],
                    segment_row_count: 512,
                },
            },
            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                index_max_keys_per_shard: 2048,
                data_freeze_threshold: 2048,
                fork_dictionary_bytes: ReadableSize::mb(128),
                primary_key_encoding: PrimaryKeyEncoding::Dense,
            })),
            merge_mode: Some(MergeMode::LastNonNull),
            sst_format: None,
        };
        assert_eq!(options, got);
    }
}