1mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use snafu::{ensure, OptionExt, ResultExt};
27use store_api::metadata::ColumnMetadata;
28use store_api::metric_engine_consts::{
29 ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
30 DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
31 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
32 METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
33 METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
34};
35use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{AffectedRows, PathType, RegionCreateRequest, RegionRequest};
38use store_api::storage::consts::ReservedColumnId;
39use store_api::storage::RegionId;
40
41use crate::engine::create::extract_new_columns::extract_new_columns;
42use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
43use crate::engine::MetricEngineInner;
44use crate::error::{
45 ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
46 InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
47 MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
48 Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
49};
50use crate::metrics::PHYSICAL_REGION_COUNT;
51use crate::utils::{
52 self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
53 to_metadata_region_id,
54};
55
/// Granularity argument of the skipping index that `internal_column_metadata`
/// attaches to the `DATA_SCHEMA_TABLE_ID_COLUMN_NAME` column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
/// False positive rate argument of the same skipping index, which is created
/// with the `BloomFilter` index type.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
58
59impl MetricEngineInner {
60 pub async fn create_regions(
61 &self,
62 mut requests: Vec<(RegionId, RegionCreateRequest)>,
63 extension_return_value: &mut HashMap<String, Vec<u8>>,
64 ) -> Result<AffectedRows> {
65 if requests.is_empty() {
66 return Ok(0);
67 }
68
69 for (_, request) in requests.iter() {
70 Self::verify_region_create_request(request)?;
71 }
72
73 let first_request = &requests.first().unwrap().1;
74 if first_request.is_physical_table() {
75 ensure!(
76 requests.len() == 1,
77 UnexpectedRequestSnafu {
78 reason: "Physical table must be created with single request".to_string(),
79 }
80 );
81 let (region_id, request) = requests.pop().unwrap();
82 self.create_physical_region(region_id, request, extension_return_value)
83 .await?;
84
85 return Ok(0);
86 } else if first_request
87 .options
88 .contains_key(LOGICAL_TABLE_METADATA_KEY)
89 {
90 if requests.len() == 1 {
91 let request = &requests.first().unwrap().1;
92 let physical_region_id = parse_physical_region_id(request)?;
93 let mut manifest_infos = Vec::with_capacity(1);
94 self.create_logical_regions(physical_region_id, requests, extension_return_value)
95 .await?;
96 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
97 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
98 } else {
99 let grouped_requests =
100 group_create_logical_region_requests_by_physical_region_id(requests)?;
101 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
102 for (physical_region_id, requests) in grouped_requests {
103 self.create_logical_regions(
104 physical_region_id,
105 requests,
106 extension_return_value,
107 )
108 .await?;
109 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
110 }
111 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
112 }
113 } else {
114 return MissingRegionOptionSnafu {}.fail();
115 }
116
117 Ok(0)
118 }
119
120 async fn create_physical_region(
122 &self,
123 region_id: RegionId,
124 request: RegionCreateRequest,
125 extension_return_value: &mut HashMap<String, Vec<u8>>,
126 ) -> Result<()> {
127 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128 let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130 let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132 self.mito
133 .handle_request(
134 metadata_region_id,
135 RegionRequest::Create(create_metadata_region_request),
136 )
137 .await
138 .with_context(|_| CreateMitoRegionSnafu {
139 region_type: METADATA_REGION_SUBDIR,
140 })?;
141
142 let create_data_region_request = self.create_request_for_data_region(&request);
144 let physical_columns = create_data_region_request
145 .column_metadatas
146 .iter()
147 .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148 .collect::<HashMap<_, _>>();
149 let time_index_unit = create_data_region_request
150 .column_metadatas
151 .iter()
152 .find_map(|metadata| {
153 if metadata.semantic_type == SemanticType::Timestamp {
154 metadata
155 .column_schema
156 .data_type
157 .as_timestamp()
158 .map(|data_type| data_type.unit())
159 } else {
160 None
161 }
162 })
163 .context(UnexpectedRequestSnafu {
164 reason: "No time index column found",
165 })?;
166 let response = self
167 .mito
168 .handle_request(
169 data_region_id,
170 RegionRequest::Create(create_data_region_request),
171 )
172 .await
173 .with_context(|_| CreateMitoRegionSnafu {
174 region_type: DATA_REGION_SUBDIR,
175 })?;
176 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
177 PhysicalRegionNotFoundSnafu {
178 region_id: data_region_id,
179 },
180 )?;
181 extension_return_value.extend(response.extensions);
182
183 info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
184 PHYSICAL_REGION_COUNT.inc();
185
186 self.state.write().unwrap().add_physical_region(
188 data_region_id,
189 physical_columns,
190 primary_key_encoding,
191 physical_region_options,
192 time_index_unit,
193 );
194
195 Ok(())
196 }
197
198 async fn create_logical_regions(
200 &self,
201 physical_region_id: RegionId,
202 requests: Vec<(RegionId, RegionCreateRequest)>,
203 extension_return_value: &mut HashMap<String, Vec<u8>>,
204 ) -> Result<()> {
205 let data_region_id = utils::to_data_region_id(physical_region_id);
206
207 let unit = self
208 .state
209 .read()
210 .unwrap()
211 .physical_region_time_index_unit(physical_region_id)
212 .context(PhysicalRegionNotFoundSnafu {
213 region_id: data_region_id,
214 })?;
215 for (_, request) in &requests {
217 let time_index_column = request
219 .column_metadatas
220 .iter()
221 .find(|col| col.semantic_type == SemanticType::Timestamp)
222 .unwrap();
223 let request_unit = time_index_column
224 .column_schema
225 .data_type
226 .as_timestamp()
227 .unwrap()
228 .unit();
229 ensure!(
230 request_unit == unit,
231 UnexpectedRequestSnafu {
232 reason: format!(
233 "Metric has differenttime unit ({:?}) than the physical region ({:?})",
234 request_unit, unit
235 ),
236 }
237 );
238 }
239
240 let requests = {
242 let state = self.state.read().unwrap();
243 let mut skipped = Vec::with_capacity(requests.len());
244 let mut kept_requests = Vec::with_capacity(requests.len());
245
246 for (region_id, request) in requests {
247 if state.is_logical_region_exist(region_id) {
248 skipped.push(region_id);
249 } else {
250 kept_requests.push((region_id, request));
251 }
252 }
253
254 if !skipped.is_empty() {
256 info!(
257 "Skipped creating logical regions {skipped:?} because they already exist",
258 skipped = skipped
259 );
260 }
261 kept_requests
262 };
263
264 let mut new_column_names = HashSet::new();
266 let mut new_columns = Vec::new();
267
268 let index_option = {
269 let state = &self.state.read().unwrap();
270 let region_state = state
271 .physical_region_states()
272 .get(&data_region_id)
273 .with_context(|| PhysicalRegionNotFoundSnafu {
274 region_id: data_region_id,
275 })?;
276 let physical_columns = region_state.physical_columns();
277
278 extract_new_columns(
279 &requests,
280 physical_columns,
281 &mut new_column_names,
282 &mut new_columns,
283 )?;
284 region_state.options().index
285 };
286
287 self.data_region
289 .add_columns(data_region_id, new_columns, index_option)
290 .await?;
291
292 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
293 let physical_schema_map = physical_columns
294 .iter()
295 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
296 .collect::<HashMap<_, _>>();
297 let logical_regions = requests
298 .iter()
299 .map(|(region_id, _)| (*region_id))
300 .collect::<Vec<_>>();
301 let logical_region_columns = requests.iter().map(|(region_id, request)| {
302 (
303 *region_id,
304 request
305 .column_metadatas
306 .iter()
307 .map(|metadata| {
308 let column_metadata = *physical_schema_map
310 .get(metadata.column_schema.name.as_str())
311 .unwrap();
312 (metadata.column_schema.name.as_str(), column_metadata)
313 })
314 .collect::<HashMap<_, _>>(),
315 )
316 });
317
318 let new_add_columns = new_column_names.iter().map(|name| {
319 let column_metadata = *physical_schema_map.get(name).unwrap();
321 (name.to_string(), column_metadata.column_id)
322 });
323
324 extension_return_value.insert(
325 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
326 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
327 );
328
329 self.metadata_region
331 .add_logical_regions(physical_region_id, true, logical_region_columns)
332 .await?;
333
334 {
335 let mut state = self.state.write().unwrap();
336 state.add_physical_columns(data_region_id, new_add_columns);
337 state.add_logical_regions(physical_region_id, logical_regions.clone());
338 }
339 for logical_region_id in logical_regions {
340 self.metadata_region
341 .open_logical_region(logical_region_id)
342 .await;
343 }
344
345 Ok(())
346 }
347
348 fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
353 request.validate().context(InvalidMetadataSnafu)?;
354
355 let name_to_index = request
356 .column_metadatas
357 .iter()
358 .enumerate()
359 .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
360 .collect::<HashMap<String, usize>>();
361
362 ensure!(
364 !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
365 InternalColumnOccupiedSnafu {
366 column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
367 }
368 );
369 ensure!(
370 !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
371 InternalColumnOccupiedSnafu {
372 column: DATA_SCHEMA_TSID_COLUMN_NAME,
373 }
374 );
375
376 ensure!(
378 request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
379 MissingRegionOptionSnafu {}
380 );
381 ensure!(
382 !(request.is_physical_table()
383 && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
384 ConflictRegionOptionSnafu {}
385 );
386
387 let mut field_col: Option<&ColumnMetadata> = None;
389 for col in &request.column_metadatas {
390 match col.semantic_type {
391 SemanticType::Tag => ensure!(
392 col.column_schema.data_type == ConcreteDataType::string_datatype(),
393 ColumnTypeMismatchSnafu {
394 expect: ConcreteDataType::string_datatype(),
395 actual: col.column_schema.data_type.clone(),
396 }
397 ),
398 SemanticType::Field => {
399 if field_col.is_some() {
400 MultipleFieldColumnSnafu {
401 previous: field_col.unwrap().column_schema.name.clone(),
402 current: col.column_schema.name.clone(),
403 }
404 .fail()?;
405 }
406 field_col = Some(col)
407 }
408 SemanticType::Timestamp => {}
409 }
410 }
411 let field_col = field_col.context(NoFieldColumnSnafu)?;
412
413 ensure!(
415 field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
416 ColumnTypeMismatchSnafu {
417 expect: ConcreteDataType::float64_datatype(),
418 actual: field_col.column_schema.data_type.clone(),
419 }
420 );
421
422 Ok(())
423 }
424
425 fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
429 (
430 to_data_region_id(region_id),
431 to_metadata_region_id(region_id),
432 )
433 }
434
435 pub fn create_request_for_metadata_region(
439 &self,
440 request: &RegionCreateRequest,
441 ) -> RegionCreateRequest {
442 let timestamp_column_metadata = ColumnMetadata {
444 column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
445 semantic_type: SemanticType::Timestamp,
446 column_schema: ColumnSchema::new(
447 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
448 ConcreteDataType::timestamp_millisecond_datatype(),
449 false,
450 )
451 .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
452 Value::Timestamp(Timestamp::new_millisecond(0)),
453 )))
454 .unwrap(),
455 };
456 let key_column_metadata = ColumnMetadata {
458 column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
459 semantic_type: SemanticType::Tag,
460 column_schema: ColumnSchema::new(
461 METADATA_SCHEMA_KEY_COLUMN_NAME,
462 ConcreteDataType::string_datatype(),
463 false,
464 ),
465 };
466 let value_column_metadata = ColumnMetadata {
468 column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
469 semantic_type: SemanticType::Field,
470 column_schema: ColumnSchema::new(
471 METADATA_SCHEMA_VALUE_COLUMN_NAME,
472 ConcreteDataType::string_datatype(),
473 true,
474 ),
475 };
476
477 let options = region_options_for_metadata_region(&request.options);
478 RegionCreateRequest {
479 engine: MITO_ENGINE_NAME.to_string(),
480 column_metadatas: vec![
481 timestamp_column_metadata,
482 key_column_metadata,
483 value_column_metadata,
484 ],
485 primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
486 options,
487 table_dir: request.table_dir.clone(),
488 path_type: PathType::Metadata,
489 partition_expr_json: Some("".to_string()),
490 }
491 }
492
493 pub fn create_request_for_data_region(
500 &self,
501 request: &RegionCreateRequest,
502 ) -> RegionCreateRequest {
503 let mut data_region_request = request.clone();
504 let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
505
506 data_region_request.table_dir = request.table_dir.clone();
507 data_region_request.path_type = PathType::Data;
508
509 data_region_request
511 .column_metadatas
512 .iter_mut()
513 .for_each(|metadata| {
514 if metadata.semantic_type == SemanticType::Tag {
515 metadata.column_schema.set_nullable();
516 primary_key.push(metadata.column_id);
517 }
518 });
519
520 let [table_id_col, tsid_col] = Self::internal_column_metadata();
522 data_region_request.column_metadatas.push(table_id_col);
523 data_region_request.column_metadatas.push(tsid_col);
524 data_region_request.primary_key = primary_key;
525
526 set_data_region_options(
528 &mut data_region_request.options,
529 self.config.experimental_sparse_primary_key_encoding,
530 );
531
532 data_region_request
533 }
534
535 fn internal_column_metadata() -> [ColumnMetadata; 2] {
539 let metric_name_col = ColumnMetadata {
541 column_id: ReservedColumnId::table_id(),
542 semantic_type: SemanticType::Tag,
543 column_schema: ColumnSchema::new(
544 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
545 ConcreteDataType::uint32_datatype(),
546 false,
547 )
548 .with_skipping_options(SkippingIndexOptions::new_unchecked(
549 DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
550 DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
551 datatypes::schema::SkippingIndexType::BloomFilter,
552 ))
553 .unwrap(),
554 };
555 let tsid_col = ColumnMetadata {
556 column_id: ReservedColumnId::tsid(),
557 semantic_type: SemanticType::Tag,
558 column_schema: ColumnSchema::new(
559 DATA_SCHEMA_TSID_COLUMN_NAME,
560 ConcreteDataType::uint64_datatype(),
561 false,
562 )
563 .with_inverted_index(false),
564 };
565 [metric_name_col, tsid_col]
566 }
567}
568
569fn group_create_logical_region_requests_by_physical_region_id(
571 requests: Vec<(RegionId, RegionCreateRequest)>,
572) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
573 let mut result = HashMap::with_capacity(requests.len());
574 for (region_id, request) in requests {
575 let physical_region_id = parse_physical_region_id(&request)?;
576 result
577 .entry(physical_region_id)
578 .or_insert_with(Vec::new)
579 .push((region_id, request));
580 }
581
582 Ok(result)
583}
584
585fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
587 let physical_region_id_raw = request
588 .options
589 .get(LOGICAL_TABLE_METADATA_KEY)
590 .ok_or(MissingRegionOptionSnafu {}.build())?;
591
592 let physical_region_id: RegionId = physical_region_id_raw
593 .parse::<u64>()
594 .with_context(|_| ParseRegionIdSnafu {
595 raw: physical_region_id_raw,
596 })?
597 .into();
598
599 Ok(physical_region_id)
600}
601
602pub(crate) fn region_options_for_metadata_region(
604 original: &HashMap<String, String>,
605) -> HashMap<String, String> {
606 let mut metadata_region_options = HashMap::new();
607 metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
608
609 if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
610 metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.to_string());
611 }
612
613 metadata_region_options
614}
615
616#[cfg(test)]
617mod test {
618 use common_meta::ddl::test_util::assert_column_name_and_id;
619 use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
620 use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
621 use store_api::region_request::BatchRegionDdlRequest;
622
623 use super::*;
624 use crate::config::EngineConfig;
625 use crate::engine::MetricEngine;
626 use crate::test_util::{create_logical_region_request, TestEnv};
627
/// A request that uses a reserved internal column name is rejected, while a
/// well-formed physical table request is accepted.
#[test]
fn test_verify_region_create_request() {
    // Builds a non-nullable column.
    let column = |column_id, semantic_type, name: &str, data_type| ColumnMetadata {
        column_id,
        semantic_type,
        column_schema: ColumnSchema::new(name, data_type, false),
    };
    // Wraps columns and options into a metric engine create request.
    let build_request = |column_metadatas, options| RegionCreateRequest {
        column_metadatas,
        table_dir: "test_dir".to_string(),
        path_type: PathType::Bare,
        engine: METRIC_ENGINE_NAME.to_string(),
        primary_key: vec![],
        options,
        partition_expr_json: Some("".to_string()),
    };

    // A user column must not take the name of the internal table id column.
    let occupied = build_request(
        vec![
            column(
                0,
                SemanticType::Timestamp,
                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
            column(
                1,
                SemanticType::Tag,
                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                ConcreteDataType::uint32_datatype(),
            ),
        ],
        HashMap::new(),
    );
    let err = MetricEngineInner::verify_region_create_request(&occupied).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Internal column __table_id is reserved".to_string()
    );

    // A physical table with one string tag and one float64 field is valid.
    let valid = build_request(
        vec![
            column(
                0,
                SemanticType::Timestamp,
                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
            column(
                1,
                SemanticType::Tag,
                "column1",
                ConcreteDataType::string_datatype(),
            ),
            column(
                2,
                SemanticType::Field,
                "column2",
                ConcreteDataType::float64_datatype(),
            ),
        ],
        HashMap::from([(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]),
    );
    MetricEngineInner::verify_region_create_request(&valid).unwrap();
}
708
/// A request must carry exactly one of the physical / logical table options.
#[test]
fn test_verify_region_create_request_options() {
    let timestamp_column = ColumnMetadata {
        column_id: 0,
        semantic_type: SemanticType::Timestamp,
        column_schema: ColumnSchema::new(
            METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
    };
    let field_column = ColumnMetadata {
        column_id: 1,
        semantic_type: SemanticType::Field,
        column_schema: ColumnSchema::new(
            "val".to_string(),
            ConcreteDataType::float64_datatype(),
            false,
        ),
    };
    let mut request = RegionCreateRequest {
        column_metadatas: vec![timestamp_column, field_column],
        table_dir: "test_dir".to_string(),
        path_type: PathType::Bare,
        engine: METRIC_ENGINE_NAME.to_string(),
        primary_key: vec![],
        options: HashMap::new(),
        partition_expr_json: Some("".to_string()),
    };

    // Neither option: rejected.
    MetricEngineInner::verify_region_create_request(&request).unwrap_err();

    // Physical option only: accepted.
    request
        .options
        .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
    MetricEngineInner::verify_region_create_request(&request).unwrap();

    // Both options: rejected.
    request
        .options
        .insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
    MetricEngineInner::verify_region_create_request(&request).unwrap_err();

    // Logical option only: accepted.
    request.options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
    MetricEngineInner::verify_region_create_request(&request).unwrap();
}
754
755 #[tokio::test]
756 async fn test_create_request_for_physical_regions() {
757 let options: HashMap<_, _> = [
759 ("ttl".to_string(), "60m".to_string()),
760 ("skip_wal".to_string(), "true".to_string()),
761 ]
762 .into_iter()
763 .collect();
764 let request = RegionCreateRequest {
765 engine: METRIC_ENGINE_NAME.to_string(),
766 column_metadatas: vec![
767 ColumnMetadata {
768 column_id: 0,
769 semantic_type: SemanticType::Timestamp,
770 column_schema: ColumnSchema::new(
771 "timestamp",
772 ConcreteDataType::timestamp_millisecond_datatype(),
773 false,
774 ),
775 },
776 ColumnMetadata {
777 column_id: 1,
778 semantic_type: SemanticType::Tag,
779 column_schema: ColumnSchema::new(
780 "tag",
781 ConcreteDataType::string_datatype(),
782 false,
783 ),
784 },
785 ],
786 primary_key: vec![0],
787 options,
788 table_dir: "/test_dir".to_string(),
789 path_type: PathType::Bare,
790 partition_expr_json: Some("".to_string()),
791 };
792
793 let env = TestEnv::new().await;
795 let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
796 let engine_inner = engine.inner;
797
798 let data_region_request = engine_inner.create_request_for_data_region(&request);
800 assert_eq!(data_region_request.table_dir, "/test_dir".to_string());
801 assert_eq!(data_region_request.path_type, PathType::Data);
802 assert_eq!(data_region_request.column_metadatas.len(), 4);
803 assert_eq!(
804 data_region_request.primary_key,
805 vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
806 );
807 assert!(data_region_request.options.contains_key("ttl"));
808
809 let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
811 assert_eq!(metadata_region_request.table_dir, "/test_dir".to_string());
812 assert_eq!(metadata_region_request.path_type, PathType::Metadata);
813 assert_eq!(
814 metadata_region_request.options.get("ttl").unwrap(),
815 "forever"
816 );
817 assert!(!metadata_region_request.options.contains_key("skip_wal"));
818 }
819
820 #[tokio::test]
821 async fn test_create_logical_regions() {
822 let env = TestEnv::new().await;
823 let engine = env.metric();
824 let physical_region_id1 = RegionId::new(1024, 0);
825 let physical_region_id2 = RegionId::new(1024, 1);
826 let logical_region_id1 = RegionId::new(1025, 0);
827 let logical_region_id2 = RegionId::new(1025, 1);
828 env.create_physical_region(physical_region_id1, "/test_dir1")
829 .await;
830 env.create_physical_region(physical_region_id2, "/test_dir2")
831 .await;
832
833 let region_create_request1 =
834 create_logical_region_request(&["job"], physical_region_id1, "logical1");
835 let region_create_request2 =
836 create_logical_region_request(&["job"], physical_region_id2, "logical2");
837
838 let response = engine
839 .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
840 (logical_region_id1, region_create_request1),
841 (logical_region_id2, region_create_request2),
842 ]))
843 .await
844 .unwrap();
845
846 let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
847 assert_eq!(manifest_infos.len(), 2);
848 let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
849 assert!(region_ids.contains(&physical_region_id1));
850 assert!(region_ids.contains(&physical_region_id2));
851
852 let column_metadatas =
853 parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
854 assert_column_name_and_id(
855 &column_metadatas,
856 &[
857 ("greptime_timestamp", 0),
858 ("greptime_value", 1),
859 ("__table_id", ReservedColumnId::table_id()),
860 ("__tsid", ReservedColumnId::tsid()),
861 ("job", 2),
862 ],
863 );
864 }
865}