1mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30 ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31 DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33 METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34 METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
37use store_api::region_engine::RegionEngine;
38use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
39use store_api::storage::consts::ReservedColumnId;
40use store_api::storage::RegionId;
41
42use crate::engine::create::extract_new_columns::extract_new_columns;
43use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
44use crate::engine::MetricEngineInner;
45use crate::error::{
46 ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
47 InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
48 MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
49 Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
50};
51use crate::metrics::PHYSICAL_REGION_COUNT;
52use crate::utils::{
53 self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
54 to_metadata_region_id,
55};
56
57const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
58const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
59
60impl MetricEngineInner {
61 pub async fn create_regions(
62 &self,
63 mut requests: Vec<(RegionId, RegionCreateRequest)>,
64 extension_return_value: &mut HashMap<String, Vec<u8>>,
65 ) -> Result<AffectedRows> {
66 if requests.is_empty() {
67 return Ok(0);
68 }
69
70 for (_, request) in requests.iter() {
71 Self::verify_region_create_request(request)?;
72 }
73
74 let first_request = &requests.first().unwrap().1;
75 if first_request.is_physical_table() {
76 ensure!(
77 requests.len() == 1,
78 UnexpectedRequestSnafu {
79 reason: "Physical table must be created with single request".to_string(),
80 }
81 );
82 let (region_id, request) = requests.pop().unwrap();
83 self.create_physical_region(region_id, request).await?;
84
85 return Ok(0);
86 } else if first_request
87 .options
88 .contains_key(LOGICAL_TABLE_METADATA_KEY)
89 {
90 if requests.len() == 1 {
91 let request = &requests.first().unwrap().1;
92 let physical_region_id = parse_physical_region_id(request)?;
93 let mut manifest_infos = Vec::with_capacity(1);
94 self.create_logical_regions(physical_region_id, requests, extension_return_value)
95 .await?;
96 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
97 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
98 } else {
99 let grouped_requests =
100 group_create_logical_region_requests_by_physical_region_id(requests)?;
101 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
102 for (physical_region_id, requests) in grouped_requests {
103 self.create_logical_regions(
104 physical_region_id,
105 requests,
106 extension_return_value,
107 )
108 .await?;
109 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
110 }
111 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
112 }
113 } else {
114 return MissingRegionOptionSnafu {}.fail();
115 }
116
117 Ok(0)
118 }
119
120 async fn create_physical_region(
122 &self,
123 region_id: RegionId,
124 request: RegionCreateRequest,
125 ) -> Result<()> {
126 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
127 let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
128
129 let create_metadata_region_request = self.create_request_for_metadata_region(&request);
131 self.mito
132 .handle_request(
133 metadata_region_id,
134 RegionRequest::Create(create_metadata_region_request),
135 )
136 .await
137 .with_context(|_| CreateMitoRegionSnafu {
138 region_type: METADATA_REGION_SUBDIR,
139 })?;
140
141 let create_data_region_request = self.create_request_for_data_region(&request);
143 let physical_columns = create_data_region_request
144 .column_metadatas
145 .iter()
146 .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
147 .collect::<HashMap<_, _>>();
148 let time_index_unit = create_data_region_request
149 .column_metadatas
150 .iter()
151 .find_map(|metadata| {
152 if metadata.semantic_type == SemanticType::Timestamp {
153 metadata
154 .column_schema
155 .data_type
156 .as_timestamp()
157 .map(|data_type| data_type.unit())
158 } else {
159 None
160 }
161 })
162 .context(UnexpectedRequestSnafu {
163 reason: "No time index column found",
164 })?;
165 self.mito
166 .handle_request(
167 data_region_id,
168 RegionRequest::Create(create_data_region_request),
169 )
170 .await
171 .with_context(|_| CreateMitoRegionSnafu {
172 region_type: DATA_REGION_SUBDIR,
173 })?;
174 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
175 PhysicalRegionNotFoundSnafu {
176 region_id: data_region_id,
177 },
178 )?;
179
180 info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
181 PHYSICAL_REGION_COUNT.inc();
182
183 self.state.write().unwrap().add_physical_region(
185 data_region_id,
186 physical_columns,
187 primary_key_encoding,
188 physical_region_options,
189 time_index_unit,
190 );
191
192 Ok(())
193 }
194
195 async fn create_logical_regions(
197 &self,
198 physical_region_id: RegionId,
199 requests: Vec<(RegionId, RegionCreateRequest)>,
200 extension_return_value: &mut HashMap<String, Vec<u8>>,
201 ) -> Result<()> {
202 let data_region_id = utils::to_data_region_id(physical_region_id);
203
204 let unit = self
205 .state
206 .read()
207 .unwrap()
208 .physical_region_time_index_unit(physical_region_id)
209 .context(PhysicalRegionNotFoundSnafu {
210 region_id: data_region_id,
211 })?;
212 for (_, request) in &requests {
214 let time_index_column = request
216 .column_metadatas
217 .iter()
218 .find(|col| col.semantic_type == SemanticType::Timestamp)
219 .unwrap();
220 let request_unit = time_index_column
221 .column_schema
222 .data_type
223 .as_timestamp()
224 .unwrap()
225 .unit();
226 ensure!(
227 request_unit == unit,
228 UnexpectedRequestSnafu {
229 reason: format!(
230 "Metric has differenttime unit ({:?}) than the physical region ({:?})",
231 request_unit, unit
232 ),
233 }
234 );
235 }
236
237 let requests = {
239 let state = self.state.read().unwrap();
240 let mut skipped = Vec::with_capacity(requests.len());
241 let mut kept_requests = Vec::with_capacity(requests.len());
242
243 for (region_id, request) in requests {
244 if state.is_logical_region_exist(region_id) {
245 skipped.push(region_id);
246 } else {
247 kept_requests.push((region_id, request));
248 }
249 }
250
251 if !skipped.is_empty() {
253 info!(
254 "Skipped creating logical regions {skipped:?} because they already exist",
255 skipped = skipped
256 );
257 }
258 kept_requests
259 };
260
261 let mut new_column_names = HashSet::new();
263 let mut new_columns = Vec::new();
264
265 let index_option = {
266 let state = &self.state.read().unwrap();
267 let region_state = state
268 .physical_region_states()
269 .get(&data_region_id)
270 .with_context(|| PhysicalRegionNotFoundSnafu {
271 region_id: data_region_id,
272 })?;
273 let physical_columns = region_state.physical_columns();
274
275 extract_new_columns(
276 &requests,
277 physical_columns,
278 &mut new_column_names,
279 &mut new_columns,
280 )?;
281 region_state.options().index
282 };
283
284 self.data_region
286 .add_columns(data_region_id, new_columns, index_option)
287 .await?;
288
289 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
290 let physical_schema_map = physical_columns
291 .iter()
292 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
293 .collect::<HashMap<_, _>>();
294 let logical_regions = requests
295 .iter()
296 .map(|(region_id, _)| (*region_id))
297 .collect::<Vec<_>>();
298 let logical_region_columns = requests.iter().map(|(region_id, request)| {
299 (
300 *region_id,
301 request
302 .column_metadatas
303 .iter()
304 .map(|metadata| {
305 let column_metadata = *physical_schema_map
307 .get(metadata.column_schema.name.as_str())
308 .unwrap();
309 (metadata.column_schema.name.as_str(), column_metadata)
310 })
311 .collect::<HashMap<_, _>>(),
312 )
313 });
314
315 let new_add_columns = new_column_names.iter().map(|name| {
316 let column_metadata = *physical_schema_map.get(name).unwrap();
318 (name.to_string(), column_metadata.column_id)
319 });
320
321 extension_return_value.insert(
322 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
323 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
324 );
325
326 self.metadata_region
328 .add_logical_regions(physical_region_id, true, logical_region_columns)
329 .await?;
330
331 {
332 let mut state = self.state.write().unwrap();
333 state.add_physical_columns(data_region_id, new_add_columns);
334 state.add_logical_regions(physical_region_id, logical_regions.clone());
335 }
336 for logical_region_id in logical_regions {
337 self.metadata_region
338 .open_logical_region(logical_region_id)
339 .await;
340 }
341
342 Ok(())
343 }
344
345 fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
350 request.validate().context(InvalidMetadataSnafu)?;
351
352 let name_to_index = request
353 .column_metadatas
354 .iter()
355 .enumerate()
356 .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
357 .collect::<HashMap<String, usize>>();
358
359 ensure!(
361 !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
362 InternalColumnOccupiedSnafu {
363 column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
364 }
365 );
366 ensure!(
367 !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
368 InternalColumnOccupiedSnafu {
369 column: DATA_SCHEMA_TSID_COLUMN_NAME,
370 }
371 );
372
373 ensure!(
375 request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
376 MissingRegionOptionSnafu {}
377 );
378 ensure!(
379 !(request.is_physical_table()
380 && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
381 ConflictRegionOptionSnafu {}
382 );
383
384 let mut field_col: Option<&ColumnMetadata> = None;
386 for col in &request.column_metadatas {
387 match col.semantic_type {
388 SemanticType::Tag => ensure!(
389 col.column_schema.data_type == ConcreteDataType::string_datatype(),
390 ColumnTypeMismatchSnafu {
391 expect: ConcreteDataType::string_datatype(),
392 actual: col.column_schema.data_type.clone(),
393 }
394 ),
395 SemanticType::Field => {
396 if field_col.is_some() {
397 MultipleFieldColumnSnafu {
398 previous: field_col.unwrap().column_schema.name.clone(),
399 current: col.column_schema.name.clone(),
400 }
401 .fail()?;
402 }
403 field_col = Some(col)
404 }
405 SemanticType::Timestamp => {}
406 }
407 }
408 let field_col = field_col.context(NoFieldColumnSnafu)?;
409
410 ensure!(
412 field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
413 ColumnTypeMismatchSnafu {
414 expect: ConcreteDataType::float64_datatype(),
415 actual: field_col.column_schema.data_type.clone(),
416 }
417 );
418
419 Ok(())
420 }
421
422 fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
426 (
427 to_data_region_id(region_id),
428 to_metadata_region_id(region_id),
429 )
430 }
431
432 pub fn create_request_for_metadata_region(
436 &self,
437 request: &RegionCreateRequest,
438 ) -> RegionCreateRequest {
439 let timestamp_column_metadata = ColumnMetadata {
441 column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
442 semantic_type: SemanticType::Timestamp,
443 column_schema: ColumnSchema::new(
444 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
445 ConcreteDataType::timestamp_millisecond_datatype(),
446 false,
447 )
448 .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
449 Value::Timestamp(Timestamp::new_millisecond(0)),
450 )))
451 .unwrap(),
452 };
453 let key_column_metadata = ColumnMetadata {
455 column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
456 semantic_type: SemanticType::Tag,
457 column_schema: ColumnSchema::new(
458 METADATA_SCHEMA_KEY_COLUMN_NAME,
459 ConcreteDataType::string_datatype(),
460 false,
461 ),
462 };
463 let value_column_metadata = ColumnMetadata {
465 column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
466 semantic_type: SemanticType::Field,
467 column_schema: ColumnSchema::new(
468 METADATA_SCHEMA_VALUE_COLUMN_NAME,
469 ConcreteDataType::string_datatype(),
470 true,
471 ),
472 };
473
474 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
476
477 let options = region_options_for_metadata_region(&request.options);
478 RegionCreateRequest {
479 engine: MITO_ENGINE_NAME.to_string(),
480 column_metadatas: vec![
481 timestamp_column_metadata,
482 key_column_metadata,
483 value_column_metadata,
484 ],
485 primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
486 options,
487 region_dir: metadata_region_dir,
488 }
489 }
490
491 pub fn create_request_for_data_region(
498 &self,
499 request: &RegionCreateRequest,
500 ) -> RegionCreateRequest {
501 let mut data_region_request = request.clone();
502 let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
503
504 data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
506
507 data_region_request
509 .column_metadatas
510 .iter_mut()
511 .for_each(|metadata| {
512 if metadata.semantic_type == SemanticType::Tag {
513 metadata.column_schema.set_nullable();
514 primary_key.push(metadata.column_id);
515 }
516 });
517
518 let [table_id_col, tsid_col] = Self::internal_column_metadata();
520 data_region_request.column_metadatas.push(table_id_col);
521 data_region_request.column_metadatas.push(tsid_col);
522 data_region_request.primary_key = primary_key;
523
524 set_data_region_options(
526 &mut data_region_request.options,
527 self.config.experimental_sparse_primary_key_encoding,
528 );
529
530 data_region_request
531 }
532
533 fn internal_column_metadata() -> [ColumnMetadata; 2] {
537 let metric_name_col = ColumnMetadata {
539 column_id: ReservedColumnId::table_id(),
540 semantic_type: SemanticType::Tag,
541 column_schema: ColumnSchema::new(
542 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
543 ConcreteDataType::uint32_datatype(),
544 false,
545 )
546 .with_skipping_options(SkippingIndexOptions::new_unchecked(
547 DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
548 DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
549 datatypes::schema::SkippingIndexType::BloomFilter,
550 ))
551 .unwrap(),
552 };
553 let tsid_col = ColumnMetadata {
554 column_id: ReservedColumnId::tsid(),
555 semantic_type: SemanticType::Tag,
556 column_schema: ColumnSchema::new(
557 DATA_SCHEMA_TSID_COLUMN_NAME,
558 ConcreteDataType::uint64_datatype(),
559 false,
560 )
561 .with_inverted_index(false),
562 };
563 [metric_name_col, tsid_col]
564 }
565}
566
567fn group_create_logical_region_requests_by_physical_region_id(
569 requests: Vec<(RegionId, RegionCreateRequest)>,
570) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
571 let mut result = HashMap::with_capacity(requests.len());
572 for (region_id, request) in requests {
573 let physical_region_id = parse_physical_region_id(&request)?;
574 result
575 .entry(physical_region_id)
576 .or_insert_with(Vec::new)
577 .push((region_id, request));
578 }
579
580 Ok(result)
581}
582
583fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
585 let physical_region_id_raw = request
586 .options
587 .get(LOGICAL_TABLE_METADATA_KEY)
588 .ok_or(MissingRegionOptionSnafu {}.build())?;
589
590 let physical_region_id: RegionId = physical_region_id_raw
591 .parse::<u64>()
592 .with_context(|_| ParseRegionIdSnafu {
593 raw: physical_region_id_raw,
594 })?
595 .into();
596
597 Ok(physical_region_id)
598}
599
600pub(crate) fn region_options_for_metadata_region(
602 original: &HashMap<String, String>,
603) -> HashMap<String, String> {
604 let mut metadata_region_options = HashMap::new();
605 metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
606
607 if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
608 metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.to_string());
609 }
610
611 metadata_region_options
612}
613
614#[cfg(test)]
615mod test {
616 use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
617
618 use super::*;
619 use crate::config::EngineConfig;
620 use crate::engine::MetricEngine;
621 use crate::test_util::TestEnv;
622
623 #[test]
624 fn test_verify_region_create_request() {
625 let request = RegionCreateRequest {
627 column_metadatas: vec![
628 ColumnMetadata {
629 column_id: 0,
630 semantic_type: SemanticType::Timestamp,
631 column_schema: ColumnSchema::new(
632 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
633 ConcreteDataType::timestamp_millisecond_datatype(),
634 false,
635 ),
636 },
637 ColumnMetadata {
638 column_id: 1,
639 semantic_type: SemanticType::Tag,
640 column_schema: ColumnSchema::new(
641 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
642 ConcreteDataType::uint32_datatype(),
643 false,
644 ),
645 },
646 ],
647 region_dir: "test_dir".to_string(),
648 engine: METRIC_ENGINE_NAME.to_string(),
649 primary_key: vec![],
650 options: HashMap::new(),
651 };
652 let result = MetricEngineInner::verify_region_create_request(&request);
653 assert!(result.is_err());
654 assert_eq!(
655 result.unwrap_err().to_string(),
656 "Internal column __table_id is reserved".to_string()
657 );
658
659 let request = RegionCreateRequest {
661 column_metadatas: vec![
662 ColumnMetadata {
663 column_id: 0,
664 semantic_type: SemanticType::Timestamp,
665 column_schema: ColumnSchema::new(
666 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
667 ConcreteDataType::timestamp_millisecond_datatype(),
668 false,
669 ),
670 },
671 ColumnMetadata {
672 column_id: 1,
673 semantic_type: SemanticType::Tag,
674 column_schema: ColumnSchema::new(
675 "column1".to_string(),
676 ConcreteDataType::string_datatype(),
677 false,
678 ),
679 },
680 ColumnMetadata {
681 column_id: 2,
682 semantic_type: SemanticType::Field,
683 column_schema: ColumnSchema::new(
684 "column2".to_string(),
685 ConcreteDataType::float64_datatype(),
686 false,
687 ),
688 },
689 ],
690 region_dir: "test_dir".to_string(),
691 engine: METRIC_ENGINE_NAME.to_string(),
692 primary_key: vec![],
693 options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
694 .into_iter()
695 .collect(),
696 };
697 MetricEngineInner::verify_region_create_request(&request).unwrap();
698 }
699
700 #[test]
701 fn test_verify_region_create_request_options() {
702 let mut request = RegionCreateRequest {
703 column_metadatas: vec![
704 ColumnMetadata {
705 column_id: 0,
706 semantic_type: SemanticType::Timestamp,
707 column_schema: ColumnSchema::new(
708 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
709 ConcreteDataType::timestamp_millisecond_datatype(),
710 false,
711 ),
712 },
713 ColumnMetadata {
714 column_id: 1,
715 semantic_type: SemanticType::Field,
716 column_schema: ColumnSchema::new(
717 "val".to_string(),
718 ConcreteDataType::float64_datatype(),
719 false,
720 ),
721 },
722 ],
723 region_dir: "test_dir".to_string(),
724 engine: METRIC_ENGINE_NAME.to_string(),
725 primary_key: vec![],
726 options: HashMap::new(),
727 };
728 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
729
730 let mut options = HashMap::new();
731 options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
732 request.options.clone_from(&options);
733 MetricEngineInner::verify_region_create_request(&request).unwrap();
734
735 options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
736 request.options.clone_from(&options);
737 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
738
739 options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
740 request.options = options;
741 MetricEngineInner::verify_region_create_request(&request).unwrap();
742 }
743
744 #[tokio::test]
745 async fn test_create_request_for_physical_regions() {
746 let options: HashMap<_, _> = [
748 ("ttl".to_string(), "60m".to_string()),
749 ("skip_wal".to_string(), "true".to_string()),
750 ]
751 .into_iter()
752 .collect();
753 let request = RegionCreateRequest {
754 engine: METRIC_ENGINE_NAME.to_string(),
755 column_metadatas: vec![
756 ColumnMetadata {
757 column_id: 0,
758 semantic_type: SemanticType::Timestamp,
759 column_schema: ColumnSchema::new(
760 "timestamp",
761 ConcreteDataType::timestamp_millisecond_datatype(),
762 false,
763 ),
764 },
765 ColumnMetadata {
766 column_id: 1,
767 semantic_type: SemanticType::Tag,
768 column_schema: ColumnSchema::new(
769 "tag",
770 ConcreteDataType::string_datatype(),
771 false,
772 ),
773 },
774 ],
775 primary_key: vec![0],
776 options,
777 region_dir: "/test_dir".to_string(),
778 };
779
780 let env = TestEnv::new().await;
782 let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
783 let engine_inner = engine.inner;
784
785 let data_region_request = engine_inner.create_request_for_data_region(&request);
787 assert_eq!(
788 data_region_request.region_dir,
789 "/test_dir/data/".to_string()
790 );
791 assert_eq!(data_region_request.column_metadatas.len(), 4);
792 assert_eq!(
793 data_region_request.primary_key,
794 vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
795 );
796 assert!(data_region_request.options.contains_key("ttl"));
797
798 let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
800 assert_eq!(
801 metadata_region_request.region_dir,
802 "/test_dir/metadata/".to_string()
803 );
804 assert_eq!(
805 metadata_region_request.options.get("ttl").unwrap(),
806 "forever"
807 );
808 assert!(!metadata_region_request.options.contains_key("skip_wal"));
809 }
810}