1mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{FOREVER, Timestamp};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::metadata::ColumnMetadata;
28use store_api::metric_engine_consts::{
29 ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
30 DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
31 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
32 METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
33 METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
34};
35use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{AffectedRows, PathType, RegionCreateRequest, RegionRequest};
38use store_api::storage::RegionId;
39use store_api::storage::consts::ReservedColumnId;
40
41use crate::engine::MetricEngineInner;
42use crate::engine::create::extract_new_columns::extract_new_columns;
43use crate::engine::options::{PhysicalRegionOptions, set_data_region_options};
44use crate::error::{
45 ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
46 InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
47 MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
48 Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
49};
50use crate::metrics::PHYSICAL_REGION_COUNT;
51use crate::utils::{
52 self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
53 to_metadata_region_id,
54};
55
/// Granularity handed to the skipping index options of the internal table id column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
/// False positive rate handed to the bloom filter skipping index of the internal table id column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
58
59impl MetricEngineInner {
60 pub async fn create_regions(
61 &self,
62 mut requests: Vec<(RegionId, RegionCreateRequest)>,
63 extension_return_value: &mut HashMap<String, Vec<u8>>,
64 ) -> Result<AffectedRows> {
65 if requests.is_empty() {
66 return Ok(0);
67 }
68
69 for (_, request) in requests.iter() {
70 Self::verify_region_create_request(request)?;
71 }
72
73 let first_request = &requests.first().unwrap().1;
74 if first_request.is_physical_table() {
75 ensure!(
76 requests.len() == 1,
77 UnexpectedRequestSnafu {
78 reason: "Physical table must be created with single request".to_string(),
79 }
80 );
81 let (region_id, request) = requests.pop().unwrap();
82 self.create_physical_region(region_id, request, extension_return_value)
83 .await?;
84
85 return Ok(0);
86 } else if first_request
87 .options
88 .contains_key(LOGICAL_TABLE_METADATA_KEY)
89 {
90 if requests.len() == 1 {
91 let request = &requests.first().unwrap().1;
92 let physical_region_id = parse_physical_region_id(request)?;
93 let mut manifest_infos = Vec::with_capacity(1);
94 self.create_logical_regions(physical_region_id, requests, extension_return_value)
95 .await?;
96 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
97 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
98 } else {
99 let grouped_requests =
100 group_create_logical_region_requests_by_physical_region_id(requests)?;
101 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
102 for (physical_region_id, requests) in grouped_requests {
103 self.create_logical_regions(
104 physical_region_id,
105 requests,
106 extension_return_value,
107 )
108 .await?;
109 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
110 }
111 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
112 }
113 } else {
114 return MissingRegionOptionSnafu {}.fail();
115 }
116
117 Ok(0)
118 }
119
120 async fn create_physical_region(
122 &self,
123 region_id: RegionId,
124 request: RegionCreateRequest,
125 extension_return_value: &mut HashMap<String, Vec<u8>>,
126 ) -> Result<()> {
127 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128 let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130 let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132 self.mito
133 .handle_request(
134 metadata_region_id,
135 RegionRequest::Create(create_metadata_region_request),
136 )
137 .await
138 .with_context(|_| CreateMitoRegionSnafu {
139 region_type: METADATA_REGION_SUBDIR,
140 })?;
141
142 let create_data_region_request = self.create_request_for_data_region(&request);
144 let physical_columns = create_data_region_request
145 .column_metadatas
146 .iter()
147 .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148 .collect::<HashMap<_, _>>();
149 let time_index_unit = create_data_region_request
150 .column_metadatas
151 .iter()
152 .find_map(|metadata| {
153 if metadata.semantic_type == SemanticType::Timestamp {
154 metadata
155 .column_schema
156 .data_type
157 .as_timestamp()
158 .map(|data_type| data_type.unit())
159 } else {
160 None
161 }
162 })
163 .context(UnexpectedRequestSnafu {
164 reason: "No time index column found",
165 })?;
166 let response = self
167 .mito
168 .handle_request(
169 data_region_id,
170 RegionRequest::Create(create_data_region_request),
171 )
172 .await
173 .with_context(|_| CreateMitoRegionSnafu {
174 region_type: DATA_REGION_SUBDIR,
175 })?;
176 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
177 PhysicalRegionNotFoundSnafu {
178 region_id: data_region_id,
179 },
180 )?;
181 extension_return_value.extend(response.extensions);
182
183 info!(
184 "Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}"
185 );
186 PHYSICAL_REGION_COUNT.inc();
187
188 self.state.write().unwrap().add_physical_region(
190 data_region_id,
191 physical_columns,
192 primary_key_encoding,
193 physical_region_options,
194 time_index_unit,
195 );
196
197 Ok(())
198 }
199
200 async fn create_logical_regions(
202 &self,
203 physical_region_id: RegionId,
204 requests: Vec<(RegionId, RegionCreateRequest)>,
205 extension_return_value: &mut HashMap<String, Vec<u8>>,
206 ) -> Result<()> {
207 let data_region_id = utils::to_data_region_id(physical_region_id);
208
209 let unit = self
210 .state
211 .read()
212 .unwrap()
213 .physical_region_time_index_unit(physical_region_id)
214 .context(PhysicalRegionNotFoundSnafu {
215 region_id: data_region_id,
216 })?;
217 for (_, request) in &requests {
219 let time_index_column = request
221 .column_metadatas
222 .iter()
223 .find(|col| col.semantic_type == SemanticType::Timestamp)
224 .unwrap();
225 let request_unit = time_index_column
226 .column_schema
227 .data_type
228 .as_timestamp()
229 .unwrap()
230 .unit();
231 ensure!(
232 request_unit == unit,
233 UnexpectedRequestSnafu {
234 reason: format!(
235 "Metric has differenttime unit ({:?}) than the physical region ({:?})",
236 request_unit, unit
237 ),
238 }
239 );
240 }
241
242 let requests = {
244 let state = self.state.read().unwrap();
245 let mut skipped = Vec::with_capacity(requests.len());
246 let mut kept_requests = Vec::with_capacity(requests.len());
247
248 for (region_id, request) in requests {
249 if state.is_logical_region_exist(region_id) {
250 skipped.push(region_id);
251 } else {
252 kept_requests.push((region_id, request));
253 }
254 }
255
256 if !skipped.is_empty() {
258 info!(
259 "Skipped creating logical regions {skipped:?} because they already exist",
260 skipped = skipped
261 );
262 }
263 kept_requests
264 };
265
266 let mut new_column_names = HashSet::new();
268 let mut new_columns = Vec::new();
269
270 let index_option = {
271 let state = &self.state.read().unwrap();
272 let region_state = state
273 .physical_region_states()
274 .get(&data_region_id)
275 .with_context(|| PhysicalRegionNotFoundSnafu {
276 region_id: data_region_id,
277 })?;
278 let physical_columns = region_state.physical_columns();
279
280 extract_new_columns(
281 &requests,
282 physical_columns,
283 &mut new_column_names,
284 &mut new_columns,
285 )?;
286 region_state.options().index
287 };
288
289 self.data_region
291 .add_columns(data_region_id, new_columns, index_option)
292 .await?;
293
294 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
295 let physical_schema_map = physical_columns
296 .iter()
297 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
298 .collect::<HashMap<_, _>>();
299 let logical_regions = requests
300 .iter()
301 .map(|(region_id, _)| *region_id)
302 .collect::<Vec<_>>();
303 let logical_region_columns = requests.iter().map(|(region_id, request)| {
304 (
305 *region_id,
306 request
307 .column_metadatas
308 .iter()
309 .map(|metadata| {
310 let column_metadata = *physical_schema_map
312 .get(metadata.column_schema.name.as_str())
313 .unwrap();
314 (metadata.column_schema.name.as_str(), column_metadata)
315 })
316 .collect::<HashMap<_, _>>(),
317 )
318 });
319
320 let new_add_columns = new_column_names.iter().map(|name| {
321 let column_metadata = *physical_schema_map.get(name).unwrap();
323 (name.to_string(), column_metadata.column_id)
324 });
325
326 extension_return_value.insert(
327 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
328 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
329 );
330
331 self.metadata_region
333 .add_logical_regions(physical_region_id, true, logical_region_columns)
334 .await?;
335
336 {
337 let mut state = self.state.write().unwrap();
338 state.add_physical_columns(data_region_id, new_add_columns);
339 state.add_logical_regions(physical_region_id, logical_regions.clone());
340 }
341 for logical_region_id in logical_regions {
342 self.metadata_region
343 .open_logical_region(logical_region_id)
344 .await;
345 }
346
347 Ok(())
348 }
349
350 fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
355 request.validate().context(InvalidMetadataSnafu)?;
356
357 let name_to_index = request
358 .column_metadatas
359 .iter()
360 .enumerate()
361 .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
362 .collect::<HashMap<String, usize>>();
363
364 ensure!(
366 !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
367 InternalColumnOccupiedSnafu {
368 column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
369 }
370 );
371 ensure!(
372 !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
373 InternalColumnOccupiedSnafu {
374 column: DATA_SCHEMA_TSID_COLUMN_NAME,
375 }
376 );
377
378 ensure!(
380 request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
381 MissingRegionOptionSnafu {}
382 );
383 ensure!(
384 !(request.is_physical_table()
385 && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
386 ConflictRegionOptionSnafu {}
387 );
388
389 let mut field_col: Option<&ColumnMetadata> = None;
391 for col in &request.column_metadatas {
392 match col.semantic_type {
393 SemanticType::Tag => ensure!(
394 col.column_schema.data_type == ConcreteDataType::string_datatype(),
395 ColumnTypeMismatchSnafu {
396 expect: ConcreteDataType::string_datatype(),
397 actual: col.column_schema.data_type.clone(),
398 }
399 ),
400 SemanticType::Field => {
401 if let Some(field_col) = field_col {
402 MultipleFieldColumnSnafu {
403 previous: field_col.column_schema.name.clone(),
404 current: col.column_schema.name.clone(),
405 }
406 .fail()?;
407 }
408 field_col = Some(col)
409 }
410 SemanticType::Timestamp => {}
411 }
412 }
413 let field_col = field_col.context(NoFieldColumnSnafu)?;
414
415 ensure!(
417 field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
418 ColumnTypeMismatchSnafu {
419 expect: ConcreteDataType::float64_datatype(),
420 actual: field_col.column_schema.data_type.clone(),
421 }
422 );
423
424 Ok(())
425 }
426
427 fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
431 (
432 to_data_region_id(region_id),
433 to_metadata_region_id(region_id),
434 )
435 }
436
437 pub fn create_request_for_metadata_region(
441 &self,
442 request: &RegionCreateRequest,
443 ) -> RegionCreateRequest {
444 let timestamp_column_metadata = ColumnMetadata {
446 column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
447 semantic_type: SemanticType::Timestamp,
448 column_schema: ColumnSchema::new(
449 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
450 ConcreteDataType::timestamp_millisecond_datatype(),
451 false,
452 )
453 .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
454 Value::Timestamp(Timestamp::new_millisecond(0)),
455 )))
456 .unwrap(),
457 };
458 let key_column_metadata = ColumnMetadata {
460 column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
461 semantic_type: SemanticType::Tag,
462 column_schema: ColumnSchema::new(
463 METADATA_SCHEMA_KEY_COLUMN_NAME,
464 ConcreteDataType::string_datatype(),
465 false,
466 ),
467 };
468 let value_column_metadata = ColumnMetadata {
470 column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
471 semantic_type: SemanticType::Field,
472 column_schema: ColumnSchema::new(
473 METADATA_SCHEMA_VALUE_COLUMN_NAME,
474 ConcreteDataType::string_datatype(),
475 true,
476 ),
477 };
478
479 let options = region_options_for_metadata_region(&request.options);
480 RegionCreateRequest {
481 engine: MITO_ENGINE_NAME.to_string(),
482 column_metadatas: vec![
483 timestamp_column_metadata,
484 key_column_metadata,
485 value_column_metadata,
486 ],
487 primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
488 options,
489 table_dir: request.table_dir.clone(),
490 path_type: PathType::Metadata,
491 partition_expr_json: Some("".to_string()),
492 }
493 }
494
495 pub fn create_request_for_data_region(
502 &self,
503 request: &RegionCreateRequest,
504 ) -> RegionCreateRequest {
505 let mut data_region_request = request.clone();
506 let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
507
508 data_region_request.table_dir = request.table_dir.clone();
509 data_region_request.path_type = PathType::Data;
510
511 data_region_request
513 .column_metadatas
514 .iter_mut()
515 .for_each(|metadata| {
516 if metadata.semantic_type == SemanticType::Tag {
517 metadata.column_schema.set_nullable();
518 primary_key.push(metadata.column_id);
519 }
520 });
521
522 let [table_id_col, tsid_col] = Self::internal_column_metadata();
524 data_region_request.column_metadatas.push(table_id_col);
525 data_region_request.column_metadatas.push(tsid_col);
526 data_region_request.primary_key = primary_key;
527
528 set_data_region_options(
530 &mut data_region_request.options,
531 self.config.experimental_sparse_primary_key_encoding,
532 );
533
534 data_region_request
535 }
536
537 fn internal_column_metadata() -> [ColumnMetadata; 2] {
541 let metric_name_col = ColumnMetadata {
543 column_id: ReservedColumnId::table_id(),
544 semantic_type: SemanticType::Tag,
545 column_schema: ColumnSchema::new(
546 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
547 ConcreteDataType::uint32_datatype(),
548 false,
549 )
550 .with_skipping_options(SkippingIndexOptions::new_unchecked(
551 DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
552 DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
553 datatypes::schema::SkippingIndexType::BloomFilter,
554 ))
555 .unwrap(),
556 };
557 let tsid_col = ColumnMetadata {
558 column_id: ReservedColumnId::tsid(),
559 semantic_type: SemanticType::Tag,
560 column_schema: ColumnSchema::new(
561 DATA_SCHEMA_TSID_COLUMN_NAME,
562 ConcreteDataType::uint64_datatype(),
563 false,
564 )
565 .with_inverted_index(false),
566 };
567 [metric_name_col, tsid_col]
568 }
569}
570
571fn group_create_logical_region_requests_by_physical_region_id(
573 requests: Vec<(RegionId, RegionCreateRequest)>,
574) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
575 let mut result = HashMap::with_capacity(requests.len());
576 for (region_id, request) in requests {
577 let physical_region_id = parse_physical_region_id(&request)?;
578 result
579 .entry(physical_region_id)
580 .or_insert_with(Vec::new)
581 .push((region_id, request));
582 }
583
584 Ok(result)
585}
586
587fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
589 let physical_region_id_raw = request
590 .options
591 .get(LOGICAL_TABLE_METADATA_KEY)
592 .ok_or(MissingRegionOptionSnafu {}.build())?;
593
594 let physical_region_id: RegionId = physical_region_id_raw
595 .parse::<u64>()
596 .with_context(|_| ParseRegionIdSnafu {
597 raw: physical_region_id_raw,
598 })?
599 .into();
600
601 Ok(physical_region_id)
602}
603
604pub(crate) fn region_options_for_metadata_region(
606 original: &HashMap<String, String>,
607) -> HashMap<String, String> {
608 let mut metadata_region_options = HashMap::new();
609 metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
610
611 if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
612 metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
613 }
614
615 metadata_region_options
616}
617
618#[cfg(test)]
619mod test {
620 use common_meta::ddl::test_util::assert_column_name_and_id;
621 use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
622 use common_query::prelude::{greptime_timestamp, greptime_value};
623 use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
624 use store_api::region_request::BatchRegionDdlRequest;
625
626 use super::*;
627 use crate::config::EngineConfig;
628 use crate::engine::MetricEngine;
629 use crate::test_util::{TestEnv, create_logical_region_request};
630
631 #[test]
632 fn test_verify_region_create_request() {
633 let request = RegionCreateRequest {
635 column_metadatas: vec![
636 ColumnMetadata {
637 column_id: 0,
638 semantic_type: SemanticType::Timestamp,
639 column_schema: ColumnSchema::new(
640 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
641 ConcreteDataType::timestamp_millisecond_datatype(),
642 false,
643 ),
644 },
645 ColumnMetadata {
646 column_id: 1,
647 semantic_type: SemanticType::Tag,
648 column_schema: ColumnSchema::new(
649 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
650 ConcreteDataType::uint32_datatype(),
651 false,
652 ),
653 },
654 ],
655 table_dir: "test_dir".to_string(),
656 path_type: PathType::Bare,
657 engine: METRIC_ENGINE_NAME.to_string(),
658 primary_key: vec![],
659 options: HashMap::new(),
660 partition_expr_json: Some("".to_string()),
661 };
662 let result = MetricEngineInner::verify_region_create_request(&request);
663 assert!(result.is_err());
664 assert_eq!(
665 result.unwrap_err().to_string(),
666 "Internal column __table_id is reserved".to_string()
667 );
668
669 let request = RegionCreateRequest {
671 column_metadatas: vec![
672 ColumnMetadata {
673 column_id: 0,
674 semantic_type: SemanticType::Timestamp,
675 column_schema: ColumnSchema::new(
676 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
677 ConcreteDataType::timestamp_millisecond_datatype(),
678 false,
679 ),
680 },
681 ColumnMetadata {
682 column_id: 1,
683 semantic_type: SemanticType::Tag,
684 column_schema: ColumnSchema::new(
685 "column1".to_string(),
686 ConcreteDataType::string_datatype(),
687 false,
688 ),
689 },
690 ColumnMetadata {
691 column_id: 2,
692 semantic_type: SemanticType::Field,
693 column_schema: ColumnSchema::new(
694 "column2".to_string(),
695 ConcreteDataType::float64_datatype(),
696 false,
697 ),
698 },
699 ],
700 table_dir: "test_dir".to_string(),
701 path_type: PathType::Bare,
702 engine: METRIC_ENGINE_NAME.to_string(),
703 primary_key: vec![],
704 options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
705 .into_iter()
706 .collect(),
707 partition_expr_json: Some("".to_string()),
708 };
709 MetricEngineInner::verify_region_create_request(&request).unwrap();
710 }
711
712 #[test]
713 fn test_verify_region_create_request_options() {
714 let mut request = RegionCreateRequest {
715 column_metadatas: vec![
716 ColumnMetadata {
717 column_id: 0,
718 semantic_type: SemanticType::Timestamp,
719 column_schema: ColumnSchema::new(
720 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
721 ConcreteDataType::timestamp_millisecond_datatype(),
722 false,
723 ),
724 },
725 ColumnMetadata {
726 column_id: 1,
727 semantic_type: SemanticType::Field,
728 column_schema: ColumnSchema::new(
729 "val".to_string(),
730 ConcreteDataType::float64_datatype(),
731 false,
732 ),
733 },
734 ],
735 table_dir: "test_dir".to_string(),
736 path_type: PathType::Bare,
737 engine: METRIC_ENGINE_NAME.to_string(),
738 primary_key: vec![],
739 options: HashMap::new(),
740 partition_expr_json: Some("".to_string()),
741 };
742 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
743
744 let mut options = HashMap::new();
745 options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
746 request.options.clone_from(&options);
747 MetricEngineInner::verify_region_create_request(&request).unwrap();
748
749 options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
750 request.options.clone_from(&options);
751 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
752
753 options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
754 request.options = options;
755 MetricEngineInner::verify_region_create_request(&request).unwrap();
756 }
757
758 #[tokio::test]
759 async fn test_create_request_for_physical_regions() {
760 let options: HashMap<_, _> = [
762 ("ttl".to_string(), "60m".to_string()),
763 ("skip_wal".to_string(), "true".to_string()),
764 ]
765 .into_iter()
766 .collect();
767 let request = RegionCreateRequest {
768 engine: METRIC_ENGINE_NAME.to_string(),
769 column_metadatas: vec![
770 ColumnMetadata {
771 column_id: 0,
772 semantic_type: SemanticType::Timestamp,
773 column_schema: ColumnSchema::new(
774 "timestamp",
775 ConcreteDataType::timestamp_millisecond_datatype(),
776 false,
777 ),
778 },
779 ColumnMetadata {
780 column_id: 1,
781 semantic_type: SemanticType::Tag,
782 column_schema: ColumnSchema::new(
783 "tag",
784 ConcreteDataType::string_datatype(),
785 false,
786 ),
787 },
788 ],
789 primary_key: vec![0],
790 options,
791 table_dir: "/test_dir".to_string(),
792 path_type: PathType::Bare,
793 partition_expr_json: Some("".to_string()),
794 };
795
796 let env = TestEnv::new().await;
798 let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
799 let engine_inner = engine.inner;
800
801 let data_region_request = engine_inner.create_request_for_data_region(&request);
803 assert_eq!(data_region_request.table_dir, "/test_dir".to_string());
804 assert_eq!(data_region_request.path_type, PathType::Data);
805 assert_eq!(data_region_request.column_metadatas.len(), 4);
806 assert_eq!(
807 data_region_request.primary_key,
808 vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
809 );
810 assert!(data_region_request.options.contains_key("ttl"));
811
812 let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
814 assert_eq!(metadata_region_request.table_dir, "/test_dir".to_string());
815 assert_eq!(metadata_region_request.path_type, PathType::Metadata);
816 assert_eq!(
817 metadata_region_request.options.get("ttl").unwrap(),
818 "forever"
819 );
820 assert!(!metadata_region_request.options.contains_key("skip_wal"));
821 }
822
823 #[tokio::test]
824 async fn test_create_logical_regions() {
825 let env = TestEnv::new().await;
826 let engine = env.metric();
827 let physical_region_id1 = RegionId::new(1024, 0);
828 let physical_region_id2 = RegionId::new(1024, 1);
829 let logical_region_id1 = RegionId::new(1025, 0);
830 let logical_region_id2 = RegionId::new(1025, 1);
831 env.create_physical_region(physical_region_id1, "/test_dir1")
832 .await;
833 env.create_physical_region(physical_region_id2, "/test_dir2")
834 .await;
835
836 let region_create_request1 =
837 create_logical_region_request(&["job"], physical_region_id1, "logical1");
838 let region_create_request2 =
839 create_logical_region_request(&["job"], physical_region_id2, "logical2");
840
841 let response = engine
842 .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
843 (logical_region_id1, region_create_request1),
844 (logical_region_id2, region_create_request2),
845 ]))
846 .await
847 .unwrap();
848
849 let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
850 assert_eq!(manifest_infos.len(), 2);
851 let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
852 assert!(region_ids.contains(&physical_region_id1));
853 assert!(region_ids.contains(&physical_region_id2));
854
855 let column_metadatas =
856 parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
857 assert_column_name_and_id(
858 &column_metadatas,
859 &[
860 (greptime_timestamp(), 0),
861 (greptime_value(), 1),
862 ("__table_id", ReservedColumnId::table_id()),
863 ("__tsid", ReservedColumnId::tsid()),
864 ("job", 2),
865 ],
866 );
867 }
868}