1mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30 ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31 DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33 METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34 METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{
37 APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
38};
39use store_api::region_engine::RegionEngine;
40use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
41use store_api::storage::consts::ReservedColumnId;
42use store_api::storage::RegionId;
43
44use crate::engine::create::extract_new_columns::extract_new_columns;
45use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
46use crate::engine::MetricEngineInner;
47use crate::error::{
48 ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
49 InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
50 MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
51 Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
52};
53use crate::metrics::PHYSICAL_REGION_COUNT;
54use crate::utils::{
55 self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
56 to_metadata_region_id,
57};
58
/// Granularity used for the bloom-filter skipping index that
/// `internal_column_metadata` puts on the internal table id column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
60
61impl MetricEngineInner {
62 pub async fn create_regions(
63 &self,
64 mut requests: Vec<(RegionId, RegionCreateRequest)>,
65 extension_return_value: &mut HashMap<String, Vec<u8>>,
66 ) -> Result<AffectedRows> {
67 if requests.is_empty() {
68 return Ok(0);
69 }
70
71 for (_, request) in requests.iter() {
72 Self::verify_region_create_request(request)?;
73 }
74
75 let first_request = &requests.first().unwrap().1;
76 if first_request.is_physical_table() {
77 ensure!(
78 requests.len() == 1,
79 UnexpectedRequestSnafu {
80 reason: "Physical table must be created with single request".to_string(),
81 }
82 );
83 let (region_id, request) = requests.pop().unwrap();
84 self.create_physical_region(region_id, request).await?;
85
86 return Ok(0);
87 } else if first_request
88 .options
89 .contains_key(LOGICAL_TABLE_METADATA_KEY)
90 {
91 if requests.len() == 1 {
92 let request = &requests.first().unwrap().1;
93 let physical_region_id = parse_physical_region_id(request)?;
94 let mut manifest_infos = Vec::with_capacity(1);
95 self.create_logical_regions(physical_region_id, requests, extension_return_value)
96 .await?;
97 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
98 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
99 } else {
100 let grouped_requests =
101 group_create_logical_region_requests_by_physical_region_id(requests)?;
102 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
103 for (physical_region_id, requests) in grouped_requests {
104 self.create_logical_regions(
105 physical_region_id,
106 requests,
107 extension_return_value,
108 )
109 .await?;
110 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
111 }
112 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
113 }
114 } else {
115 return MissingRegionOptionSnafu {}.fail();
116 }
117
118 Ok(0)
119 }
120
121 async fn create_physical_region(
123 &self,
124 region_id: RegionId,
125 request: RegionCreateRequest,
126 ) -> Result<()> {
127 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128 let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130 let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132 self.mito
133 .handle_request(
134 metadata_region_id,
135 RegionRequest::Create(create_metadata_region_request),
136 )
137 .await
138 .with_context(|_| CreateMitoRegionSnafu {
139 region_type: METADATA_REGION_SUBDIR,
140 })?;
141
142 let create_data_region_request = self.create_request_for_data_region(&request);
144 let physical_columns = create_data_region_request
145 .column_metadatas
146 .iter()
147 .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148 .collect::<HashMap<_, _>>();
149 self.mito
150 .handle_request(
151 data_region_id,
152 RegionRequest::Create(create_data_region_request),
153 )
154 .await
155 .with_context(|_| CreateMitoRegionSnafu {
156 region_type: DATA_REGION_SUBDIR,
157 })?;
158 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
159 PhysicalRegionNotFoundSnafu {
160 region_id: data_region_id,
161 },
162 )?;
163
164 info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
165 PHYSICAL_REGION_COUNT.inc();
166
167 self.state.write().unwrap().add_physical_region(
169 data_region_id,
170 physical_columns,
171 primary_key_encoding,
172 physical_region_options,
173 );
174
175 Ok(())
176 }
177
178 async fn create_logical_regions(
180 &self,
181 physical_region_id: RegionId,
182 requests: Vec<(RegionId, RegionCreateRequest)>,
183 extension_return_value: &mut HashMap<String, Vec<u8>>,
184 ) -> Result<()> {
185 let data_region_id = utils::to_data_region_id(physical_region_id);
186
187 ensure!(
188 self.state
189 .read()
190 .unwrap()
191 .exist_physical_region(data_region_id),
192 PhysicalRegionNotFoundSnafu {
193 region_id: data_region_id,
194 }
195 );
196
197 let requests = {
199 let state = self.state.read().unwrap();
200 let mut skipped = Vec::with_capacity(requests.len());
201 let mut kept_requests = Vec::with_capacity(requests.len());
202
203 for (region_id, request) in requests {
204 if state.is_logical_region_exist(region_id) {
205 skipped.push(region_id);
206 } else {
207 kept_requests.push((region_id, request));
208 }
209 }
210
211 if !skipped.is_empty() {
213 info!(
214 "Skipped creating logical regions {skipped:?} because they already exist",
215 skipped = skipped
216 );
217 }
218 kept_requests
219 };
220
221 let mut new_column_names = HashSet::new();
223 let mut new_columns = Vec::new();
224
225 let index_option = {
226 let state = &self.state.read().unwrap();
227 let region_state = state
228 .physical_region_states()
229 .get(&data_region_id)
230 .with_context(|| PhysicalRegionNotFoundSnafu {
231 region_id: data_region_id,
232 })?;
233 let physical_columns = region_state.physical_columns();
234
235 extract_new_columns(
236 &requests,
237 physical_columns,
238 &mut new_column_names,
239 &mut new_columns,
240 )?;
241 region_state.options().index
242 };
243
244 self.data_region
246 .add_columns(data_region_id, new_columns, index_option)
247 .await?;
248
249 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
250 let physical_schema_map = physical_columns
251 .iter()
252 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
253 .collect::<HashMap<_, _>>();
254 let logical_regions = requests
255 .iter()
256 .map(|(region_id, _)| (*region_id))
257 .collect::<Vec<_>>();
258 let logical_region_columns = requests.iter().map(|(region_id, request)| {
259 (
260 *region_id,
261 request
262 .column_metadatas
263 .iter()
264 .map(|metadata| {
265 let column_metadata = *physical_schema_map
267 .get(metadata.column_schema.name.as_str())
268 .unwrap();
269 (metadata.column_schema.name.as_str(), column_metadata)
270 })
271 .collect::<HashMap<_, _>>(),
272 )
273 });
274
275 let new_add_columns = new_column_names.iter().map(|name| {
276 let column_metadata = *physical_schema_map.get(name).unwrap();
278 (name.to_string(), column_metadata.column_id)
279 });
280
281 extension_return_value.insert(
282 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
283 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
284 );
285
286 self.metadata_region
288 .add_logical_regions(physical_region_id, true, logical_region_columns)
289 .await?;
290
291 {
292 let mut state = self.state.write().unwrap();
293 state.add_physical_columns(data_region_id, new_add_columns);
294 state.add_logical_regions(physical_region_id, logical_regions.clone());
295 }
296 for logical_region_id in logical_regions {
297 self.metadata_region
298 .open_logical_region(logical_region_id)
299 .await;
300 }
301
302 Ok(())
303 }
304
305 fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
310 request.validate().context(InvalidMetadataSnafu)?;
311
312 let name_to_index = request
313 .column_metadatas
314 .iter()
315 .enumerate()
316 .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
317 .collect::<HashMap<String, usize>>();
318
319 ensure!(
321 !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
322 InternalColumnOccupiedSnafu {
323 column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
324 }
325 );
326 ensure!(
327 !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
328 InternalColumnOccupiedSnafu {
329 column: DATA_SCHEMA_TSID_COLUMN_NAME,
330 }
331 );
332
333 ensure!(
335 request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
336 MissingRegionOptionSnafu {}
337 );
338 ensure!(
339 !(request.is_physical_table()
340 && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
341 ConflictRegionOptionSnafu {}
342 );
343
344 let mut field_col: Option<&ColumnMetadata> = None;
346 for col in &request.column_metadatas {
347 match col.semantic_type {
348 SemanticType::Tag => ensure!(
349 col.column_schema.data_type == ConcreteDataType::string_datatype(),
350 ColumnTypeMismatchSnafu {
351 expect: ConcreteDataType::string_datatype(),
352 actual: col.column_schema.data_type.clone(),
353 }
354 ),
355 SemanticType::Field => {
356 if field_col.is_some() {
357 MultipleFieldColumnSnafu {
358 previous: field_col.unwrap().column_schema.name.clone(),
359 current: col.column_schema.name.clone(),
360 }
361 .fail()?;
362 }
363 field_col = Some(col)
364 }
365 SemanticType::Timestamp => {}
366 }
367 }
368 let field_col = field_col.context(NoFieldColumnSnafu)?;
369
370 ensure!(
372 field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
373 ColumnTypeMismatchSnafu {
374 expect: ConcreteDataType::float64_datatype(),
375 actual: field_col.column_schema.data_type.clone(),
376 }
377 );
378
379 Ok(())
380 }
381
382 fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
386 (
387 to_data_region_id(region_id),
388 to_metadata_region_id(region_id),
389 )
390 }
391
392 pub fn create_request_for_metadata_region(
396 &self,
397 request: &RegionCreateRequest,
398 ) -> RegionCreateRequest {
399 let timestamp_column_metadata = ColumnMetadata {
401 column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
402 semantic_type: SemanticType::Timestamp,
403 column_schema: ColumnSchema::new(
404 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
405 ConcreteDataType::timestamp_millisecond_datatype(),
406 false,
407 )
408 .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
409 Value::Timestamp(Timestamp::new_millisecond(0)),
410 )))
411 .unwrap(),
412 };
413 let key_column_metadata = ColumnMetadata {
415 column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
416 semantic_type: SemanticType::Tag,
417 column_schema: ColumnSchema::new(
418 METADATA_SCHEMA_KEY_COLUMN_NAME,
419 ConcreteDataType::string_datatype(),
420 false,
421 ),
422 };
423 let value_column_metadata = ColumnMetadata {
425 column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
426 semantic_type: SemanticType::Field,
427 column_schema: ColumnSchema::new(
428 METADATA_SCHEMA_VALUE_COLUMN_NAME,
429 ConcreteDataType::string_datatype(),
430 true,
431 ),
432 };
433
434 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
436
437 let options = region_options_for_metadata_region(request.options.clone());
438 RegionCreateRequest {
439 engine: MITO_ENGINE_NAME.to_string(),
440 column_metadatas: vec![
441 timestamp_column_metadata,
442 key_column_metadata,
443 value_column_metadata,
444 ],
445 primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
446 options,
447 region_dir: metadata_region_dir,
448 }
449 }
450
451 pub fn create_request_for_data_region(
458 &self,
459 request: &RegionCreateRequest,
460 ) -> RegionCreateRequest {
461 let mut data_region_request = request.clone();
462 let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
463
464 data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
466
467 data_region_request
469 .column_metadatas
470 .iter_mut()
471 .for_each(|metadata| {
472 if metadata.semantic_type == SemanticType::Tag {
473 metadata.column_schema.set_nullable();
474 primary_key.push(metadata.column_id);
475 }
476 });
477
478 let [table_id_col, tsid_col] = Self::internal_column_metadata();
480 data_region_request.column_metadatas.push(table_id_col);
481 data_region_request.column_metadatas.push(tsid_col);
482 data_region_request.primary_key = primary_key;
483
484 set_data_region_options(
486 &mut data_region_request.options,
487 self.config.experimental_sparse_primary_key_encoding,
488 );
489
490 data_region_request
491 }
492
493 fn internal_column_metadata() -> [ColumnMetadata; 2] {
497 let metric_name_col = ColumnMetadata {
499 column_id: ReservedColumnId::table_id(),
500 semantic_type: SemanticType::Tag,
501 column_schema: ColumnSchema::new(
502 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
503 ConcreteDataType::uint32_datatype(),
504 false,
505 )
506 .with_skipping_options(SkippingIndexOptions {
507 granularity: DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
508 index_type: datatypes::schema::SkippingIndexType::BloomFilter,
509 })
510 .unwrap(),
511 };
512 let tsid_col = ColumnMetadata {
513 column_id: ReservedColumnId::tsid(),
514 semantic_type: SemanticType::Tag,
515 column_schema: ColumnSchema::new(
516 DATA_SCHEMA_TSID_COLUMN_NAME,
517 ConcreteDataType::uint64_datatype(),
518 false,
519 )
520 .with_inverted_index(false),
521 };
522 [metric_name_col, tsid_col]
523 }
524}
525
526fn group_create_logical_region_requests_by_physical_region_id(
528 requests: Vec<(RegionId, RegionCreateRequest)>,
529) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
530 let mut result = HashMap::with_capacity(requests.len());
531 for (region_id, request) in requests {
532 let physical_region_id = parse_physical_region_id(&request)?;
533 result
534 .entry(physical_region_id)
535 .or_insert_with(Vec::new)
536 .push((region_id, request));
537 }
538
539 Ok(result)
540}
541
542fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
544 let physical_region_id_raw = request
545 .options
546 .get(LOGICAL_TABLE_METADATA_KEY)
547 .ok_or(MissingRegionOptionSnafu {}.build())?;
548
549 let physical_region_id: RegionId = physical_region_id_raw
550 .parse::<u64>()
551 .with_context(|_| ParseRegionIdSnafu {
552 raw: physical_region_id_raw,
553 })?
554 .into();
555
556 Ok(physical_region_id)
557}
558
559pub(crate) fn region_options_for_metadata_region(
561 mut original: HashMap<String, String>,
562) -> HashMap<String, String> {
563 original.remove(APPEND_MODE_KEY);
565 original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
567 original.insert(TTL_KEY.to_string(), FOREVER.to_string());
568 original.remove(SKIP_WAL_KEY);
569 original
570}
571
572#[cfg(test)]
573mod test {
574 use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
575
576 use super::*;
577 use crate::config::EngineConfig;
578 use crate::engine::MetricEngine;
579 use crate::test_util::TestEnv;
580
581 #[test]
582 fn test_verify_region_create_request() {
583 let request = RegionCreateRequest {
585 column_metadatas: vec![
586 ColumnMetadata {
587 column_id: 0,
588 semantic_type: SemanticType::Timestamp,
589 column_schema: ColumnSchema::new(
590 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
591 ConcreteDataType::timestamp_millisecond_datatype(),
592 false,
593 ),
594 },
595 ColumnMetadata {
596 column_id: 1,
597 semantic_type: SemanticType::Tag,
598 column_schema: ColumnSchema::new(
599 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
600 ConcreteDataType::uint32_datatype(),
601 false,
602 ),
603 },
604 ],
605 region_dir: "test_dir".to_string(),
606 engine: METRIC_ENGINE_NAME.to_string(),
607 primary_key: vec![],
608 options: HashMap::new(),
609 };
610 let result = MetricEngineInner::verify_region_create_request(&request);
611 assert!(result.is_err());
612 assert_eq!(
613 result.unwrap_err().to_string(),
614 "Internal column __table_id is reserved".to_string()
615 );
616
617 let request = RegionCreateRequest {
619 column_metadatas: vec![
620 ColumnMetadata {
621 column_id: 0,
622 semantic_type: SemanticType::Timestamp,
623 column_schema: ColumnSchema::new(
624 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
625 ConcreteDataType::timestamp_millisecond_datatype(),
626 false,
627 ),
628 },
629 ColumnMetadata {
630 column_id: 1,
631 semantic_type: SemanticType::Tag,
632 column_schema: ColumnSchema::new(
633 "column1".to_string(),
634 ConcreteDataType::string_datatype(),
635 false,
636 ),
637 },
638 ColumnMetadata {
639 column_id: 2,
640 semantic_type: SemanticType::Field,
641 column_schema: ColumnSchema::new(
642 "column2".to_string(),
643 ConcreteDataType::float64_datatype(),
644 false,
645 ),
646 },
647 ],
648 region_dir: "test_dir".to_string(),
649 engine: METRIC_ENGINE_NAME.to_string(),
650 primary_key: vec![],
651 options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
652 .into_iter()
653 .collect(),
654 };
655 MetricEngineInner::verify_region_create_request(&request).unwrap();
656 }
657
658 #[test]
659 fn test_verify_region_create_request_options() {
660 let mut request = RegionCreateRequest {
661 column_metadatas: vec![
662 ColumnMetadata {
663 column_id: 0,
664 semantic_type: SemanticType::Timestamp,
665 column_schema: ColumnSchema::new(
666 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
667 ConcreteDataType::timestamp_millisecond_datatype(),
668 false,
669 ),
670 },
671 ColumnMetadata {
672 column_id: 1,
673 semantic_type: SemanticType::Field,
674 column_schema: ColumnSchema::new(
675 "val".to_string(),
676 ConcreteDataType::float64_datatype(),
677 false,
678 ),
679 },
680 ],
681 region_dir: "test_dir".to_string(),
682 engine: METRIC_ENGINE_NAME.to_string(),
683 primary_key: vec![],
684 options: HashMap::new(),
685 };
686 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
687
688 let mut options = HashMap::new();
689 options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
690 request.options.clone_from(&options);
691 MetricEngineInner::verify_region_create_request(&request).unwrap();
692
693 options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
694 request.options.clone_from(&options);
695 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
696
697 options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
698 request.options = options;
699 MetricEngineInner::verify_region_create_request(&request).unwrap();
700 }
701
702 #[tokio::test]
703 async fn test_create_request_for_physical_regions() {
704 let options: HashMap<_, _> = [
706 ("ttl".to_string(), "60m".to_string()),
707 ("skip_wal".to_string(), "true".to_string()),
708 ]
709 .into_iter()
710 .collect();
711 let request = RegionCreateRequest {
712 engine: METRIC_ENGINE_NAME.to_string(),
713 column_metadatas: vec![
714 ColumnMetadata {
715 column_id: 0,
716 semantic_type: SemanticType::Timestamp,
717 column_schema: ColumnSchema::new(
718 "timestamp",
719 ConcreteDataType::timestamp_millisecond_datatype(),
720 false,
721 ),
722 },
723 ColumnMetadata {
724 column_id: 1,
725 semantic_type: SemanticType::Tag,
726 column_schema: ColumnSchema::new(
727 "tag",
728 ConcreteDataType::string_datatype(),
729 false,
730 ),
731 },
732 ],
733 primary_key: vec![0],
734 options,
735 region_dir: "/test_dir".to_string(),
736 };
737
738 let env = TestEnv::new().await;
740 let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
741 let engine_inner = engine.inner;
742
743 let data_region_request = engine_inner.create_request_for_data_region(&request);
745 assert_eq!(
746 data_region_request.region_dir,
747 "/test_dir/data/".to_string()
748 );
749 assert_eq!(data_region_request.column_metadatas.len(), 4);
750 assert_eq!(
751 data_region_request.primary_key,
752 vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
753 );
754 assert!(data_region_request.options.contains_key("ttl"));
755
756 let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
758 assert_eq!(
759 metadata_region_request.region_dir,
760 "/test_dir/metadata/".to_string()
761 );
762 assert_eq!(
763 metadata_region_request.options.get("ttl").unwrap(),
764 "forever"
765 );
766 assert!(!metadata_region_request.options.contains_key("skip_wal"));
767 }
768}