metric_engine/engine/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30    ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31    DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33    METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34    METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{
37    APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
38};
39use store_api::region_engine::RegionEngine;
40use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
41use store_api::storage::consts::ReservedColumnId;
42use store_api::storage::RegionId;
43
44use crate::engine::create::extract_new_columns::extract_new_columns;
45use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
46use crate::engine::MetricEngineInner;
47use crate::error::{
48    ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
49    InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
50    MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
51    Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
52};
53use crate::metrics::PHYSICAL_REGION_COUNT;
54use crate::utils::{
55    self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
56    to_metadata_region_id,
57};
58
/// Granularity of the bloom-filter skipping index that is attached to the
/// internal table id column of a data region.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
60
61impl MetricEngineInner {
62    pub async fn create_regions(
63        &self,
64        mut requests: Vec<(RegionId, RegionCreateRequest)>,
65        extension_return_value: &mut HashMap<String, Vec<u8>>,
66    ) -> Result<AffectedRows> {
67        if requests.is_empty() {
68            return Ok(0);
69        }
70
71        for (_, request) in requests.iter() {
72            Self::verify_region_create_request(request)?;
73        }
74
75        let first_request = &requests.first().unwrap().1;
76        if first_request.is_physical_table() {
77            ensure!(
78                requests.len() == 1,
79                UnexpectedRequestSnafu {
80                    reason: "Physical table must be created with single request".to_string(),
81                }
82            );
83            let (region_id, request) = requests.pop().unwrap();
84            self.create_physical_region(region_id, request).await?;
85
86            return Ok(0);
87        } else if first_request
88            .options
89            .contains_key(LOGICAL_TABLE_METADATA_KEY)
90        {
91            if requests.len() == 1 {
92                let request = &requests.first().unwrap().1;
93                let physical_region_id = parse_physical_region_id(request)?;
94                let mut manifest_infos = Vec::with_capacity(1);
95                self.create_logical_regions(physical_region_id, requests, extension_return_value)
96                    .await?;
97                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
98                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
99            } else {
100                let grouped_requests =
101                    group_create_logical_region_requests_by_physical_region_id(requests)?;
102                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
103                for (physical_region_id, requests) in grouped_requests {
104                    self.create_logical_regions(
105                        physical_region_id,
106                        requests,
107                        extension_return_value,
108                    )
109                    .await?;
110                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
111                }
112                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
113            }
114        } else {
115            return MissingRegionOptionSnafu {}.fail();
116        }
117
118        Ok(0)
119    }
120
121    /// Initialize a physical metric region at given region id.
122    async fn create_physical_region(
123        &self,
124        region_id: RegionId,
125        request: RegionCreateRequest,
126    ) -> Result<()> {
127        let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128        let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130        // create metadata region
131        let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132        self.mito
133            .handle_request(
134                metadata_region_id,
135                RegionRequest::Create(create_metadata_region_request),
136            )
137            .await
138            .with_context(|_| CreateMitoRegionSnafu {
139                region_type: METADATA_REGION_SUBDIR,
140            })?;
141
142        // create data region
143        let create_data_region_request = self.create_request_for_data_region(&request);
144        let physical_columns = create_data_region_request
145            .column_metadatas
146            .iter()
147            .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148            .collect::<HashMap<_, _>>();
149        self.mito
150            .handle_request(
151                data_region_id,
152                RegionRequest::Create(create_data_region_request),
153            )
154            .await
155            .with_context(|_| CreateMitoRegionSnafu {
156                region_type: DATA_REGION_SUBDIR,
157            })?;
158        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
159            PhysicalRegionNotFoundSnafu {
160                region_id: data_region_id,
161            },
162        )?;
163
164        info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
165        PHYSICAL_REGION_COUNT.inc();
166
167        // remember this table
168        self.state.write().unwrap().add_physical_region(
169            data_region_id,
170            physical_columns,
171            primary_key_encoding,
172            physical_region_options,
173        );
174
175        Ok(())
176    }
177
178    /// Create multiple logical regions on the same physical region.
179    async fn create_logical_regions(
180        &self,
181        physical_region_id: RegionId,
182        requests: Vec<(RegionId, RegionCreateRequest)>,
183        extension_return_value: &mut HashMap<String, Vec<u8>>,
184    ) -> Result<()> {
185        let data_region_id = utils::to_data_region_id(physical_region_id);
186
187        ensure!(
188            self.state
189                .read()
190                .unwrap()
191                .exist_physical_region(data_region_id),
192            PhysicalRegionNotFoundSnafu {
193                region_id: data_region_id,
194            }
195        );
196
197        // Filters out the requests that the logical region already exists
198        let requests = {
199            let state = self.state.read().unwrap();
200            let mut skipped = Vec::with_capacity(requests.len());
201            let mut kept_requests = Vec::with_capacity(requests.len());
202
203            for (region_id, request) in requests {
204                if state.is_logical_region_exist(region_id) {
205                    skipped.push(region_id);
206                } else {
207                    kept_requests.push((region_id, request));
208                }
209            }
210
211            // log skipped regions
212            if !skipped.is_empty() {
213                info!(
214                    "Skipped creating logical regions {skipped:?} because they already exist",
215                    skipped = skipped
216                );
217            }
218            kept_requests
219        };
220
221        // Finds new columns to add to physical region
222        let mut new_column_names = HashSet::new();
223        let mut new_columns = Vec::new();
224
225        let index_option = {
226            let state = &self.state.read().unwrap();
227            let region_state = state
228                .physical_region_states()
229                .get(&data_region_id)
230                .with_context(|| PhysicalRegionNotFoundSnafu {
231                    region_id: data_region_id,
232                })?;
233            let physical_columns = region_state.physical_columns();
234
235            extract_new_columns(
236                &requests,
237                physical_columns,
238                &mut new_column_names,
239                &mut new_columns,
240            )?;
241            region_state.options().index
242        };
243
244        // TODO(weny): we dont need to pass a mutable new_columns here.
245        self.data_region
246            .add_columns(data_region_id, new_columns, index_option)
247            .await?;
248
249        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
250        let physical_schema_map = physical_columns
251            .iter()
252            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
253            .collect::<HashMap<_, _>>();
254        let logical_regions = requests
255            .iter()
256            .map(|(region_id, _)| (*region_id))
257            .collect::<Vec<_>>();
258        let logical_region_columns = requests.iter().map(|(region_id, request)| {
259            (
260                *region_id,
261                request
262                    .column_metadatas
263                    .iter()
264                    .map(|metadata| {
265                        // Safety: previous steps ensure the physical region exist
266                        let column_metadata = *physical_schema_map
267                            .get(metadata.column_schema.name.as_str())
268                            .unwrap();
269                        (metadata.column_schema.name.as_str(), column_metadata)
270                    })
271                    .collect::<HashMap<_, _>>(),
272            )
273        });
274
275        let new_add_columns = new_column_names.iter().map(|name| {
276            // Safety: previous steps ensure the physical region exist
277            let column_metadata = *physical_schema_map.get(name).unwrap();
278            (name.to_string(), column_metadata.column_id)
279        });
280
281        extension_return_value.insert(
282            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
283            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
284        );
285
286        // Writes logical regions metadata to metadata region
287        self.metadata_region
288            .add_logical_regions(physical_region_id, true, logical_region_columns)
289            .await?;
290
291        {
292            let mut state = self.state.write().unwrap();
293            state.add_physical_columns(data_region_id, new_add_columns);
294            state.add_logical_regions(physical_region_id, logical_regions.clone());
295        }
296        for logical_region_id in logical_regions {
297            self.metadata_region
298                .open_logical_region(logical_region_id)
299                .await;
300        }
301
302        Ok(())
303    }
304
305    /// Check if
306    /// - internal columns are not occupied
307    /// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or
308    ///   [LOGICAL_TABLE_METADATA_KEY])
309    fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
310        request.validate().context(InvalidMetadataSnafu)?;
311
312        let name_to_index = request
313            .column_metadatas
314            .iter()
315            .enumerate()
316            .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
317            .collect::<HashMap<String, usize>>();
318
319        // check if internal columns are not occupied
320        ensure!(
321            !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
322            InternalColumnOccupiedSnafu {
323                column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
324            }
325        );
326        ensure!(
327            !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
328            InternalColumnOccupiedSnafu {
329                column: DATA_SCHEMA_TSID_COLUMN_NAME,
330            }
331        );
332
333        // check if required table option is present
334        ensure!(
335            request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
336            MissingRegionOptionSnafu {}
337        );
338        ensure!(
339            !(request.is_physical_table()
340                && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
341            ConflictRegionOptionSnafu {}
342        );
343
344        // check if only one field column is declared, and all tag columns are string
345        let mut field_col: Option<&ColumnMetadata> = None;
346        for col in &request.column_metadatas {
347            match col.semantic_type {
348                SemanticType::Tag => ensure!(
349                    col.column_schema.data_type == ConcreteDataType::string_datatype(),
350                    ColumnTypeMismatchSnafu {
351                        expect: ConcreteDataType::string_datatype(),
352                        actual: col.column_schema.data_type.clone(),
353                    }
354                ),
355                SemanticType::Field => {
356                    if field_col.is_some() {
357                        MultipleFieldColumnSnafu {
358                            previous: field_col.unwrap().column_schema.name.clone(),
359                            current: col.column_schema.name.clone(),
360                        }
361                        .fail()?;
362                    }
363                    field_col = Some(col)
364                }
365                SemanticType::Timestamp => {}
366            }
367        }
368        let field_col = field_col.context(NoFieldColumnSnafu)?;
369
370        // make sure the field column is float64 type
371        ensure!(
372            field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
373            ColumnTypeMismatchSnafu {
374                expect: ConcreteDataType::float64_datatype(),
375                actual: field_col.column_schema.data_type.clone(),
376            }
377        );
378
379        Ok(())
380    }
381
382    /// Build data region id and metadata region id from the given region id.
383    ///
384    /// Return value: (data_region_id, metadata_region_id)
385    fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
386        (
387            to_data_region_id(region_id),
388            to_metadata_region_id(region_id),
389        )
390    }
391
392    /// Build [RegionCreateRequest] for metadata region
393    ///
394    /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
395    pub fn create_request_for_metadata_region(
396        &self,
397        request: &RegionCreateRequest,
398    ) -> RegionCreateRequest {
399        // ts TIME INDEX DEFAULT 0
400        let timestamp_column_metadata = ColumnMetadata {
401            column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
402            semantic_type: SemanticType::Timestamp,
403            column_schema: ColumnSchema::new(
404                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
405                ConcreteDataType::timestamp_millisecond_datatype(),
406                false,
407            )
408            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
409                Value::Timestamp(Timestamp::new_millisecond(0)),
410            )))
411            .unwrap(),
412        };
413        // key STRING PRIMARY KEY
414        let key_column_metadata = ColumnMetadata {
415            column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
416            semantic_type: SemanticType::Tag,
417            column_schema: ColumnSchema::new(
418                METADATA_SCHEMA_KEY_COLUMN_NAME,
419                ConcreteDataType::string_datatype(),
420                false,
421            ),
422        };
423        // val STRING
424        let value_column_metadata = ColumnMetadata {
425            column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
426            semantic_type: SemanticType::Field,
427            column_schema: ColumnSchema::new(
428                METADATA_SCHEMA_VALUE_COLUMN_NAME,
429                ConcreteDataType::string_datatype(),
430                true,
431            ),
432        };
433
434        // concat region dir
435        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
436
437        let options = region_options_for_metadata_region(request.options.clone());
438        RegionCreateRequest {
439            engine: MITO_ENGINE_NAME.to_string(),
440            column_metadatas: vec![
441                timestamp_column_metadata,
442                key_column_metadata,
443                value_column_metadata,
444            ],
445            primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
446            options,
447            region_dir: metadata_region_dir,
448        }
449    }
450
451    /// Convert [RegionCreateRequest] for data region.
452    ///
453    /// All tag columns in the original request will be converted to value columns.
454    /// Those columns real semantic type is stored in metadata region.
455    ///
456    /// This will also add internal columns to the request.
457    pub fn create_request_for_data_region(
458        &self,
459        request: &RegionCreateRequest,
460    ) -> RegionCreateRequest {
461        let mut data_region_request = request.clone();
462        let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
463
464        // concat region dir
465        data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
466
467        // change nullability for tag columns
468        data_region_request
469            .column_metadatas
470            .iter_mut()
471            .for_each(|metadata| {
472                if metadata.semantic_type == SemanticType::Tag {
473                    metadata.column_schema.set_nullable();
474                    primary_key.push(metadata.column_id);
475                }
476            });
477
478        // add internal columns
479        let [table_id_col, tsid_col] = Self::internal_column_metadata();
480        data_region_request.column_metadatas.push(table_id_col);
481        data_region_request.column_metadatas.push(tsid_col);
482        data_region_request.primary_key = primary_key;
483
484        // set data region options
485        set_data_region_options(
486            &mut data_region_request.options,
487            self.config.experimental_sparse_primary_key_encoding,
488        );
489
490        data_region_request
491    }
492
493    /// Generate internal column metadata.
494    ///
495    /// Return `[table_id_col, tsid_col]`
496    fn internal_column_metadata() -> [ColumnMetadata; 2] {
497        // Safety: BloomFilter is a valid skipping index type
498        let metric_name_col = ColumnMetadata {
499            column_id: ReservedColumnId::table_id(),
500            semantic_type: SemanticType::Tag,
501            column_schema: ColumnSchema::new(
502                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
503                ConcreteDataType::uint32_datatype(),
504                false,
505            )
506            .with_skipping_options(SkippingIndexOptions {
507                granularity: DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
508                index_type: datatypes::schema::SkippingIndexType::BloomFilter,
509            })
510            .unwrap(),
511        };
512        let tsid_col = ColumnMetadata {
513            column_id: ReservedColumnId::tsid(),
514            semantic_type: SemanticType::Tag,
515            column_schema: ColumnSchema::new(
516                DATA_SCHEMA_TSID_COLUMN_NAME,
517                ConcreteDataType::uint64_datatype(),
518                false,
519            )
520            .with_inverted_index(false),
521        };
522        [metric_name_col, tsid_col]
523    }
524}
525
526/// Groups the create logical region requests by physical region id.
527fn group_create_logical_region_requests_by_physical_region_id(
528    requests: Vec<(RegionId, RegionCreateRequest)>,
529) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
530    let mut result = HashMap::with_capacity(requests.len());
531    for (region_id, request) in requests {
532        let physical_region_id = parse_physical_region_id(&request)?;
533        result
534            .entry(physical_region_id)
535            .or_insert_with(Vec::new)
536            .push((region_id, request));
537    }
538
539    Ok(result)
540}
541
542/// Parses the physical region id from the request.
543fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
544    let physical_region_id_raw = request
545        .options
546        .get(LOGICAL_TABLE_METADATA_KEY)
547        .ok_or(MissingRegionOptionSnafu {}.build())?;
548
549    let physical_region_id: RegionId = physical_region_id_raw
550        .parse::<u64>()
551        .with_context(|_| ParseRegionIdSnafu {
552            raw: physical_region_id_raw,
553        })?
554        .into();
555
556    Ok(physical_region_id)
557}
558
559/// Creates the region options for metadata region in metric engine.
560pub(crate) fn region_options_for_metadata_region(
561    mut original: HashMap<String, String>,
562) -> HashMap<String, String> {
563    // TODO(ruihang, weny): add whitelist for metric engine options.
564    original.remove(APPEND_MODE_KEY);
565    // Don't allow to set primary key encoding for metadata region.
566    original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
567    original.insert(TTL_KEY.to_string(), FOREVER.to_string());
568    original.remove(SKIP_WAL_KEY);
569    original
570}
571
572#[cfg(test)]
573mod test {
574    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
575
576    use super::*;
577    use crate::config::EngineConfig;
578    use crate::engine::MetricEngine;
579    use crate::test_util::TestEnv;
580
581    #[test]
582    fn test_verify_region_create_request() {
583        // internal column is occupied
584        let request = RegionCreateRequest {
585            column_metadatas: vec![
586                ColumnMetadata {
587                    column_id: 0,
588                    semantic_type: SemanticType::Timestamp,
589                    column_schema: ColumnSchema::new(
590                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
591                        ConcreteDataType::timestamp_millisecond_datatype(),
592                        false,
593                    ),
594                },
595                ColumnMetadata {
596                    column_id: 1,
597                    semantic_type: SemanticType::Tag,
598                    column_schema: ColumnSchema::new(
599                        DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
600                        ConcreteDataType::uint32_datatype(),
601                        false,
602                    ),
603                },
604            ],
605            region_dir: "test_dir".to_string(),
606            engine: METRIC_ENGINE_NAME.to_string(),
607            primary_key: vec![],
608            options: HashMap::new(),
609        };
610        let result = MetricEngineInner::verify_region_create_request(&request);
611        assert!(result.is_err());
612        assert_eq!(
613            result.unwrap_err().to_string(),
614            "Internal column __table_id is reserved".to_string()
615        );
616
617        // valid request
618        let request = RegionCreateRequest {
619            column_metadatas: vec![
620                ColumnMetadata {
621                    column_id: 0,
622                    semantic_type: SemanticType::Timestamp,
623                    column_schema: ColumnSchema::new(
624                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
625                        ConcreteDataType::timestamp_millisecond_datatype(),
626                        false,
627                    ),
628                },
629                ColumnMetadata {
630                    column_id: 1,
631                    semantic_type: SemanticType::Tag,
632                    column_schema: ColumnSchema::new(
633                        "column1".to_string(),
634                        ConcreteDataType::string_datatype(),
635                        false,
636                    ),
637                },
638                ColumnMetadata {
639                    column_id: 2,
640                    semantic_type: SemanticType::Field,
641                    column_schema: ColumnSchema::new(
642                        "column2".to_string(),
643                        ConcreteDataType::float64_datatype(),
644                        false,
645                    ),
646                },
647            ],
648            region_dir: "test_dir".to_string(),
649            engine: METRIC_ENGINE_NAME.to_string(),
650            primary_key: vec![],
651            options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
652                .into_iter()
653                .collect(),
654        };
655        MetricEngineInner::verify_region_create_request(&request).unwrap();
656    }
657
658    #[test]
659    fn test_verify_region_create_request_options() {
660        let mut request = RegionCreateRequest {
661            column_metadatas: vec![
662                ColumnMetadata {
663                    column_id: 0,
664                    semantic_type: SemanticType::Timestamp,
665                    column_schema: ColumnSchema::new(
666                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
667                        ConcreteDataType::timestamp_millisecond_datatype(),
668                        false,
669                    ),
670                },
671                ColumnMetadata {
672                    column_id: 1,
673                    semantic_type: SemanticType::Field,
674                    column_schema: ColumnSchema::new(
675                        "val".to_string(),
676                        ConcreteDataType::float64_datatype(),
677                        false,
678                    ),
679                },
680            ],
681            region_dir: "test_dir".to_string(),
682            engine: METRIC_ENGINE_NAME.to_string(),
683            primary_key: vec![],
684            options: HashMap::new(),
685        };
686        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
687
688        let mut options = HashMap::new();
689        options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
690        request.options.clone_from(&options);
691        MetricEngineInner::verify_region_create_request(&request).unwrap();
692
693        options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
694        request.options.clone_from(&options);
695        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
696
697        options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
698        request.options = options;
699        MetricEngineInner::verify_region_create_request(&request).unwrap();
700    }
701
702    #[tokio::test]
703    async fn test_create_request_for_physical_regions() {
704        // original request
705        let options: HashMap<_, _> = [
706            ("ttl".to_string(), "60m".to_string()),
707            ("skip_wal".to_string(), "true".to_string()),
708        ]
709        .into_iter()
710        .collect();
711        let request = RegionCreateRequest {
712            engine: METRIC_ENGINE_NAME.to_string(),
713            column_metadatas: vec![
714                ColumnMetadata {
715                    column_id: 0,
716                    semantic_type: SemanticType::Timestamp,
717                    column_schema: ColumnSchema::new(
718                        "timestamp",
719                        ConcreteDataType::timestamp_millisecond_datatype(),
720                        false,
721                    ),
722                },
723                ColumnMetadata {
724                    column_id: 1,
725                    semantic_type: SemanticType::Tag,
726                    column_schema: ColumnSchema::new(
727                        "tag",
728                        ConcreteDataType::string_datatype(),
729                        false,
730                    ),
731                },
732            ],
733            primary_key: vec![0],
734            options,
735            region_dir: "/test_dir".to_string(),
736        };
737
738        // set up
739        let env = TestEnv::new().await;
740        let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
741        let engine_inner = engine.inner;
742
743        // check create data region request
744        let data_region_request = engine_inner.create_request_for_data_region(&request);
745        assert_eq!(
746            data_region_request.region_dir,
747            "/test_dir/data/".to_string()
748        );
749        assert_eq!(data_region_request.column_metadatas.len(), 4);
750        assert_eq!(
751            data_region_request.primary_key,
752            vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
753        );
754        assert!(data_region_request.options.contains_key("ttl"));
755
756        // check create metadata region request
757        let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
758        assert_eq!(
759            metadata_region_request.region_dir,
760            "/test_dir/metadata/".to_string()
761        );
762        assert_eq!(
763            metadata_region_request.options.get("ttl").unwrap(),
764            "forever"
765        );
766        assert!(!metadata_region_request.options.contains_key("skip_wal"));
767    }
768}