1mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30 ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31 DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33 METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34 METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
37use store_api::region_engine::RegionEngine;
38use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
39use store_api::storage::consts::ReservedColumnId;
40use store_api::storage::RegionId;
41
42use crate::engine::create::extract_new_columns::extract_new_columns;
43use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
44use crate::engine::MetricEngineInner;
45use crate::error::{
46 ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
47 InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
48 MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
49 Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
50};
51use crate::metrics::PHYSICAL_REGION_COUNT;
52use crate::utils::{
53 self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
54 to_metadata_region_id,
55};
56
57const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
58const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
59
60impl MetricEngineInner {
61 pub async fn create_regions(
62 &self,
63 mut requests: Vec<(RegionId, RegionCreateRequest)>,
64 extension_return_value: &mut HashMap<String, Vec<u8>>,
65 ) -> Result<AffectedRows> {
66 if requests.is_empty() {
67 return Ok(0);
68 }
69
70 for (_, request) in requests.iter() {
71 Self::verify_region_create_request(request)?;
72 }
73
74 let first_request = &requests.first().unwrap().1;
75 if first_request.is_physical_table() {
76 ensure!(
77 requests.len() == 1,
78 UnexpectedRequestSnafu {
79 reason: "Physical table must be created with single request".to_string(),
80 }
81 );
82 let (region_id, request) = requests.pop().unwrap();
83 self.create_physical_region(region_id, request, extension_return_value)
84 .await?;
85
86 return Ok(0);
87 } else if first_request
88 .options
89 .contains_key(LOGICAL_TABLE_METADATA_KEY)
90 {
91 if requests.len() == 1 {
92 let request = &requests.first().unwrap().1;
93 let physical_region_id = parse_physical_region_id(request)?;
94 let mut manifest_infos = Vec::with_capacity(1);
95 self.create_logical_regions(physical_region_id, requests, extension_return_value)
96 .await?;
97 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
98 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
99 } else {
100 let grouped_requests =
101 group_create_logical_region_requests_by_physical_region_id(requests)?;
102 let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
103 for (physical_region_id, requests) in grouped_requests {
104 self.create_logical_regions(
105 physical_region_id,
106 requests,
107 extension_return_value,
108 )
109 .await?;
110 append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
111 }
112 encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
113 }
114 } else {
115 return MissingRegionOptionSnafu {}.fail();
116 }
117
118 Ok(0)
119 }
120
121 async fn create_physical_region(
123 &self,
124 region_id: RegionId,
125 request: RegionCreateRequest,
126 extension_return_value: &mut HashMap<String, Vec<u8>>,
127 ) -> Result<()> {
128 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
129 let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
130
131 let create_metadata_region_request = self.create_request_for_metadata_region(&request);
133 self.mito
134 .handle_request(
135 metadata_region_id,
136 RegionRequest::Create(create_metadata_region_request),
137 )
138 .await
139 .with_context(|_| CreateMitoRegionSnafu {
140 region_type: METADATA_REGION_SUBDIR,
141 })?;
142
143 let create_data_region_request = self.create_request_for_data_region(&request);
145 let physical_columns = create_data_region_request
146 .column_metadatas
147 .iter()
148 .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
149 .collect::<HashMap<_, _>>();
150 let time_index_unit = create_data_region_request
151 .column_metadatas
152 .iter()
153 .find_map(|metadata| {
154 if metadata.semantic_type == SemanticType::Timestamp {
155 metadata
156 .column_schema
157 .data_type
158 .as_timestamp()
159 .map(|data_type| data_type.unit())
160 } else {
161 None
162 }
163 })
164 .context(UnexpectedRequestSnafu {
165 reason: "No time index column found",
166 })?;
167 let response = self
168 .mito
169 .handle_request(
170 data_region_id,
171 RegionRequest::Create(create_data_region_request),
172 )
173 .await
174 .with_context(|_| CreateMitoRegionSnafu {
175 region_type: DATA_REGION_SUBDIR,
176 })?;
177 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
178 PhysicalRegionNotFoundSnafu {
179 region_id: data_region_id,
180 },
181 )?;
182 extension_return_value.extend(response.extensions);
183
184 info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
185 PHYSICAL_REGION_COUNT.inc();
186
187 self.state.write().unwrap().add_physical_region(
189 data_region_id,
190 physical_columns,
191 primary_key_encoding,
192 physical_region_options,
193 time_index_unit,
194 );
195
196 Ok(())
197 }
198
199 async fn create_logical_regions(
201 &self,
202 physical_region_id: RegionId,
203 requests: Vec<(RegionId, RegionCreateRequest)>,
204 extension_return_value: &mut HashMap<String, Vec<u8>>,
205 ) -> Result<()> {
206 let data_region_id = utils::to_data_region_id(physical_region_id);
207
208 let unit = self
209 .state
210 .read()
211 .unwrap()
212 .physical_region_time_index_unit(physical_region_id)
213 .context(PhysicalRegionNotFoundSnafu {
214 region_id: data_region_id,
215 })?;
216 for (_, request) in &requests {
218 let time_index_column = request
220 .column_metadatas
221 .iter()
222 .find(|col| col.semantic_type == SemanticType::Timestamp)
223 .unwrap();
224 let request_unit = time_index_column
225 .column_schema
226 .data_type
227 .as_timestamp()
228 .unwrap()
229 .unit();
230 ensure!(
231 request_unit == unit,
232 UnexpectedRequestSnafu {
233 reason: format!(
234 "Metric has differenttime unit ({:?}) than the physical region ({:?})",
235 request_unit, unit
236 ),
237 }
238 );
239 }
240
241 let requests = {
243 let state = self.state.read().unwrap();
244 let mut skipped = Vec::with_capacity(requests.len());
245 let mut kept_requests = Vec::with_capacity(requests.len());
246
247 for (region_id, request) in requests {
248 if state.is_logical_region_exist(region_id) {
249 skipped.push(region_id);
250 } else {
251 kept_requests.push((region_id, request));
252 }
253 }
254
255 if !skipped.is_empty() {
257 info!(
258 "Skipped creating logical regions {skipped:?} because they already exist",
259 skipped = skipped
260 );
261 }
262 kept_requests
263 };
264
265 let mut new_column_names = HashSet::new();
267 let mut new_columns = Vec::new();
268
269 let index_option = {
270 let state = &self.state.read().unwrap();
271 let region_state = state
272 .physical_region_states()
273 .get(&data_region_id)
274 .with_context(|| PhysicalRegionNotFoundSnafu {
275 region_id: data_region_id,
276 })?;
277 let physical_columns = region_state.physical_columns();
278
279 extract_new_columns(
280 &requests,
281 physical_columns,
282 &mut new_column_names,
283 &mut new_columns,
284 )?;
285 region_state.options().index
286 };
287
288 self.data_region
290 .add_columns(data_region_id, new_columns, index_option)
291 .await?;
292
293 let physical_columns = self.data_region.physical_columns(data_region_id).await?;
294 let physical_schema_map = physical_columns
295 .iter()
296 .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
297 .collect::<HashMap<_, _>>();
298 let logical_regions = requests
299 .iter()
300 .map(|(region_id, _)| (*region_id))
301 .collect::<Vec<_>>();
302 let logical_region_columns = requests.iter().map(|(region_id, request)| {
303 (
304 *region_id,
305 request
306 .column_metadatas
307 .iter()
308 .map(|metadata| {
309 let column_metadata = *physical_schema_map
311 .get(metadata.column_schema.name.as_str())
312 .unwrap();
313 (metadata.column_schema.name.as_str(), column_metadata)
314 })
315 .collect::<HashMap<_, _>>(),
316 )
317 });
318
319 let new_add_columns = new_column_names.iter().map(|name| {
320 let column_metadata = *physical_schema_map.get(name).unwrap();
322 (name.to_string(), column_metadata.column_id)
323 });
324
325 extension_return_value.insert(
326 ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
327 ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
328 );
329
330 self.metadata_region
332 .add_logical_regions(physical_region_id, true, logical_region_columns)
333 .await?;
334
335 {
336 let mut state = self.state.write().unwrap();
337 state.add_physical_columns(data_region_id, new_add_columns);
338 state.add_logical_regions(physical_region_id, logical_regions.clone());
339 }
340 for logical_region_id in logical_regions {
341 self.metadata_region
342 .open_logical_region(logical_region_id)
343 .await;
344 }
345
346 Ok(())
347 }
348
349 fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
354 request.validate().context(InvalidMetadataSnafu)?;
355
356 let name_to_index = request
357 .column_metadatas
358 .iter()
359 .enumerate()
360 .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
361 .collect::<HashMap<String, usize>>();
362
363 ensure!(
365 !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
366 InternalColumnOccupiedSnafu {
367 column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
368 }
369 );
370 ensure!(
371 !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
372 InternalColumnOccupiedSnafu {
373 column: DATA_SCHEMA_TSID_COLUMN_NAME,
374 }
375 );
376
377 ensure!(
379 request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
380 MissingRegionOptionSnafu {}
381 );
382 ensure!(
383 !(request.is_physical_table()
384 && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
385 ConflictRegionOptionSnafu {}
386 );
387
388 let mut field_col: Option<&ColumnMetadata> = None;
390 for col in &request.column_metadatas {
391 match col.semantic_type {
392 SemanticType::Tag => ensure!(
393 col.column_schema.data_type == ConcreteDataType::string_datatype(),
394 ColumnTypeMismatchSnafu {
395 expect: ConcreteDataType::string_datatype(),
396 actual: col.column_schema.data_type.clone(),
397 }
398 ),
399 SemanticType::Field => {
400 if field_col.is_some() {
401 MultipleFieldColumnSnafu {
402 previous: field_col.unwrap().column_schema.name.clone(),
403 current: col.column_schema.name.clone(),
404 }
405 .fail()?;
406 }
407 field_col = Some(col)
408 }
409 SemanticType::Timestamp => {}
410 }
411 }
412 let field_col = field_col.context(NoFieldColumnSnafu)?;
413
414 ensure!(
416 field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
417 ColumnTypeMismatchSnafu {
418 expect: ConcreteDataType::float64_datatype(),
419 actual: field_col.column_schema.data_type.clone(),
420 }
421 );
422
423 Ok(())
424 }
425
426 fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
430 (
431 to_data_region_id(region_id),
432 to_metadata_region_id(region_id),
433 )
434 }
435
436 pub fn create_request_for_metadata_region(
440 &self,
441 request: &RegionCreateRequest,
442 ) -> RegionCreateRequest {
443 let timestamp_column_metadata = ColumnMetadata {
445 column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
446 semantic_type: SemanticType::Timestamp,
447 column_schema: ColumnSchema::new(
448 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
449 ConcreteDataType::timestamp_millisecond_datatype(),
450 false,
451 )
452 .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
453 Value::Timestamp(Timestamp::new_millisecond(0)),
454 )))
455 .unwrap(),
456 };
457 let key_column_metadata = ColumnMetadata {
459 column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
460 semantic_type: SemanticType::Tag,
461 column_schema: ColumnSchema::new(
462 METADATA_SCHEMA_KEY_COLUMN_NAME,
463 ConcreteDataType::string_datatype(),
464 false,
465 ),
466 };
467 let value_column_metadata = ColumnMetadata {
469 column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
470 semantic_type: SemanticType::Field,
471 column_schema: ColumnSchema::new(
472 METADATA_SCHEMA_VALUE_COLUMN_NAME,
473 ConcreteDataType::string_datatype(),
474 true,
475 ),
476 };
477
478 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
480
481 let options = region_options_for_metadata_region(&request.options);
482 RegionCreateRequest {
483 engine: MITO_ENGINE_NAME.to_string(),
484 column_metadatas: vec![
485 timestamp_column_metadata,
486 key_column_metadata,
487 value_column_metadata,
488 ],
489 primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
490 options,
491 region_dir: metadata_region_dir,
492 }
493 }
494
495 pub fn create_request_for_data_region(
502 &self,
503 request: &RegionCreateRequest,
504 ) -> RegionCreateRequest {
505 let mut data_region_request = request.clone();
506 let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
507
508 data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
510
511 data_region_request
513 .column_metadatas
514 .iter_mut()
515 .for_each(|metadata| {
516 if metadata.semantic_type == SemanticType::Tag {
517 metadata.column_schema.set_nullable();
518 primary_key.push(metadata.column_id);
519 }
520 });
521
522 let [table_id_col, tsid_col] = Self::internal_column_metadata();
524 data_region_request.column_metadatas.push(table_id_col);
525 data_region_request.column_metadatas.push(tsid_col);
526 data_region_request.primary_key = primary_key;
527
528 set_data_region_options(
530 &mut data_region_request.options,
531 self.config.experimental_sparse_primary_key_encoding,
532 );
533
534 data_region_request
535 }
536
537 fn internal_column_metadata() -> [ColumnMetadata; 2] {
541 let metric_name_col = ColumnMetadata {
543 column_id: ReservedColumnId::table_id(),
544 semantic_type: SemanticType::Tag,
545 column_schema: ColumnSchema::new(
546 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
547 ConcreteDataType::uint32_datatype(),
548 false,
549 )
550 .with_skipping_options(SkippingIndexOptions::new_unchecked(
551 DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
552 DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
553 datatypes::schema::SkippingIndexType::BloomFilter,
554 ))
555 .unwrap(),
556 };
557 let tsid_col = ColumnMetadata {
558 column_id: ReservedColumnId::tsid(),
559 semantic_type: SemanticType::Tag,
560 column_schema: ColumnSchema::new(
561 DATA_SCHEMA_TSID_COLUMN_NAME,
562 ConcreteDataType::uint64_datatype(),
563 false,
564 )
565 .with_inverted_index(false),
566 };
567 [metric_name_col, tsid_col]
568 }
569}
570
571fn group_create_logical_region_requests_by_physical_region_id(
573 requests: Vec<(RegionId, RegionCreateRequest)>,
574) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
575 let mut result = HashMap::with_capacity(requests.len());
576 for (region_id, request) in requests {
577 let physical_region_id = parse_physical_region_id(&request)?;
578 result
579 .entry(physical_region_id)
580 .or_insert_with(Vec::new)
581 .push((region_id, request));
582 }
583
584 Ok(result)
585}
586
587fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
589 let physical_region_id_raw = request
590 .options
591 .get(LOGICAL_TABLE_METADATA_KEY)
592 .ok_or(MissingRegionOptionSnafu {}.build())?;
593
594 let physical_region_id: RegionId = physical_region_id_raw
595 .parse::<u64>()
596 .with_context(|_| ParseRegionIdSnafu {
597 raw: physical_region_id_raw,
598 })?
599 .into();
600
601 Ok(physical_region_id)
602}
603
604pub(crate) fn region_options_for_metadata_region(
606 original: &HashMap<String, String>,
607) -> HashMap<String, String> {
608 let mut metadata_region_options = HashMap::new();
609 metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
610
611 if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
612 metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.to_string());
613 }
614
615 metadata_region_options
616}
617
618#[cfg(test)]
619mod test {
620 use common_meta::ddl::test_util::assert_column_name_and_id;
621 use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
622 use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
623 use store_api::region_request::BatchRegionDdlRequest;
624
625 use super::*;
626 use crate::config::EngineConfig;
627 use crate::engine::MetricEngine;
628 use crate::test_util::{create_logical_region_request, TestEnv};
629
630 #[test]
631 fn test_verify_region_create_request() {
632 let request = RegionCreateRequest {
634 column_metadatas: vec![
635 ColumnMetadata {
636 column_id: 0,
637 semantic_type: SemanticType::Timestamp,
638 column_schema: ColumnSchema::new(
639 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
640 ConcreteDataType::timestamp_millisecond_datatype(),
641 false,
642 ),
643 },
644 ColumnMetadata {
645 column_id: 1,
646 semantic_type: SemanticType::Tag,
647 column_schema: ColumnSchema::new(
648 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
649 ConcreteDataType::uint32_datatype(),
650 false,
651 ),
652 },
653 ],
654 region_dir: "test_dir".to_string(),
655 engine: METRIC_ENGINE_NAME.to_string(),
656 primary_key: vec![],
657 options: HashMap::new(),
658 };
659 let result = MetricEngineInner::verify_region_create_request(&request);
660 assert!(result.is_err());
661 assert_eq!(
662 result.unwrap_err().to_string(),
663 "Internal column __table_id is reserved".to_string()
664 );
665
666 let request = RegionCreateRequest {
668 column_metadatas: vec![
669 ColumnMetadata {
670 column_id: 0,
671 semantic_type: SemanticType::Timestamp,
672 column_schema: ColumnSchema::new(
673 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
674 ConcreteDataType::timestamp_millisecond_datatype(),
675 false,
676 ),
677 },
678 ColumnMetadata {
679 column_id: 1,
680 semantic_type: SemanticType::Tag,
681 column_schema: ColumnSchema::new(
682 "column1".to_string(),
683 ConcreteDataType::string_datatype(),
684 false,
685 ),
686 },
687 ColumnMetadata {
688 column_id: 2,
689 semantic_type: SemanticType::Field,
690 column_schema: ColumnSchema::new(
691 "column2".to_string(),
692 ConcreteDataType::float64_datatype(),
693 false,
694 ),
695 },
696 ],
697 region_dir: "test_dir".to_string(),
698 engine: METRIC_ENGINE_NAME.to_string(),
699 primary_key: vec![],
700 options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
701 .into_iter()
702 .collect(),
703 };
704 MetricEngineInner::verify_region_create_request(&request).unwrap();
705 }
706
707 #[test]
708 fn test_verify_region_create_request_options() {
709 let mut request = RegionCreateRequest {
710 column_metadatas: vec![
711 ColumnMetadata {
712 column_id: 0,
713 semantic_type: SemanticType::Timestamp,
714 column_schema: ColumnSchema::new(
715 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
716 ConcreteDataType::timestamp_millisecond_datatype(),
717 false,
718 ),
719 },
720 ColumnMetadata {
721 column_id: 1,
722 semantic_type: SemanticType::Field,
723 column_schema: ColumnSchema::new(
724 "val".to_string(),
725 ConcreteDataType::float64_datatype(),
726 false,
727 ),
728 },
729 ],
730 region_dir: "test_dir".to_string(),
731 engine: METRIC_ENGINE_NAME.to_string(),
732 primary_key: vec![],
733 options: HashMap::new(),
734 };
735 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
736
737 let mut options = HashMap::new();
738 options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
739 request.options.clone_from(&options);
740 MetricEngineInner::verify_region_create_request(&request).unwrap();
741
742 options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
743 request.options.clone_from(&options);
744 MetricEngineInner::verify_region_create_request(&request).unwrap_err();
745
746 options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
747 request.options = options;
748 MetricEngineInner::verify_region_create_request(&request).unwrap();
749 }
750
751 #[tokio::test]
752 async fn test_create_request_for_physical_regions() {
753 let options: HashMap<_, _> = [
755 ("ttl".to_string(), "60m".to_string()),
756 ("skip_wal".to_string(), "true".to_string()),
757 ]
758 .into_iter()
759 .collect();
760 let request = RegionCreateRequest {
761 engine: METRIC_ENGINE_NAME.to_string(),
762 column_metadatas: vec![
763 ColumnMetadata {
764 column_id: 0,
765 semantic_type: SemanticType::Timestamp,
766 column_schema: ColumnSchema::new(
767 "timestamp",
768 ConcreteDataType::timestamp_millisecond_datatype(),
769 false,
770 ),
771 },
772 ColumnMetadata {
773 column_id: 1,
774 semantic_type: SemanticType::Tag,
775 column_schema: ColumnSchema::new(
776 "tag",
777 ConcreteDataType::string_datatype(),
778 false,
779 ),
780 },
781 ],
782 primary_key: vec![0],
783 options,
784 region_dir: "/test_dir".to_string(),
785 };
786
787 let env = TestEnv::new().await;
789 let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
790 let engine_inner = engine.inner;
791
792 let data_region_request = engine_inner.create_request_for_data_region(&request);
794 assert_eq!(
795 data_region_request.region_dir,
796 "/test_dir/data/".to_string()
797 );
798 assert_eq!(data_region_request.column_metadatas.len(), 4);
799 assert_eq!(
800 data_region_request.primary_key,
801 vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
802 );
803 assert!(data_region_request.options.contains_key("ttl"));
804
805 let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
807 assert_eq!(
808 metadata_region_request.region_dir,
809 "/test_dir/metadata/".to_string()
810 );
811 assert_eq!(
812 metadata_region_request.options.get("ttl").unwrap(),
813 "forever"
814 );
815 assert!(!metadata_region_request.options.contains_key("skip_wal"));
816 }
817
818 #[tokio::test]
819 async fn test_create_logical_regions() {
820 let env = TestEnv::new().await;
821 let engine = env.metric();
822 let physical_region_id1 = RegionId::new(1024, 0);
823 let physical_region_id2 = RegionId::new(1024, 1);
824 let logical_region_id1 = RegionId::new(1025, 0);
825 let logical_region_id2 = RegionId::new(1025, 1);
826 env.create_physical_region(physical_region_id1, "/test_dir1")
827 .await;
828 env.create_physical_region(physical_region_id2, "/test_dir2")
829 .await;
830
831 let region_create_request1 =
832 create_logical_region_request(&["job"], physical_region_id1, "logical1");
833 let region_create_request2 =
834 create_logical_region_request(&["job"], physical_region_id2, "logical2");
835
836 let response = engine
837 .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
838 (logical_region_id1, region_create_request1),
839 (logical_region_id2, region_create_request2),
840 ]))
841 .await
842 .unwrap();
843
844 let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
845 assert_eq!(manifest_infos.len(), 2);
846 let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
847 assert!(region_ids.contains(&physical_region_id1));
848 assert!(region_ids.contains(&physical_region_id2));
849
850 let column_metadatas =
851 parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
852 assert_column_name_and_id(
853 &column_metadatas,
854 &[
855 ("greptime_timestamp", 0),
856 ("greptime_value", 1),
857 ("__table_id", ReservedColumnId::table_id()),
858 ("__tsid", ReservedColumnId::tsid()),
859 ("job", 2),
860 ],
861 );
862 }
863}