metric_engine/engine/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30    ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31    DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33    METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34    METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
37use store_api::region_engine::RegionEngine;
38use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
39use store_api::storage::consts::ReservedColumnId;
40use store_api::storage::RegionId;
41
42use crate::engine::create::extract_new_columns::extract_new_columns;
43use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
44use crate::engine::MetricEngineInner;
45use crate::error::{
46    ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
47    InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
48    MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
49    Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
50};
51use crate::metrics::PHYSICAL_REGION_COUNT;
52use crate::utils::{
53    self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
54    to_metadata_region_id,
55};
56
/// Granularity handed to the skipping index options of the internal table id column
/// (see `internal_column_metadata`).
57const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
/// False positive rate handed to the bloom filter skipping index of the internal table id
/// column (see `internal_column_metadata`).
58const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
59
60impl MetricEngineInner {
61    pub async fn create_regions(
62        &self,
63        mut requests: Vec<(RegionId, RegionCreateRequest)>,
64        extension_return_value: &mut HashMap<String, Vec<u8>>,
65    ) -> Result<AffectedRows> {
66        if requests.is_empty() {
67            return Ok(0);
68        }
69
70        for (_, request) in requests.iter() {
71            Self::verify_region_create_request(request)?;
72        }
73
74        let first_request = &requests.first().unwrap().1;
75        if first_request.is_physical_table() {
76            ensure!(
77                requests.len() == 1,
78                UnexpectedRequestSnafu {
79                    reason: "Physical table must be created with single request".to_string(),
80                }
81            );
82            let (region_id, request) = requests.pop().unwrap();
83            self.create_physical_region(region_id, request).await?;
84
85            return Ok(0);
86        } else if first_request
87            .options
88            .contains_key(LOGICAL_TABLE_METADATA_KEY)
89        {
90            if requests.len() == 1 {
91                let request = &requests.first().unwrap().1;
92                let physical_region_id = parse_physical_region_id(request)?;
93                let mut manifest_infos = Vec::with_capacity(1);
94                self.create_logical_regions(physical_region_id, requests, extension_return_value)
95                    .await?;
96                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
97                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
98            } else {
99                let grouped_requests =
100                    group_create_logical_region_requests_by_physical_region_id(requests)?;
101                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
102                for (physical_region_id, requests) in grouped_requests {
103                    self.create_logical_regions(
104                        physical_region_id,
105                        requests,
106                        extension_return_value,
107                    )
108                    .await?;
109                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
110                }
111                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
112            }
113        } else {
114            return MissingRegionOptionSnafu {}.fail();
115        }
116
117        Ok(0)
118    }
119
120    /// Initialize a physical metric region at given region id.
121    async fn create_physical_region(
122        &self,
123        region_id: RegionId,
124        request: RegionCreateRequest,
125    ) -> Result<()> {
126        let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
127        let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
128
129        // create metadata region
130        let create_metadata_region_request = self.create_request_for_metadata_region(&request);
131        self.mito
132            .handle_request(
133                metadata_region_id,
134                RegionRequest::Create(create_metadata_region_request),
135            )
136            .await
137            .with_context(|_| CreateMitoRegionSnafu {
138                region_type: METADATA_REGION_SUBDIR,
139            })?;
140
141        // create data region
142        let create_data_region_request = self.create_request_for_data_region(&request);
143        let physical_columns = create_data_region_request
144            .column_metadatas
145            .iter()
146            .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
147            .collect::<HashMap<_, _>>();
148        let time_index_unit = create_data_region_request
149            .column_metadatas
150            .iter()
151            .find_map(|metadata| {
152                if metadata.semantic_type == SemanticType::Timestamp {
153                    metadata
154                        .column_schema
155                        .data_type
156                        .as_timestamp()
157                        .map(|data_type| data_type.unit())
158                } else {
159                    None
160                }
161            })
162            .context(UnexpectedRequestSnafu {
163                reason: "No time index column found",
164            })?;
165        self.mito
166            .handle_request(
167                data_region_id,
168                RegionRequest::Create(create_data_region_request),
169            )
170            .await
171            .with_context(|_| CreateMitoRegionSnafu {
172                region_type: DATA_REGION_SUBDIR,
173            })?;
174        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
175            PhysicalRegionNotFoundSnafu {
176                region_id: data_region_id,
177            },
178        )?;
179
180        info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
181        PHYSICAL_REGION_COUNT.inc();
182
183        // remember this table
184        self.state.write().unwrap().add_physical_region(
185            data_region_id,
186            physical_columns,
187            primary_key_encoding,
188            physical_region_options,
189            time_index_unit,
190        );
191
192        Ok(())
193    }
194
195    /// Create multiple logical regions on the same physical region.
196    async fn create_logical_regions(
197        &self,
198        physical_region_id: RegionId,
199        requests: Vec<(RegionId, RegionCreateRequest)>,
200        extension_return_value: &mut HashMap<String, Vec<u8>>,
201    ) -> Result<()> {
202        let data_region_id = utils::to_data_region_id(physical_region_id);
203
204        let unit = self
205            .state
206            .read()
207            .unwrap()
208            .physical_region_time_index_unit(physical_region_id)
209            .context(PhysicalRegionNotFoundSnafu {
210                region_id: data_region_id,
211            })?;
212        // Checks the time index unit of each request.
213        for (_, request) in &requests {
214            // Safety: verify_region_create_request() ensures that the request is valid.
215            let time_index_column = request
216                .column_metadatas
217                .iter()
218                .find(|col| col.semantic_type == SemanticType::Timestamp)
219                .unwrap();
220            let request_unit = time_index_column
221                .column_schema
222                .data_type
223                .as_timestamp()
224                .unwrap()
225                .unit();
226            ensure!(
227                request_unit == unit,
228                UnexpectedRequestSnafu {
229                    reason: format!(
230                        "Metric has differenttime unit ({:?}) than the physical region ({:?})",
231                        request_unit, unit
232                    ),
233                }
234            );
235        }
236
237        // Filters out the requests that the logical region already exists
238        let requests = {
239            let state = self.state.read().unwrap();
240            let mut skipped = Vec::with_capacity(requests.len());
241            let mut kept_requests = Vec::with_capacity(requests.len());
242
243            for (region_id, request) in requests {
244                if state.is_logical_region_exist(region_id) {
245                    skipped.push(region_id);
246                } else {
247                    kept_requests.push((region_id, request));
248                }
249            }
250
251            // log skipped regions
252            if !skipped.is_empty() {
253                info!(
254                    "Skipped creating logical regions {skipped:?} because they already exist",
255                    skipped = skipped
256                );
257            }
258            kept_requests
259        };
260
261        // Finds new columns to add to physical region
262        let mut new_column_names = HashSet::new();
263        let mut new_columns = Vec::new();
264
265        let index_option = {
266            let state = &self.state.read().unwrap();
267            let region_state = state
268                .physical_region_states()
269                .get(&data_region_id)
270                .with_context(|| PhysicalRegionNotFoundSnafu {
271                    region_id: data_region_id,
272                })?;
273            let physical_columns = region_state.physical_columns();
274
275            extract_new_columns(
276                &requests,
277                physical_columns,
278                &mut new_column_names,
279                &mut new_columns,
280            )?;
281            region_state.options().index
282        };
283
284        // TODO(weny): we dont need to pass a mutable new_columns here.
285        self.data_region
286            .add_columns(data_region_id, new_columns, index_option)
287            .await?;
288
289        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
290        let physical_schema_map = physical_columns
291            .iter()
292            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
293            .collect::<HashMap<_, _>>();
294        let logical_regions = requests
295            .iter()
296            .map(|(region_id, _)| (*region_id))
297            .collect::<Vec<_>>();
298        let logical_region_columns = requests.iter().map(|(region_id, request)| {
299            (
300                *region_id,
301                request
302                    .column_metadatas
303                    .iter()
304                    .map(|metadata| {
305                        // Safety: previous steps ensure the physical region exist
306                        let column_metadata = *physical_schema_map
307                            .get(metadata.column_schema.name.as_str())
308                            .unwrap();
309                        (metadata.column_schema.name.as_str(), column_metadata)
310                    })
311                    .collect::<HashMap<_, _>>(),
312            )
313        });
314
315        let new_add_columns = new_column_names.iter().map(|name| {
316            // Safety: previous steps ensure the physical region exist
317            let column_metadata = *physical_schema_map.get(name).unwrap();
318            (name.to_string(), column_metadata.column_id)
319        });
320
321        extension_return_value.insert(
322            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
323            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
324        );
325
326        // Writes logical regions metadata to metadata region
327        self.metadata_region
328            .add_logical_regions(physical_region_id, true, logical_region_columns)
329            .await?;
330
331        {
332            let mut state = self.state.write().unwrap();
333            state.add_physical_columns(data_region_id, new_add_columns);
334            state.add_logical_regions(physical_region_id, logical_regions.clone());
335        }
336        for logical_region_id in logical_regions {
337            self.metadata_region
338                .open_logical_region(logical_region_id)
339                .await;
340        }
341
342        Ok(())
343    }
344
345    /// Check if
346    /// - internal columns are not occupied
347    /// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or
348    ///   [LOGICAL_TABLE_METADATA_KEY])
349    fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
350        request.validate().context(InvalidMetadataSnafu)?;
351
352        let name_to_index = request
353            .column_metadatas
354            .iter()
355            .enumerate()
356            .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
357            .collect::<HashMap<String, usize>>();
358
359        // check if internal columns are not occupied
360        ensure!(
361            !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
362            InternalColumnOccupiedSnafu {
363                column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
364            }
365        );
366        ensure!(
367            !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
368            InternalColumnOccupiedSnafu {
369                column: DATA_SCHEMA_TSID_COLUMN_NAME,
370            }
371        );
372
373        // check if required table option is present
374        ensure!(
375            request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
376            MissingRegionOptionSnafu {}
377        );
378        ensure!(
379            !(request.is_physical_table()
380                && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
381            ConflictRegionOptionSnafu {}
382        );
383
384        // check if only one field column is declared, and all tag columns are string
385        let mut field_col: Option<&ColumnMetadata> = None;
386        for col in &request.column_metadatas {
387            match col.semantic_type {
388                SemanticType::Tag => ensure!(
389                    col.column_schema.data_type == ConcreteDataType::string_datatype(),
390                    ColumnTypeMismatchSnafu {
391                        expect: ConcreteDataType::string_datatype(),
392                        actual: col.column_schema.data_type.clone(),
393                    }
394                ),
395                SemanticType::Field => {
396                    if field_col.is_some() {
397                        MultipleFieldColumnSnafu {
398                            previous: field_col.unwrap().column_schema.name.clone(),
399                            current: col.column_schema.name.clone(),
400                        }
401                        .fail()?;
402                    }
403                    field_col = Some(col)
404                }
405                SemanticType::Timestamp => {}
406            }
407        }
408        let field_col = field_col.context(NoFieldColumnSnafu)?;
409
410        // make sure the field column is float64 type
411        ensure!(
412            field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
413            ColumnTypeMismatchSnafu {
414                expect: ConcreteDataType::float64_datatype(),
415                actual: field_col.column_schema.data_type.clone(),
416            }
417        );
418
419        Ok(())
420    }
421
422    /// Build data region id and metadata region id from the given region id.
423    ///
424    /// Return value: (data_region_id, metadata_region_id)
425    fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
426        (
427            to_data_region_id(region_id),
428            to_metadata_region_id(region_id),
429        )
430    }
431
432    /// Build [RegionCreateRequest] for metadata region
433    ///
434    /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
435    pub fn create_request_for_metadata_region(
436        &self,
437        request: &RegionCreateRequest,
438    ) -> RegionCreateRequest {
439        // ts TIME INDEX DEFAULT 0
440        let timestamp_column_metadata = ColumnMetadata {
441            column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
442            semantic_type: SemanticType::Timestamp,
443            column_schema: ColumnSchema::new(
444                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
445                ConcreteDataType::timestamp_millisecond_datatype(),
446                false,
447            )
448            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
449                Value::Timestamp(Timestamp::new_millisecond(0)),
450            )))
451            .unwrap(),
452        };
453        // key STRING PRIMARY KEY
454        let key_column_metadata = ColumnMetadata {
455            column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
456            semantic_type: SemanticType::Tag,
457            column_schema: ColumnSchema::new(
458                METADATA_SCHEMA_KEY_COLUMN_NAME,
459                ConcreteDataType::string_datatype(),
460                false,
461            ),
462        };
463        // val STRING
464        let value_column_metadata = ColumnMetadata {
465            column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
466            semantic_type: SemanticType::Field,
467            column_schema: ColumnSchema::new(
468                METADATA_SCHEMA_VALUE_COLUMN_NAME,
469                ConcreteDataType::string_datatype(),
470                true,
471            ),
472        };
473
474        // concat region dir
475        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
476
477        let options = region_options_for_metadata_region(&request.options);
478        RegionCreateRequest {
479            engine: MITO_ENGINE_NAME.to_string(),
480            column_metadatas: vec![
481                timestamp_column_metadata,
482                key_column_metadata,
483                value_column_metadata,
484            ],
485            primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
486            options,
487            region_dir: metadata_region_dir,
488        }
489    }
490
491    /// Convert [RegionCreateRequest] for data region.
492    ///
493    /// All tag columns in the original request will be converted to value columns.
494    /// Those columns real semantic type is stored in metadata region.
495    ///
496    /// This will also add internal columns to the request.
497    pub fn create_request_for_data_region(
498        &self,
499        request: &RegionCreateRequest,
500    ) -> RegionCreateRequest {
501        let mut data_region_request = request.clone();
502        let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
503
504        // concat region dir
505        data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
506
507        // change nullability for tag columns
508        data_region_request
509            .column_metadatas
510            .iter_mut()
511            .for_each(|metadata| {
512                if metadata.semantic_type == SemanticType::Tag {
513                    metadata.column_schema.set_nullable();
514                    primary_key.push(metadata.column_id);
515                }
516            });
517
518        // add internal columns
519        let [table_id_col, tsid_col] = Self::internal_column_metadata();
520        data_region_request.column_metadatas.push(table_id_col);
521        data_region_request.column_metadatas.push(tsid_col);
522        data_region_request.primary_key = primary_key;
523
524        // set data region options
525        set_data_region_options(
526            &mut data_region_request.options,
527            self.config.experimental_sparse_primary_key_encoding,
528        );
529
530        data_region_request
531    }
532
533    /// Generate internal column metadata.
534    ///
535    /// Return `[table_id_col, tsid_col]`
536    fn internal_column_metadata() -> [ColumnMetadata; 2] {
537        // Safety: BloomFilter is a valid skipping index type
538        let metric_name_col = ColumnMetadata {
539            column_id: ReservedColumnId::table_id(),
540            semantic_type: SemanticType::Tag,
541            column_schema: ColumnSchema::new(
542                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
543                ConcreteDataType::uint32_datatype(),
544                false,
545            )
546            .with_skipping_options(SkippingIndexOptions::new_unchecked(
547                DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
548                DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
549                datatypes::schema::SkippingIndexType::BloomFilter,
550            ))
551            .unwrap(),
552        };
553        let tsid_col = ColumnMetadata {
554            column_id: ReservedColumnId::tsid(),
555            semantic_type: SemanticType::Tag,
556            column_schema: ColumnSchema::new(
557                DATA_SCHEMA_TSID_COLUMN_NAME,
558                ConcreteDataType::uint64_datatype(),
559                false,
560            )
561            .with_inverted_index(false),
562        };
563        [metric_name_col, tsid_col]
564    }
565}
566
567/// Groups the create logical region requests by physical region id.
568fn group_create_logical_region_requests_by_physical_region_id(
569    requests: Vec<(RegionId, RegionCreateRequest)>,
570) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
571    let mut result = HashMap::with_capacity(requests.len());
572    for (region_id, request) in requests {
573        let physical_region_id = parse_physical_region_id(&request)?;
574        result
575            .entry(physical_region_id)
576            .or_insert_with(Vec::new)
577            .push((region_id, request));
578    }
579
580    Ok(result)
581}
582
583/// Parses the physical region id from the request.
584fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
585    let physical_region_id_raw = request
586        .options
587        .get(LOGICAL_TABLE_METADATA_KEY)
588        .ok_or(MissingRegionOptionSnafu {}.build())?;
589
590    let physical_region_id: RegionId = physical_region_id_raw
591        .parse::<u64>()
592        .with_context(|_| ParseRegionIdSnafu {
593            raw: physical_region_id_raw,
594        })?
595        .into();
596
597    Ok(physical_region_id)
598}
599
600/// Creates the region options for metadata region in metric engine.
601pub(crate) fn region_options_for_metadata_region(
602    original: &HashMap<String, String>,
603) -> HashMap<String, String> {
604    let mut metadata_region_options = HashMap::new();
605    metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
606
607    if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
608        metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.to_string());
609    }
610
611    metadata_region_options
612}
613
614#[cfg(test)]
615mod test {
616    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
617
618    use super::*;
619    use crate::config::EngineConfig;
620    use crate::engine::MetricEngine;
621    use crate::test_util::TestEnv;
622
/// An occupied internal column is rejected with a dedicated message, while a well-formed
/// physical table request passes verification.
#[test]
fn test_verify_region_create_request() {
    // Builds a non-null column with the given id, semantic type, name and data type.
    let column = |column_id, semantic_type, name: &str, data_type| ColumnMetadata {
        column_id,
        semantic_type,
        column_schema: ColumnSchema::new(name, data_type, false),
    };
    // Builds a request around the given columns and options.
    let request_of = |column_metadatas, options| RegionCreateRequest {
        column_metadatas,
        region_dir: "test_dir".to_string(),
        engine: METRIC_ENGINE_NAME.to_string(),
        primary_key: vec![],
        options,
    };

    // internal column is occupied
    let occupied = request_of(
        vec![
            column(
                0,
                SemanticType::Timestamp,
                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
            column(
                1,
                SemanticType::Tag,
                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
                ConcreteDataType::uint32_datatype(),
            ),
        ],
        HashMap::new(),
    );
    let error = MetricEngineInner::verify_region_create_request(&occupied).unwrap_err();
    assert_eq!(error.to_string(), "Internal column __table_id is reserved");

    // valid request
    let valid = request_of(
        vec![
            column(
                0,
                SemanticType::Timestamp,
                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
            column(
                1,
                SemanticType::Tag,
                "column1",
                ConcreteDataType::string_datatype(),
            ),
            column(
                2,
                SemanticType::Field,
                "column2",
                ConcreteDataType::float64_datatype(),
            ),
        ],
        HashMap::from([(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]),
    );
    MetricEngineInner::verify_region_create_request(&valid).unwrap();
}
699
/// Exactly one of the physical / logical table options must be present for a request to
/// pass verification.
#[test]
fn test_verify_region_create_request_options() {
    // Verifies an otherwise valid request carrying exactly the given options.
    let verify_with = |entries: &[(&str, &str)]| {
        let request = RegionCreateRequest {
            column_metadatas: vec![
                ColumnMetadata {
                    column_id: 0,
                    semantic_type: SemanticType::Timestamp,
                    column_schema: ColumnSchema::new(
                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    ),
                },
                ColumnMetadata {
                    column_id: 1,
                    semantic_type: SemanticType::Field,
                    column_schema: ColumnSchema::new(
                        "val".to_string(),
                        ConcreteDataType::float64_datatype(),
                        false,
                    ),
                },
            ],
            region_dir: "test_dir".to_string(),
            engine: METRIC_ENGINE_NAME.to_string(),
            primary_key: vec![],
            options: entries
                .iter()
                .map(|(key, value)| (key.to_string(), value.to_string()))
                .collect(),
        };
        MetricEngineInner::verify_region_create_request(&request)
    };

    // neither option
    verify_with(&[]).unwrap_err();

    // physical only
    verify_with(&[(PHYSICAL_TABLE_METADATA_KEY, "value")]).unwrap();

    // both options conflict
    verify_with(&[
        (PHYSICAL_TABLE_METADATA_KEY, "value"),
        (LOGICAL_TABLE_METADATA_KEY, "value"),
    ])
    .unwrap_err();

    // logical only
    verify_with(&[(LOGICAL_TABLE_METADATA_KEY, "value")]).unwrap();
}
743
744    #[tokio::test]
745    async fn test_create_request_for_physical_regions() {
746        // original request
747        let options: HashMap<_, _> = [
748            ("ttl".to_string(), "60m".to_string()),
749            ("skip_wal".to_string(), "true".to_string()),
750        ]
751        .into_iter()
752        .collect();
753        let request = RegionCreateRequest {
754            engine: METRIC_ENGINE_NAME.to_string(),
755            column_metadatas: vec![
756                ColumnMetadata {
757                    column_id: 0,
758                    semantic_type: SemanticType::Timestamp,
759                    column_schema: ColumnSchema::new(
760                        "timestamp",
761                        ConcreteDataType::timestamp_millisecond_datatype(),
762                        false,
763                    ),
764                },
765                ColumnMetadata {
766                    column_id: 1,
767                    semantic_type: SemanticType::Tag,
768                    column_schema: ColumnSchema::new(
769                        "tag",
770                        ConcreteDataType::string_datatype(),
771                        false,
772                    ),
773                },
774            ],
775            primary_key: vec![0],
776            options,
777            region_dir: "/test_dir".to_string(),
778        };
779
780        // set up
781        let env = TestEnv::new().await;
782        let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
783        let engine_inner = engine.inner;
784
785        // check create data region request
786        let data_region_request = engine_inner.create_request_for_data_region(&request);
787        assert_eq!(
788            data_region_request.region_dir,
789            "/test_dir/data/".to_string()
790        );
791        assert_eq!(data_region_request.column_metadatas.len(), 4);
792        assert_eq!(
793            data_region_request.primary_key,
794            vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
795        );
796        assert!(data_region_request.options.contains_key("ttl"));
797
798        // check create metadata region request
799        let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
800        assert_eq!(
801            metadata_region_request.region_dir,
802            "/test_dir/metadata/".to_string()
803        );
804        assert_eq!(
805            metadata_region_request.options.get("ttl").unwrap(),
806            "forever"
807        );
808        assert!(!metadata_region_request.options.contains_key("skip_wal"));
809    }
810}