metric_engine/engine/create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30    ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31    DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33    METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34    METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{
37    APPEND_MODE_KEY, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, SKIP_WAL_KEY, TTL_KEY,
38};
39use store_api::region_engine::RegionEngine;
40use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
41use store_api::storage::consts::ReservedColumnId;
42use store_api::storage::RegionId;
43
44use crate::engine::create::extract_new_columns::extract_new_columns;
45use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
46use crate::engine::MetricEngineInner;
47use crate::error::{
48    ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
49    InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
50    MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
51    Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
52};
53use crate::metrics::PHYSICAL_REGION_COUNT;
54use crate::utils::{
55    self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
56    to_metadata_region_id,
57};
58
/// Granularity handed to the bloom-filter skipping index that is built on the internal
/// table-id column of every data region (see `internal_column_metadata`).
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1_024;
60
61impl MetricEngineInner {
62    pub async fn create_regions(
63        &self,
64        mut requests: Vec<(RegionId, RegionCreateRequest)>,
65        extension_return_value: &mut HashMap<String, Vec<u8>>,
66    ) -> Result<AffectedRows> {
67        if requests.is_empty() {
68            return Ok(0);
69        }
70
71        for (_, request) in requests.iter() {
72            Self::verify_region_create_request(request)?;
73        }
74
75        let first_request = &requests.first().unwrap().1;
76        if first_request.is_physical_table() {
77            ensure!(
78                requests.len() == 1,
79                UnexpectedRequestSnafu {
80                    reason: "Physical table must be created with single request".to_string(),
81                }
82            );
83            let (region_id, request) = requests.pop().unwrap();
84            self.create_physical_region(region_id, request).await?;
85
86            return Ok(0);
87        } else if first_request
88            .options
89            .contains_key(LOGICAL_TABLE_METADATA_KEY)
90        {
91            if requests.len() == 1 {
92                let request = &requests.first().unwrap().1;
93                let physical_region_id = parse_physical_region_id(request)?;
94                let mut manifest_infos = Vec::with_capacity(1);
95                self.create_logical_regions(physical_region_id, requests, extension_return_value)
96                    .await?;
97                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
98                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
99            } else {
100                let grouped_requests =
101                    group_create_logical_region_requests_by_physical_region_id(requests)?;
102                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
103                for (physical_region_id, requests) in grouped_requests {
104                    self.create_logical_regions(
105                        physical_region_id,
106                        requests,
107                        extension_return_value,
108                    )
109                    .await?;
110                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
111                }
112                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
113            }
114        } else {
115            return MissingRegionOptionSnafu {}.fail();
116        }
117
118        Ok(0)
119    }
120
121    /// Initialize a physical metric region at given region id.
122    async fn create_physical_region(
123        &self,
124        region_id: RegionId,
125        request: RegionCreateRequest,
126    ) -> Result<()> {
127        let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128        let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130        // create metadata region
131        let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132        self.mito
133            .handle_request(
134                metadata_region_id,
135                RegionRequest::Create(create_metadata_region_request),
136            )
137            .await
138            .with_context(|_| CreateMitoRegionSnafu {
139                region_type: METADATA_REGION_SUBDIR,
140            })?;
141
142        // create data region
143        let create_data_region_request = self.create_request_for_data_region(&request);
144        let physical_columns = create_data_region_request
145            .column_metadatas
146            .iter()
147            .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148            .collect::<HashMap<_, _>>();
149        let time_index_unit = create_data_region_request
150            .column_metadatas
151            .iter()
152            .find_map(|metadata| {
153                if metadata.semantic_type == SemanticType::Timestamp {
154                    metadata
155                        .column_schema
156                        .data_type
157                        .as_timestamp()
158                        .map(|data_type| data_type.unit())
159                } else {
160                    None
161                }
162            })
163            .context(UnexpectedRequestSnafu {
164                reason: "No time index column found",
165            })?;
166        self.mito
167            .handle_request(
168                data_region_id,
169                RegionRequest::Create(create_data_region_request),
170            )
171            .await
172            .with_context(|_| CreateMitoRegionSnafu {
173                region_type: DATA_REGION_SUBDIR,
174            })?;
175        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
176            PhysicalRegionNotFoundSnafu {
177                region_id: data_region_id,
178            },
179        )?;
180
181        info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
182        PHYSICAL_REGION_COUNT.inc();
183
184        // remember this table
185        self.state.write().unwrap().add_physical_region(
186            data_region_id,
187            physical_columns,
188            primary_key_encoding,
189            physical_region_options,
190            time_index_unit,
191        );
192
193        Ok(())
194    }
195
196    /// Create multiple logical regions on the same physical region.
197    async fn create_logical_regions(
198        &self,
199        physical_region_id: RegionId,
200        requests: Vec<(RegionId, RegionCreateRequest)>,
201        extension_return_value: &mut HashMap<String, Vec<u8>>,
202    ) -> Result<()> {
203        let data_region_id = utils::to_data_region_id(physical_region_id);
204
205        let unit = self
206            .state
207            .read()
208            .unwrap()
209            .physical_region_time_index_unit(physical_region_id)
210            .context(PhysicalRegionNotFoundSnafu {
211                region_id: data_region_id,
212            })?;
213        // Checks the time index unit of each request.
214        for (_, request) in &requests {
215            // Safety: verify_region_create_request() ensures that the request is valid.
216            let time_index_column = request
217                .column_metadatas
218                .iter()
219                .find(|col| col.semantic_type == SemanticType::Timestamp)
220                .unwrap();
221            let request_unit = time_index_column
222                .column_schema
223                .data_type
224                .as_timestamp()
225                .unwrap()
226                .unit();
227            ensure!(
228                request_unit == unit,
229                UnexpectedRequestSnafu {
230                    reason: format!(
231                        "Metric has differenttime unit ({:?}) than the physical region ({:?})",
232                        request_unit, unit
233                    ),
234                }
235            );
236        }
237
238        // Filters out the requests that the logical region already exists
239        let requests = {
240            let state = self.state.read().unwrap();
241            let mut skipped = Vec::with_capacity(requests.len());
242            let mut kept_requests = Vec::with_capacity(requests.len());
243
244            for (region_id, request) in requests {
245                if state.is_logical_region_exist(region_id) {
246                    skipped.push(region_id);
247                } else {
248                    kept_requests.push((region_id, request));
249                }
250            }
251
252            // log skipped regions
253            if !skipped.is_empty() {
254                info!(
255                    "Skipped creating logical regions {skipped:?} because they already exist",
256                    skipped = skipped
257                );
258            }
259            kept_requests
260        };
261
262        // Finds new columns to add to physical region
263        let mut new_column_names = HashSet::new();
264        let mut new_columns = Vec::new();
265
266        let index_option = {
267            let state = &self.state.read().unwrap();
268            let region_state = state
269                .physical_region_states()
270                .get(&data_region_id)
271                .with_context(|| PhysicalRegionNotFoundSnafu {
272                    region_id: data_region_id,
273                })?;
274            let physical_columns = region_state.physical_columns();
275
276            extract_new_columns(
277                &requests,
278                physical_columns,
279                &mut new_column_names,
280                &mut new_columns,
281            )?;
282            region_state.options().index
283        };
284
285        // TODO(weny): we dont need to pass a mutable new_columns here.
286        self.data_region
287            .add_columns(data_region_id, new_columns, index_option)
288            .await?;
289
290        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
291        let physical_schema_map = physical_columns
292            .iter()
293            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
294            .collect::<HashMap<_, _>>();
295        let logical_regions = requests
296            .iter()
297            .map(|(region_id, _)| (*region_id))
298            .collect::<Vec<_>>();
299        let logical_region_columns = requests.iter().map(|(region_id, request)| {
300            (
301                *region_id,
302                request
303                    .column_metadatas
304                    .iter()
305                    .map(|metadata| {
306                        // Safety: previous steps ensure the physical region exist
307                        let column_metadata = *physical_schema_map
308                            .get(metadata.column_schema.name.as_str())
309                            .unwrap();
310                        (metadata.column_schema.name.as_str(), column_metadata)
311                    })
312                    .collect::<HashMap<_, _>>(),
313            )
314        });
315
316        let new_add_columns = new_column_names.iter().map(|name| {
317            // Safety: previous steps ensure the physical region exist
318            let column_metadata = *physical_schema_map.get(name).unwrap();
319            (name.to_string(), column_metadata.column_id)
320        });
321
322        extension_return_value.insert(
323            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
324            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
325        );
326
327        // Writes logical regions metadata to metadata region
328        self.metadata_region
329            .add_logical_regions(physical_region_id, true, logical_region_columns)
330            .await?;
331
332        {
333            let mut state = self.state.write().unwrap();
334            state.add_physical_columns(data_region_id, new_add_columns);
335            state.add_logical_regions(physical_region_id, logical_regions.clone());
336        }
337        for logical_region_id in logical_regions {
338            self.metadata_region
339                .open_logical_region(logical_region_id)
340                .await;
341        }
342
343        Ok(())
344    }
345
346    /// Check if
347    /// - internal columns are not occupied
348    /// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or
349    ///   [LOGICAL_TABLE_METADATA_KEY])
350    fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
351        request.validate().context(InvalidMetadataSnafu)?;
352
353        let name_to_index = request
354            .column_metadatas
355            .iter()
356            .enumerate()
357            .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
358            .collect::<HashMap<String, usize>>();
359
360        // check if internal columns are not occupied
361        ensure!(
362            !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
363            InternalColumnOccupiedSnafu {
364                column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
365            }
366        );
367        ensure!(
368            !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
369            InternalColumnOccupiedSnafu {
370                column: DATA_SCHEMA_TSID_COLUMN_NAME,
371            }
372        );
373
374        // check if required table option is present
375        ensure!(
376            request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
377            MissingRegionOptionSnafu {}
378        );
379        ensure!(
380            !(request.is_physical_table()
381                && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
382            ConflictRegionOptionSnafu {}
383        );
384
385        // check if only one field column is declared, and all tag columns are string
386        let mut field_col: Option<&ColumnMetadata> = None;
387        for col in &request.column_metadatas {
388            match col.semantic_type {
389                SemanticType::Tag => ensure!(
390                    col.column_schema.data_type == ConcreteDataType::string_datatype(),
391                    ColumnTypeMismatchSnafu {
392                        expect: ConcreteDataType::string_datatype(),
393                        actual: col.column_schema.data_type.clone(),
394                    }
395                ),
396                SemanticType::Field => {
397                    if field_col.is_some() {
398                        MultipleFieldColumnSnafu {
399                            previous: field_col.unwrap().column_schema.name.clone(),
400                            current: col.column_schema.name.clone(),
401                        }
402                        .fail()?;
403                    }
404                    field_col = Some(col)
405                }
406                SemanticType::Timestamp => {}
407            }
408        }
409        let field_col = field_col.context(NoFieldColumnSnafu)?;
410
411        // make sure the field column is float64 type
412        ensure!(
413            field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
414            ColumnTypeMismatchSnafu {
415                expect: ConcreteDataType::float64_datatype(),
416                actual: field_col.column_schema.data_type.clone(),
417            }
418        );
419
420        Ok(())
421    }
422
423    /// Build data region id and metadata region id from the given region id.
424    ///
425    /// Return value: (data_region_id, metadata_region_id)
426    fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
427        (
428            to_data_region_id(region_id),
429            to_metadata_region_id(region_id),
430        )
431    }
432
433    /// Build [RegionCreateRequest] for metadata region
434    ///
435    /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
436    pub fn create_request_for_metadata_region(
437        &self,
438        request: &RegionCreateRequest,
439    ) -> RegionCreateRequest {
440        // ts TIME INDEX DEFAULT 0
441        let timestamp_column_metadata = ColumnMetadata {
442            column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
443            semantic_type: SemanticType::Timestamp,
444            column_schema: ColumnSchema::new(
445                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
446                ConcreteDataType::timestamp_millisecond_datatype(),
447                false,
448            )
449            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
450                Value::Timestamp(Timestamp::new_millisecond(0)),
451            )))
452            .unwrap(),
453        };
454        // key STRING PRIMARY KEY
455        let key_column_metadata = ColumnMetadata {
456            column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
457            semantic_type: SemanticType::Tag,
458            column_schema: ColumnSchema::new(
459                METADATA_SCHEMA_KEY_COLUMN_NAME,
460                ConcreteDataType::string_datatype(),
461                false,
462            ),
463        };
464        // val STRING
465        let value_column_metadata = ColumnMetadata {
466            column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
467            semantic_type: SemanticType::Field,
468            column_schema: ColumnSchema::new(
469                METADATA_SCHEMA_VALUE_COLUMN_NAME,
470                ConcreteDataType::string_datatype(),
471                true,
472            ),
473        };
474
475        // concat region dir
476        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
477
478        let options = region_options_for_metadata_region(request.options.clone());
479        RegionCreateRequest {
480            engine: MITO_ENGINE_NAME.to_string(),
481            column_metadatas: vec![
482                timestamp_column_metadata,
483                key_column_metadata,
484                value_column_metadata,
485            ],
486            primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
487            options,
488            region_dir: metadata_region_dir,
489        }
490    }
491
492    /// Convert [RegionCreateRequest] for data region.
493    ///
494    /// All tag columns in the original request will be converted to value columns.
495    /// Those columns real semantic type is stored in metadata region.
496    ///
497    /// This will also add internal columns to the request.
498    pub fn create_request_for_data_region(
499        &self,
500        request: &RegionCreateRequest,
501    ) -> RegionCreateRequest {
502        let mut data_region_request = request.clone();
503        let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
504
505        // concat region dir
506        data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
507
508        // change nullability for tag columns
509        data_region_request
510            .column_metadatas
511            .iter_mut()
512            .for_each(|metadata| {
513                if metadata.semantic_type == SemanticType::Tag {
514                    metadata.column_schema.set_nullable();
515                    primary_key.push(metadata.column_id);
516                }
517            });
518
519        // add internal columns
520        let [table_id_col, tsid_col] = Self::internal_column_metadata();
521        data_region_request.column_metadatas.push(table_id_col);
522        data_region_request.column_metadatas.push(tsid_col);
523        data_region_request.primary_key = primary_key;
524
525        // set data region options
526        set_data_region_options(
527            &mut data_region_request.options,
528            self.config.experimental_sparse_primary_key_encoding,
529        );
530
531        data_region_request
532    }
533
534    /// Generate internal column metadata.
535    ///
536    /// Return `[table_id_col, tsid_col]`
537    fn internal_column_metadata() -> [ColumnMetadata; 2] {
538        // Safety: BloomFilter is a valid skipping index type
539        let metric_name_col = ColumnMetadata {
540            column_id: ReservedColumnId::table_id(),
541            semantic_type: SemanticType::Tag,
542            column_schema: ColumnSchema::new(
543                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
544                ConcreteDataType::uint32_datatype(),
545                false,
546            )
547            .with_skipping_options(SkippingIndexOptions {
548                granularity: DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
549                index_type: datatypes::schema::SkippingIndexType::BloomFilter,
550            })
551            .unwrap(),
552        };
553        let tsid_col = ColumnMetadata {
554            column_id: ReservedColumnId::tsid(),
555            semantic_type: SemanticType::Tag,
556            column_schema: ColumnSchema::new(
557                DATA_SCHEMA_TSID_COLUMN_NAME,
558                ConcreteDataType::uint64_datatype(),
559                false,
560            )
561            .with_inverted_index(false),
562        };
563        [metric_name_col, tsid_col]
564    }
565}
566
567/// Groups the create logical region requests by physical region id.
568fn group_create_logical_region_requests_by_physical_region_id(
569    requests: Vec<(RegionId, RegionCreateRequest)>,
570) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
571    let mut result = HashMap::with_capacity(requests.len());
572    for (region_id, request) in requests {
573        let physical_region_id = parse_physical_region_id(&request)?;
574        result
575            .entry(physical_region_id)
576            .or_insert_with(Vec::new)
577            .push((region_id, request));
578    }
579
580    Ok(result)
581}
582
583/// Parses the physical region id from the request.
584fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
585    let physical_region_id_raw = request
586        .options
587        .get(LOGICAL_TABLE_METADATA_KEY)
588        .ok_or(MissingRegionOptionSnafu {}.build())?;
589
590    let physical_region_id: RegionId = physical_region_id_raw
591        .parse::<u64>()
592        .with_context(|_| ParseRegionIdSnafu {
593            raw: physical_region_id_raw,
594        })?
595        .into();
596
597    Ok(physical_region_id)
598}
599
600/// Creates the region options for metadata region in metric engine.
601pub(crate) fn region_options_for_metadata_region(
602    mut original: HashMap<String, String>,
603) -> HashMap<String, String> {
604    // TODO(ruihang, weny): add whitelist for metric engine options.
605    original.remove(APPEND_MODE_KEY);
606    // Don't allow to set primary key encoding for metadata region.
607    original.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
608    original.insert(TTL_KEY.to_string(), FOREVER.to_string());
609    original.remove(SKIP_WAL_KEY);
610    original
611}
612
#[cfg(test)]
mod test {
    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};

    use super::*;
    use crate::config::EngineConfig;
    use crate::engine::MetricEngine;
    use crate::test_util::TestEnv;

    /// Builds a [ColumnMetadata] whose schema is created with `ColumnSchema::new`.
    fn column(
        column_id: u32,
        semantic_type: SemanticType,
        name: &str,
        data_type: ConcreteDataType,
        nullable: bool,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_id,
            semantic_type,
            column_schema: ColumnSchema::new(name, data_type, nullable),
        }
    }

    /// The non-null millisecond time index column shared by the verification tests.
    fn ts_column() -> ColumnMetadata {
        column(
            0,
            SemanticType::Timestamp,
            METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
    }

    /// Builds a metric-engine create request in `test_dir` with an empty primary key.
    fn metric_request(
        column_metadatas: Vec<ColumnMetadata>,
        options: HashMap<String, String>,
    ) -> RegionCreateRequest {
        RegionCreateRequest {
            column_metadatas,
            region_dir: "test_dir".to_string(),
            engine: METRIC_ENGINE_NAME.to_string(),
            primary_key: vec![],
            options,
        }
    }

    #[test]
    fn test_verify_region_create_request() {
        // The reserved table id column must not be declared by the user.
        let occupied = metric_request(
            vec![
                ts_column(),
                column(
                    1,
                    SemanticType::Tag,
                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
            ],
            HashMap::new(),
        );
        let err = MetricEngineInner::verify_region_create_request(&occupied).unwrap_err();
        assert_eq!(err.to_string(), "Internal column __table_id is reserved");

        // One string tag plus one float64 field on a physical table is accepted.
        let valid = metric_request(
            vec![
                ts_column(),
                column(
                    1,
                    SemanticType::Tag,
                    "column1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                column(
                    2,
                    SemanticType::Field,
                    "column2",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
            ],
            HashMap::from([(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]),
        );
        MetricEngineInner::verify_region_create_request(&valid).unwrap();
    }

    #[test]
    fn test_verify_region_create_request_options() {
        let mut request = metric_request(
            vec![
                ts_column(),
                column(
                    1,
                    SemanticType::Field,
                    "val",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
            ],
            HashMap::new(),
        );
        // Neither the physical nor the logical option: rejected.
        MetricEngineInner::verify_region_create_request(&request).unwrap_err();

        let physical = (PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
        let logical = (LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());

        // Only the physical option: accepted.
        request.options = HashMap::from([physical.clone()]);
        MetricEngineInner::verify_region_create_request(&request).unwrap();

        // Both options at once: rejected as conflicting.
        request.options = HashMap::from([physical, logical.clone()]);
        MetricEngineInner::verify_region_create_request(&request).unwrap_err();

        // Only the logical option: accepted.
        request.options = HashMap::from([logical]);
        MetricEngineInner::verify_region_create_request(&request).unwrap();
    }

    #[tokio::test]
    async fn test_create_request_for_physical_regions() {
        // original request
        let options = HashMap::from([
            ("ttl".to_string(), "60m".to_string()),
            ("skip_wal".to_string(), "true".to_string()),
        ]);
        let request = RegionCreateRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            column_metadatas: vec![
                column(
                    0,
                    SemanticType::Timestamp,
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                column(
                    1,
                    SemanticType::Tag,
                    "tag",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
            ],
            primary_key: vec![0],
            options,
            region_dir: "/test_dir".to_string(),
        };

        // set up
        let env = TestEnv::new().await;
        let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
        let engine_inner = engine.inner;

        // Data region: own sub directory, two internal columns appended, internal PKs first.
        let data_region_request = engine_inner.create_request_for_data_region(&request);
        assert_eq!(data_region_request.region_dir, "/test_dir/data/");
        assert_eq!(data_region_request.column_metadatas.len(), 4);
        assert_eq!(
            data_region_request.primary_key,
            vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
        );
        assert!(data_region_request.options.contains_key("ttl"));

        // Metadata region: own sub directory, TTL forced to forever, skip_wal dropped.
        let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
        assert_eq!(metadata_region_request.region_dir, "/test_dir/metadata/");
        assert_eq!(
            metadata_region_request.options.get("ttl").unwrap(),
            "forever"
        );
        assert!(!metadata_region_request.options.contains_key("skip_wal"));
    }
}