1mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30 ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31 DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33 METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34 METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{
37 APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
38};
39use store_api::region_engine::RegionEngine;
40use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
41use store_api::storage::consts::ReservedColumnId;
42use store_api::storage::RegionId;
43
44use crate::engine::create::extract_new_columns::extract_new_columns;
45use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
46use crate::engine::MetricEngineInner;
47use crate::error::{
48 ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
49 InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
50 MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
51 Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
52};
53use crate::metrics::PHYSICAL_REGION_COUNT;
54use crate::utils::{
55 self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
56 to_metadata_region_id,
57};
58
/// Granularity of the bloom-filter skipping index that
/// `MetricEngineInner::internal_column_metadata` attaches to the internal
/// table id column of a data region.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
60
61impl MetricEngineInner {
62 pub async fn create_regions(
63 &self,
64 mut requests: Vec<(RegionId, RegionCreateRequest)>,
65 extension_return_value: &mut HashMap<String, Vec<u8>>,
66 ) -> Result<AffectedRows> {
67 if requests.is_empty() {
68 return Ok(0);
69 }
70
71 for (_, request) in requests.iter() {
72 Self::verify_region_create_request(request)?;
73 }
74
75 let first_request = &requests.first().unwrap().1;
76 if first_request.is_physical_table() {
77 ensure!(
78 requests.len() == 1,
79 UnexpectedRequestSnafu {
80 reason: "Physical table must be created with single request".to_string(),
81 }
82 );
83 let (region_id, request) = requests.pop().unwrap();
84 self.create_physical_region(region_id, request).await?;
85
86 return Ok(0);
87 } else if first_request
88 .options
89 .contains_key(LOGICAL_TABLE_METADATA_KEY)
90 {
91 if requests.len() == 1 {
92 let request = &requests.first().unwrap().1;
93 let physical_region_id = parse_physical_region_id(request)?;
94 let mut manifest_infos = Vec::with_capacity(1);
95 self.create_logical_regions(physical_region_id, requests, extension_return_value)
96 .await?;
97 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
98 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
99 } else {
100 let grouped_requests =
101 group_create_logical_region_requests_by_physical_region_id(requests)?;
102 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
103 for (physical_region_id, requests) in grouped_requests {
104 self.create_logical_regions(
105 physical_region_id,
106 requests,
107 extension_return_value,
108 )
109 .await?;
110 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
111 }
112 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
113 }
114 } else {
115 return MissingRegionOptionSnafu {}.fail();
116 }
117
118 Ok(0)
119 }
120
121 async fn create_physical_region(
123 &self,
124 region_id: RegionId,
125 request: RegionCreateRequest,
126 ) -> Result<()> {
127 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128 let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130 let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132 self.mito
133 .handle_request(
134 metadata_region_id,
135 RegionRequest::Create(create_metadata_region_request),
136 )
137 .await
138 .with_context(|_| CreateMitoRegionSnafu {
139 region_type: METADATA_REGION_SUBDIR,
140 })?;
141
142 let create_data_region_request = self.create_request_for_data_region(&request);
144 let physical_columns = create_data_region_request
145 .column_metadatas
146 .iter()
147 .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148 .collect::<HashMap<_, _>>();
149 let time_index_unit = create_data_region_request
150 .column_metadatas
151 .iter()
152 .find_map(|metadata| {
153 if metadata.semantic_type == SemanticType::Timestamp {
154 metadata
155 .column_schema
156 .data_type
157 .as_timestamp()
158 .map(|data_type| data_type.unit())
159 } else {
160 None
161 }
162 })
163 .context(UnexpectedRequestSnafu {
164 reason: "No time index column found",
165 })?;
166 self.mito
167 .handle_request(
168 data_region_id,
169 RegionRequest::Create(create_data_region_request),
170 )
171 .await
172 .with_context(|_| CreateMitoRegionSnafu {
173 region_type: DATA_REGION_SUBDIR,
174 })?;
175 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
176 PhysicalRegionNotFoundSnafu {
177 region_id: data_region_id,
178 },
179 )?;
180
181 info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
182 PHYSICAL_REGION_COUNT.inc();
183
184 self.state.write().unwrap().add_physical_region(
186 data_region_id,
187 physical_columns,
188 primary_key_encoding,
189 physical_region_options,
190 time_index_unit,
191 );
192
193 Ok(())
194 }
195
196 async fn create_logical_regions(
198 &self,
199 physical_region_id: RegionId,
200 requests: Vec<(RegionId, RegionCreateRequest)>,
201 extension_return_value: &mut HashMap<String, Vec<u8>>,
202 ) -> Result<()> {
203 let data_region_id = utils::to_data_region_id(physical_region_id);
204
205 let unit = self
206 .state
207 .read()
208 .unwrap()
209 .physical_region_time_index_unit(physical_region_id)
210 .context(PhysicalRegionNotFoundSnafu {
211 region_id: data_region_id,
212 })?;
213 for (_, request) in &requests {
215 let time_index_column = request
217 .column_metadatas
218 .iter()
219 .find(|col| col.semantic_type == SemanticType::Timestamp)
220 .unwrap();
221 let request_unit = time_index_column
222 .column_schema
223 .data_type
224 .as_timestamp()
225 .unwrap()
226 .unit();
227 ensure!(
228 request_unit == unit,
229 UnexpectedRequestSnafu {
230 reason: format!(
231 "Metric has differenttime unit ({:?}) than the physical region ({:?})",
232 request_unit, unit
233 ),
234 }
235 );
236 }
237
238 let requests = {
240 let state = self.state.read().unwrap();
241 let mut skipped = Vec::with_capacity(requests.len());
242 let mut kept_requests = Vec::with_capacity(requests.len());
243
244 for (region_id, request) in requests {
245 if state.is_logical_region_exist(region_id) {
246 skipped.push(region_id);
247 } else {
248 kept_requests.push((region_id, request));
249 }
250 }
251
252 if !skipped.is_empty() {
254 info!(
255 "Skipped creating logical regions {skipped:?} because they already exist",
256 skipped = skipped
257 );
258 }
259 kept_requests
260 };
261
262 let mut new_column_names = HashSet::new();
264 let mut new_columns = Vec::new();
265
266 let index_option = {
267 let state = &self.state.read().unwrap();
268 let region_state = state
269 .physical_region_states()
270 .get(&data_region_id)
271 .with_context(|| PhysicalRegionNotFoundSnafu {
272 region_id: data_region_id,
273 })?;
274 let physical_columns = region_state.physical_columns();
275
276 extract_new_columns(
277 &requests,
278 physical_columns,
279 &mut new_column_names,
280 &mut new_columns,
281 )?;
282 region_state.options().index
283 };
284
285 self.data_region
287 .add_columns(data_region_id, new_columns, index_option)
288 .await?;
289
290 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
291 let physical_schema_map = physical_columns
292 .iter()
293 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
294 .collect::<HashMap<_, _>>();
295 let logical_regions = requests
296 .iter()
297 .map(|(region_id, _)| (*region_id))
298 .collect::<Vec<_>>();
299 let logical_region_columns = requests.iter().map(|(region_id, request)| {
300 (
301 *region_id,
302 request
303 .column_metadatas
304 .iter()
305 .map(|metadata| {
306 let column_metadata = *physical_schema_map
308 .get(metadata.column_schema.name.as_str())
309 .unwrap();
310 (metadata.column_schema.name.as_str(), column_metadata)
311 })
312 .collect::<HashMap<_, _>>(),
313 )
314 });
315
316 let new_add_columns = new_column_names.iter().map(|name| {
317 let column_metadata = *physical_schema_map.get(name).unwrap();
319 (name.to_string(), column_metadata.column_id)
320 });
321
322 extension_return_value.insert(
323 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
324 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
325 );
326
327 self.metadata_region
329 .add_logical_regions(physical_region_id, true, logical_region_columns)
330 .await?;
331
332 {
333 let mut state = self.state.write().unwrap();
334 state.add_physical_columns(data_region_id, new_add_columns);
335 state.add_logical_regions(physical_region_id, logical_regions.clone());
336 }
337 for logical_region_id in logical_regions {
338 self.metadata_region
339 .open_logical_region(logical_region_id)
340 .await;
341 }
342
343 Ok(())
344 }
345
346 fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
351 request.validate().context(InvalidMetadataSnafu)?;
352
353 let name_to_index = request
354 .column_metadatas
355 .iter()
356 .enumerate()
357 .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
358 .collect::<HashMap<String, usize>>();
359
360 ensure!(
362 !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
363 InternalColumnOccupiedSnafu {
364 column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
365 }
366 );
367 ensure!(
368 !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
369 InternalColumnOccupiedSnafu {
370 column: DATA_SCHEMA_TSID_COLUMN_NAME,
371 }
372 );
373
374 ensure!(
376 request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
377 MissingRegionOptionSnafu {}
378 );
379 ensure!(
380 !(request.is_physical_table()
381 && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
382 ConflictRegionOptionSnafu {}
383 );
384
385 let mut field_col: Option<&ColumnMetadata> = None;
387 for col in &request.column_metadatas {
388 match col.semantic_type {
389 SemanticType::Tag => ensure!(
390 col.column_schema.data_type == ConcreteDataType::string_datatype(),
391 ColumnTypeMismatchSnafu {
392 expect: ConcreteDataType::string_datatype(),
393 actual: col.column_schema.data_type.clone(),
394 }
395 ),
396 SemanticType::Field => {
397 if field_col.is_some() {
398 MultipleFieldColumnSnafu {
399 previous: field_col.unwrap().column_schema.name.clone(),
400 current: col.column_schema.name.clone(),
401 }
402 .fail()?;
403 }
404 field_col = Some(col)
405 }
406 SemanticType::Timestamp => {}
407 }
408 }
409 let field_col = field_col.context(NoFieldColumnSnafu)?;
410
411 ensure!(
413 field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
414 ColumnTypeMismatchSnafu {
415 expect: ConcreteDataType::float64_datatype(),
416 actual: field_col.column_schema.data_type.clone(),
417 }
418 );
419
420 Ok(())
421 }
422
423 fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
427 (
428 to_data_region_id(region_id),
429 to_metadata_region_id(region_id),
430 )
431 }
432
433 pub fn create_request_for_metadata_region(
437 &self,
438 request: &RegionCreateRequest,
439 ) -> RegionCreateRequest {
440 let timestamp_column_metadata = ColumnMetadata {
442 column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
443 semantic_type: SemanticType::Timestamp,
444 column_schema: ColumnSchema::new(
445 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
446 ConcreteDataType::timestamp_millisecond_datatype(),
447 false,
448 )
449 .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
450 Value::Timestamp(Timestamp::new_millisecond(0)),
451 )))
452 .unwrap(),
453 };
454 let key_column_metadata = ColumnMetadata {
456 column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
457 semantic_type: SemanticType::Tag,
458 column_schema: ColumnSchema::new(
459 METADATA_SCHEMA_KEY_COLUMN_NAME,
460 ConcreteDataType::string_datatype(),
461 false,
462 ),
463 };
464 let value_column_metadata = ColumnMetadata {
466 column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
467 semantic_type: SemanticType::Field,
468 column_schema: ColumnSchema::new(
469 METADATA_SCHEMA_VALUE_COLUMN_NAME,
470 ConcreteDataType::string_datatype(),
471 true,
472 ),
473 };
474
475 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
477
478 let options = region_options_for_metadata_region(request.options.clone());
479 RegionCreateRequest {
480 engine: MITO_ENGINE_NAME.to_string(),
481 column_metadatas: vec![
482 timestamp_column_metadata,
483 key_column_metadata,
484 value_column_metadata,
485 ],
486 primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
487 options,
488 region_dir: metadata_region_dir,
489 }
490 }
491
492 pub fn create_request_for_data_region(
499 &self,
500 request: &RegionCreateRequest,
501 ) -> RegionCreateRequest {
502 let mut data_region_request = request.clone();
503 let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
504
505 data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
507
508 data_region_request
510 .column_metadatas
511 .iter_mut()
512 .for_each(|metadata| {
513 if metadata.semantic_type == SemanticType::Tag {
514 metadata.column_schema.set_nullable();
515 primary_key.push(metadata.column_id);
516 }
517 });
518
519 let [table_id_col, tsid_col] = Self::internal_column_metadata();
521 data_region_request.column_metadatas.push(table_id_col);
522 data_region_request.column_metadatas.push(tsid_col);
523 data_region_request.primary_key = primary_key;
524
525 set_data_region_options(
527 &mut data_region_request.options,
528 self.config.experimental_sparse_primary_key_encoding,
529 );
530
531 data_region_request
532 }
533
534 fn internal_column_metadata() -> [ColumnMetadata; 2] {
538 let metric_name_col = ColumnMetadata {
540 column_id: ReservedColumnId::table_id(),
541 semantic_type: SemanticType::Tag,
542 column_schema: ColumnSchema::new(
543 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
544 ConcreteDataType::uint32_datatype(),
545 false,
546 )
547 .with_skipping_options(SkippingIndexOptions {
548 granularity: DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
549 index_type: datatypes::schema::SkippingIndexType::BloomFilter,
550 })
551 .unwrap(),
552 };
553 let tsid_col = ColumnMetadata {
554 column_id: ReservedColumnId::tsid(),
555 semantic_type: SemanticType::Tag,
556 column_schema: ColumnSchema::new(
557 DATA_SCHEMA_TSID_COLUMN_NAME,
558 ConcreteDataType::uint64_datatype(),
559 false,
560 )
561 .with_inverted_index(false),
562 };
563 [metric_name_col, tsid_col]
564 }
565}
566
567fn group_create_logical_region_requests_by_physical_region_id(
569 requests: Vec<(RegionId, RegionCreateRequest)>,
570) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
571 let mut result = HashMap::with_capacity(requests.len());
572 for (region_id, request) in requests {
573 let physical_region_id = parse_physical_region_id(&request)?;
574 result
575 .entry(physical_region_id)
576 .or_insert_with(Vec::new)
577 .push((region_id, request));
578 }
579
580 Ok(result)
581}
582
583fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
585 let physical_region_id_raw = request
586 .options
587 .get(LOGICAL_TABLE_METADATA_KEY)
588 .ok_or(MissingRegionOptionSnafu {}.build())?;
589
590 let physical_region_id: RegionId = physical_region_id_raw
591 .parse::<u64>()
592 .with_context(|_| ParseRegionIdSnafu {
593 raw: physical_region_id_raw,
594 })?
595 .into();
596
597 Ok(physical_region_id)
598}
599
600pub(crate) fn region_options_for_metadata_region(
602 mut original: HashMap<String, String>,
603) -> HashMap<String, String> {
604 original.remove(APPEND_MODE_KEY);
606 original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
608 original.insert(TTL_KEY.to_string(), FOREVER.to_string());
609 original.remove(SKIP_WAL_KEY);
610 original
611}
612
613#[cfg(test)]
614mod test {
615 use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
616
617 use super::*;
618 use crate::config::EngineConfig;
619 use crate::engine::MetricEngine;
620 use crate::test_util::TestEnv;
621
622 #[test]
623 fn test_verify_region_create_request() {
624 let request = RegionCreateRequest {
626 column_metadatas: vec![
627 ColumnMetadata {
628 column_id: 0,
629 semantic_type: SemanticType::Timestamp,
630 column_schema: ColumnSchema::new(
631 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
632 ConcreteDataType::timestamp_millisecond_datatype(),
633 false,
634 ),
635 },
636 ColumnMetadata {
637 column_id: 1,
638 semantic_type: SemanticType::Tag,
639 column_schema: ColumnSchema::new(
640 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
641 ConcreteDataType::uint32_datatype(),
642 false,
643 ),
644 },
645 ],
646 region_dir: "test_dir".to_string(),
647 engine: METRIC_ENGINE_NAME.to_string(),
648 primary_key: vec![],
649 options: HashMap::new(),
650 };
651 let result = MetricEngineInner::verify_region_create_request(&request);
652 assert!(result.is_err());
653 assert_eq!(
654 result.unwrap_err().to_string(),
655 "Internal column __table_id is reserved".to_string()
656 );
657
658 let request = RegionCreateRequest {
660 column_metadatas: vec![
661 ColumnMetadata {
662 column_id: 0,
663 semantic_type: SemanticType::Timestamp,
664 column_schema: ColumnSchema::new(
665 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
666 ConcreteDataType::timestamp_millisecond_datatype(),
667 false,
668 ),
669 },
670 ColumnMetadata {
671 column_id: 1,
672 semantic_type: SemanticType::Tag,
673 column_schema: ColumnSchema::new(
674 "column1".to_string(),
675 ConcreteDataType::string_datatype(),
676 false,
677 ),
678 },
679 ColumnMetadata {
680 column_id: 2,
681 semantic_type: SemanticType::Field,
682 column_schema: ColumnSchema::new(
683 "column2".to_string(),
684 ConcreteDataType::float64_datatype(),
685 false,
686 ),
687 },
688 ],
689 region_dir: "test_dir".to_string(),
690 engine: METRIC_ENGINE_NAME.to_string(),
691 primary_key: vec![],
692 options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
693 .into_iter()
694 .collect(),
695 };
696 MetricEngineInner::verify_region_create_request(&request).unwrap();
697 }
698
699 #[test]
700 fn test_verify_region_create_request_options() {
701 let mut request = RegionCreateRequest {
702 column_metadatas: vec![
703 ColumnMetadata {
704 column_id: 0,
705 semantic_type: SemanticType::Timestamp,
706 column_schema: ColumnSchema::new(
707 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
708 ConcreteDataType::timestamp_millisecond_datatype(),
709 false,
710 ),
711 },
712 ColumnMetadata {
713 column_id: 1,
714 semantic_type: SemanticType::Field,
715 column_schema: ColumnSchema::new(
716 "val".to_string(),
717 ConcreteDataType::float64_datatype(),
718 false,
719 ),
720 },
721 ],
722 region_dir: "test_dir".to_string(),
723 engine: METRIC_ENGINE_NAME.to_string(),
724 primary_key: vec![],
725 options: HashMap::new(),
726 };
727 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
728
729 let mut options = HashMap::new();
730 options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
731 request.options.clone_from(&options);
732 MetricEngineInner::verify_region_create_request(&request).unwrap();
733
734 options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
735 request.options.clone_from(&options);
736 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
737
738 options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
739 request.options = options;
740 MetricEngineInner::verify_region_create_request(&request).unwrap();
741 }
742
743 #[tokio::test]
744 async fn test_create_request_for_physical_regions() {
745 let options: HashMap<_, _> = [
747 ("ttl".to_string(), "60m".to_string()),
748 ("skip_wal".to_string(), "true".to_string()),
749 ]
750 .into_iter()
751 .collect();
752 let request = RegionCreateRequest {
753 engine: METRIC_ENGINE_NAME.to_string(),
754 column_metadatas: vec![
755 ColumnMetadata {
756 column_id: 0,
757 semantic_type: SemanticType::Timestamp,
758 column_schema: ColumnSchema::new(
759 "timestamp",
760 ConcreteDataType::timestamp_millisecond_datatype(),
761 false,
762 ),
763 },
764 ColumnMetadata {
765 column_id: 1,
766 semantic_type: SemanticType::Tag,
767 column_schema: ColumnSchema::new(
768 "tag",
769 ConcreteDataType::string_datatype(),
770 false,
771 ),
772 },
773 ],
774 primary_key: vec![0],
775 options,
776 region_dir: "/test_dir".to_string(),
777 };
778
779 let env = TestEnv::new().await;
781 let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
782 let engine_inner = engine.inner;
783
784 let data_region_request = engine_inner.create_request_for_data_region(&request);
786 assert_eq!(
787 data_region_request.region_dir,
788 "/test_dir/data/".to_string()
789 );
790 assert_eq!(data_region_request.column_metadatas.len(), 4);
791 assert_eq!(
792 data_region_request.primary_key,
793 vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
794 );
795 assert!(data_region_request.options.contains_key("ttl"));
796
797 let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
799 assert_eq!(
800 metadata_region_request.region_dir,
801 "/test_dir/metadata/".to_string()
802 );
803 assert_eq!(
804 metadata_region_request.options.get("ttl").unwrap(),
805 "forever"
806 );
807 assert!(!metadata_region_request.options.contains_key("skip_wal"));
808 }
809}