metric_engine/engine/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{FOREVER, Timestamp};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use snafu::{OptionExt, ResultExt, ensure};
27use store_api::metadata::ColumnMetadata;
28use store_api::metric_engine_consts::{
29    ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
30    DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
31    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
32    METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
33    METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
34};
35use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{AffectedRows, PathType, RegionCreateRequest, RegionRequest};
38use store_api::storage::RegionId;
39use store_api::storage::consts::ReservedColumnId;
40
41use crate::engine::MetricEngineInner;
42use crate::engine::create::extract_new_columns::extract_new_columns;
43use crate::engine::options::{PhysicalRegionOptions, set_data_region_options};
44use crate::error::{
45    ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
46    InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
47    MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
48    Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
49};
50use crate::metrics::PHYSICAL_REGION_COUNT;
51use crate::utils::{
52    self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
53    to_metadata_region_id,
54};
55
/// Granularity passed to the skipping index options of the internal table id column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
/// False positive rate passed to the skipping index options of the internal table id column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
58
59impl MetricEngineInner {
60    pub async fn create_regions(
61        &self,
62        mut requests: Vec<(RegionId, RegionCreateRequest)>,
63        extension_return_value: &mut HashMap<String, Vec<u8>>,
64    ) -> Result<AffectedRows> {
65        if requests.is_empty() {
66            return Ok(0);
67        }
68
69        for (_, request) in requests.iter() {
70            Self::verify_region_create_request(request)?;
71        }
72
73        let first_request = &requests.first().unwrap().1;
74        if first_request.is_physical_table() {
75            ensure!(
76                requests.len() == 1,
77                UnexpectedRequestSnafu {
78                    reason: "Physical table must be created with single request".to_string(),
79                }
80            );
81            let (region_id, request) = requests.pop().unwrap();
82            self.create_physical_region(region_id, request, extension_return_value)
83                .await?;
84
85            return Ok(0);
86        } else if first_request
87            .options
88            .contains_key(LOGICAL_TABLE_METADATA_KEY)
89        {
90            if requests.len() == 1 {
91                let request = &requests.first().unwrap().1;
92                let physical_region_id = parse_physical_region_id(request)?;
93                let mut manifest_infos = Vec::with_capacity(1);
94                self.create_logical_regions(physical_region_id, requests, extension_return_value)
95                    .await?;
96                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
97                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
98            } else {
99                let grouped_requests =
100                    group_create_logical_region_requests_by_physical_region_id(requests)?;
101                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
102                for (physical_region_id, requests) in grouped_requests {
103                    self.create_logical_regions(
104                        physical_region_id,
105                        requests,
106                        extension_return_value,
107                    )
108                    .await?;
109                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
110                }
111                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
112            }
113        } else {
114            return MissingRegionOptionSnafu {}.fail();
115        }
116
117        Ok(0)
118    }
119
120    /// Initialize a physical metric region at given region id.
121    async fn create_physical_region(
122        &self,
123        region_id: RegionId,
124        request: RegionCreateRequest,
125        extension_return_value: &mut HashMap<String, Vec<u8>>,
126    ) -> Result<()> {
127        let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128        let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130        // create metadata region
131        let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132        self.mito
133            .handle_request(
134                metadata_region_id,
135                RegionRequest::Create(create_metadata_region_request),
136            )
137            .await
138            .with_context(|_| CreateMitoRegionSnafu {
139                region_type: METADATA_REGION_SUBDIR,
140            })?;
141
142        // create data region
143        let create_data_region_request = self.create_request_for_data_region(&request);
144        let physical_columns = create_data_region_request
145            .column_metadatas
146            .iter()
147            .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148            .collect::<HashMap<_, _>>();
149        let time_index_unit = create_data_region_request
150            .column_metadatas
151            .iter()
152            .find_map(|metadata| {
153                if metadata.semantic_type == SemanticType::Timestamp {
154                    metadata
155                        .column_schema
156                        .data_type
157                        .as_timestamp()
158                        .map(|data_type| data_type.unit())
159                } else {
160                    None
161                }
162            })
163            .context(UnexpectedRequestSnafu {
164                reason: "No time index column found",
165            })?;
166        let response = self
167            .mito
168            .handle_request(
169                data_region_id,
170                RegionRequest::Create(create_data_region_request),
171            )
172            .await
173            .with_context(|_| CreateMitoRegionSnafu {
174                region_type: DATA_REGION_SUBDIR,
175            })?;
176        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
177            PhysicalRegionNotFoundSnafu {
178                region_id: data_region_id,
179            },
180        )?;
181        extension_return_value.extend(response.extensions);
182
183        info!(
184            "Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}"
185        );
186        PHYSICAL_REGION_COUNT.inc();
187
188        // remember this table
189        self.state.write().unwrap().add_physical_region(
190            data_region_id,
191            physical_columns,
192            primary_key_encoding,
193            physical_region_options,
194            time_index_unit,
195        );
196
197        Ok(())
198    }
199
200    /// Create multiple logical regions on the same physical region.
201    async fn create_logical_regions(
202        &self,
203        physical_region_id: RegionId,
204        requests: Vec<(RegionId, RegionCreateRequest)>,
205        extension_return_value: &mut HashMap<String, Vec<u8>>,
206    ) -> Result<()> {
207        let data_region_id = utils::to_data_region_id(physical_region_id);
208
209        let unit = self
210            .state
211            .read()
212            .unwrap()
213            .physical_region_time_index_unit(physical_region_id)
214            .context(PhysicalRegionNotFoundSnafu {
215                region_id: data_region_id,
216            })?;
217        // Checks the time index unit of each request.
218        for (_, request) in &requests {
219            // Safety: verify_region_create_request() ensures that the request is valid.
220            let time_index_column = request
221                .column_metadatas
222                .iter()
223                .find(|col| col.semantic_type == SemanticType::Timestamp)
224                .unwrap();
225            let request_unit = time_index_column
226                .column_schema
227                .data_type
228                .as_timestamp()
229                .unwrap()
230                .unit();
231            ensure!(
232                request_unit == unit,
233                UnexpectedRequestSnafu {
234                    reason: format!(
235                        "Metric has differenttime unit ({:?}) than the physical region ({:?})",
236                        request_unit, unit
237                    ),
238                }
239            );
240        }
241
242        // Filters out the requests that the logical region already exists
243        let requests = {
244            let state = self.state.read().unwrap();
245            let mut skipped = Vec::with_capacity(requests.len());
246            let mut kept_requests = Vec::with_capacity(requests.len());
247
248            for (region_id, request) in requests {
249                if state.is_logical_region_exist(region_id) {
250                    skipped.push(region_id);
251                } else {
252                    kept_requests.push((region_id, request));
253                }
254            }
255
256            // log skipped regions
257            if !skipped.is_empty() {
258                info!(
259                    "Skipped creating logical regions {skipped:?} because they already exist",
260                    skipped = skipped
261                );
262            }
263            kept_requests
264        };
265
266        // Finds new columns to add to physical region
267        let mut new_column_names = HashSet::new();
268        let mut new_columns = Vec::new();
269
270        let index_option = {
271            let state = &self.state.read().unwrap();
272            let region_state = state
273                .physical_region_states()
274                .get(&data_region_id)
275                .with_context(|| PhysicalRegionNotFoundSnafu {
276                    region_id: data_region_id,
277                })?;
278            let physical_columns = region_state.physical_columns();
279
280            extract_new_columns(
281                &requests,
282                physical_columns,
283                &mut new_column_names,
284                &mut new_columns,
285            )?;
286            region_state.options().index
287        };
288
289        // TODO(weny): we dont need to pass a mutable new_columns here.
290        self.data_region
291            .add_columns(data_region_id, new_columns, index_option)
292            .await?;
293
294        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
295        let physical_schema_map = physical_columns
296            .iter()
297            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
298            .collect::<HashMap<_, _>>();
299        let logical_regions = requests
300            .iter()
301            .map(|(region_id, _)| *region_id)
302            .collect::<Vec<_>>();
303        let logical_region_columns = requests.iter().map(|(region_id, request)| {
304            (
305                *region_id,
306                request
307                    .column_metadatas
308                    .iter()
309                    .map(|metadata| {
310                        // Safety: previous steps ensure the physical region exist
311                        let column_metadata = *physical_schema_map
312                            .get(metadata.column_schema.name.as_str())
313                            .unwrap();
314                        (metadata.column_schema.name.as_str(), column_metadata)
315                    })
316                    .collect::<HashMap<_, _>>(),
317            )
318        });
319
320        let new_add_columns = new_column_names.iter().map(|name| {
321            // Safety: previous steps ensure the physical region exist
322            let column_metadata = *physical_schema_map.get(name).unwrap();
323            (name.to_string(), column_metadata.column_id)
324        });
325
326        extension_return_value.insert(
327            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
328            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
329        );
330
331        // Writes logical regions metadata to metadata region
332        self.metadata_region
333            .add_logical_regions(physical_region_id, true, logical_region_columns)
334            .await?;
335
336        {
337            let mut state = self.state.write().unwrap();
338            state.add_physical_columns(data_region_id, new_add_columns);
339            state.add_logical_regions(physical_region_id, logical_regions.clone());
340        }
341        for logical_region_id in logical_regions {
342            self.metadata_region
343                .open_logical_region(logical_region_id)
344                .await;
345        }
346
347        Ok(())
348    }
349
350    /// Check if
351    /// - internal columns are not occupied
352    /// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or
353    ///   [LOGICAL_TABLE_METADATA_KEY])
354    fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
355        request.validate().context(InvalidMetadataSnafu)?;
356
357        let name_to_index = request
358            .column_metadatas
359            .iter()
360            .enumerate()
361            .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
362            .collect::<HashMap<String, usize>>();
363
364        // check if internal columns are not occupied
365        ensure!(
366            !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
367            InternalColumnOccupiedSnafu {
368                column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
369            }
370        );
371        ensure!(
372            !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
373            InternalColumnOccupiedSnafu {
374                column: DATA_SCHEMA_TSID_COLUMN_NAME,
375            }
376        );
377
378        // check if required table option is present
379        ensure!(
380            request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
381            MissingRegionOptionSnafu {}
382        );
383        ensure!(
384            !(request.is_physical_table()
385                && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
386            ConflictRegionOptionSnafu {}
387        );
388
389        // check if only one field column is declared, and all tag columns are string
390        let mut field_col: Option<&ColumnMetadata> = None;
391        for col in &request.column_metadatas {
392            match col.semantic_type {
393                SemanticType::Tag => ensure!(
394                    col.column_schema.data_type == ConcreteDataType::string_datatype(),
395                    ColumnTypeMismatchSnafu {
396                        expect: ConcreteDataType::string_datatype(),
397                        actual: col.column_schema.data_type.clone(),
398                    }
399                ),
400                SemanticType::Field => {
401                    if let Some(field_col) = field_col {
402                        MultipleFieldColumnSnafu {
403                            previous: field_col.column_schema.name.clone(),
404                            current: col.column_schema.name.clone(),
405                        }
406                        .fail()?;
407                    }
408                    field_col = Some(col)
409                }
410                SemanticType::Timestamp => {}
411            }
412        }
413        let field_col = field_col.context(NoFieldColumnSnafu)?;
414
415        // make sure the field column is float64 type
416        ensure!(
417            field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
418            ColumnTypeMismatchSnafu {
419                expect: ConcreteDataType::float64_datatype(),
420                actual: field_col.column_schema.data_type.clone(),
421            }
422        );
423
424        Ok(())
425    }
426
427    /// Build data region id and metadata region id from the given region id.
428    ///
429    /// Return value: (data_region_id, metadata_region_id)
430    fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
431        (
432            to_data_region_id(region_id),
433            to_metadata_region_id(region_id),
434        )
435    }
436
437    /// Build [RegionCreateRequest] for metadata region
438    ///
439    /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
440    pub fn create_request_for_metadata_region(
441        &self,
442        request: &RegionCreateRequest,
443    ) -> RegionCreateRequest {
444        // ts TIME INDEX DEFAULT 0
445        let timestamp_column_metadata = ColumnMetadata {
446            column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
447            semantic_type: SemanticType::Timestamp,
448            column_schema: ColumnSchema::new(
449                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
450                ConcreteDataType::timestamp_millisecond_datatype(),
451                false,
452            )
453            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
454                Value::Timestamp(Timestamp::new_millisecond(0)),
455            )))
456            .unwrap(),
457        };
458        // key STRING PRIMARY KEY
459        let key_column_metadata = ColumnMetadata {
460            column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
461            semantic_type: SemanticType::Tag,
462            column_schema: ColumnSchema::new(
463                METADATA_SCHEMA_KEY_COLUMN_NAME,
464                ConcreteDataType::string_datatype(),
465                false,
466            ),
467        };
468        // val STRING
469        let value_column_metadata = ColumnMetadata {
470            column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
471            semantic_type: SemanticType::Field,
472            column_schema: ColumnSchema::new(
473                METADATA_SCHEMA_VALUE_COLUMN_NAME,
474                ConcreteDataType::string_datatype(),
475                true,
476            ),
477        };
478
479        let options = region_options_for_metadata_region(&request.options);
480        RegionCreateRequest {
481            engine: MITO_ENGINE_NAME.to_string(),
482            column_metadatas: vec![
483                timestamp_column_metadata,
484                key_column_metadata,
485                value_column_metadata,
486            ],
487            primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
488            options,
489            table_dir: request.table_dir.clone(),
490            path_type: PathType::Metadata,
491            partition_expr_json: Some("".to_string()),
492        }
493    }
494
495    /// Convert [RegionCreateRequest] for data region.
496    ///
497    /// All tag columns in the original request will be converted to value columns.
498    /// Those columns real semantic type is stored in metadata region.
499    ///
500    /// This will also add internal columns to the request.
501    pub fn create_request_for_data_region(
502        &self,
503        request: &RegionCreateRequest,
504    ) -> RegionCreateRequest {
505        let mut data_region_request = request.clone();
506        let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
507
508        data_region_request.table_dir = request.table_dir.clone();
509        data_region_request.path_type = PathType::Data;
510
511        // change nullability for tag columns
512        data_region_request
513            .column_metadatas
514            .iter_mut()
515            .for_each(|metadata| {
516                if metadata.semantic_type == SemanticType::Tag {
517                    metadata.column_schema.set_nullable();
518                    primary_key.push(metadata.column_id);
519                }
520            });
521
522        // add internal columns
523        let [table_id_col, tsid_col] = Self::internal_column_metadata();
524        data_region_request.column_metadatas.push(table_id_col);
525        data_region_request.column_metadatas.push(tsid_col);
526        data_region_request.primary_key = primary_key;
527
528        // set data region options
529        set_data_region_options(
530            &mut data_region_request.options,
531            self.config.experimental_sparse_primary_key_encoding,
532        );
533
534        data_region_request
535    }
536
537    /// Generate internal column metadata.
538    ///
539    /// Return `[table_id_col, tsid_col]`
540    fn internal_column_metadata() -> [ColumnMetadata; 2] {
541        // Safety: BloomFilter is a valid skipping index type
542        let metric_name_col = ColumnMetadata {
543            column_id: ReservedColumnId::table_id(),
544            semantic_type: SemanticType::Tag,
545            column_schema: ColumnSchema::new(
546                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
547                ConcreteDataType::uint32_datatype(),
548                false,
549            )
550            .with_skipping_options(SkippingIndexOptions::new_unchecked(
551                DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
552                DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
553                datatypes::schema::SkippingIndexType::BloomFilter,
554            ))
555            .unwrap(),
556        };
557        let tsid_col = ColumnMetadata {
558            column_id: ReservedColumnId::tsid(),
559            semantic_type: SemanticType::Tag,
560            column_schema: ColumnSchema::new(
561                DATA_SCHEMA_TSID_COLUMN_NAME,
562                ConcreteDataType::uint64_datatype(),
563                false,
564            )
565            .with_inverted_index(false),
566        };
567        [metric_name_col, tsid_col]
568    }
569}
570
571/// Groups the create logical region requests by physical region id.
572fn group_create_logical_region_requests_by_physical_region_id(
573    requests: Vec<(RegionId, RegionCreateRequest)>,
574) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
575    let mut result = HashMap::with_capacity(requests.len());
576    for (region_id, request) in requests {
577        let physical_region_id = parse_physical_region_id(&request)?;
578        result
579            .entry(physical_region_id)
580            .or_insert_with(Vec::new)
581            .push((region_id, request));
582    }
583
584    Ok(result)
585}
586
587/// Parses the physical region id from the request.
588fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
589    let physical_region_id_raw = request
590        .options
591        .get(LOGICAL_TABLE_METADATA_KEY)
592        .ok_or(MissingRegionOptionSnafu {}.build())?;
593
594    let physical_region_id: RegionId = physical_region_id_raw
595        .parse::<u64>()
596        .with_context(|_| ParseRegionIdSnafu {
597            raw: physical_region_id_raw,
598        })?
599        .into();
600
601    Ok(physical_region_id)
602}
603
604/// Creates the region options for metadata region in metric engine.
605pub(crate) fn region_options_for_metadata_region(
606    original: &HashMap<String, String>,
607) -> HashMap<String, String> {
608    let mut metadata_region_options = HashMap::new();
609    metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
610
611    if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
612        metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone());
613    }
614
615    metadata_region_options
616}
617
618#[cfg(test)]
619mod test {
620    use common_meta::ddl::test_util::assert_column_name_and_id;
621    use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
622    use common_query::prelude::{greptime_timestamp, greptime_value};
623    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
624    use store_api::region_request::BatchRegionDdlRequest;
625
626    use super::*;
627    use crate::config::EngineConfig;
628    use crate::engine::MetricEngine;
629    use crate::test_util::{TestEnv, create_logical_region_request};
630
631    #[test]
632    fn test_verify_region_create_request() {
633        // internal column is occupied
634        let request = RegionCreateRequest {
635            column_metadatas: vec![
636                ColumnMetadata {
637                    column_id: 0,
638                    semantic_type: SemanticType::Timestamp,
639                    column_schema: ColumnSchema::new(
640                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
641                        ConcreteDataType::timestamp_millisecond_datatype(),
642                        false,
643                    ),
644                },
645                ColumnMetadata {
646                    column_id: 1,
647                    semantic_type: SemanticType::Tag,
648                    column_schema: ColumnSchema::new(
649                        DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
650                        ConcreteDataType::uint32_datatype(),
651                        false,
652                    ),
653                },
654            ],
655            table_dir: "test_dir".to_string(),
656            path_type: PathType::Bare,
657            engine: METRIC_ENGINE_NAME.to_string(),
658            primary_key: vec![],
659            options: HashMap::new(),
660            partition_expr_json: Some("".to_string()),
661        };
662        let result = MetricEngineInner::verify_region_create_request(&request);
663        assert!(result.is_err());
664        assert_eq!(
665            result.unwrap_err().to_string(),
666            "Internal column __table_id is reserved".to_string()
667        );
668
669        // valid request
670        let request = RegionCreateRequest {
671            column_metadatas: vec![
672                ColumnMetadata {
673                    column_id: 0,
674                    semantic_type: SemanticType::Timestamp,
675                    column_schema: ColumnSchema::new(
676                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
677                        ConcreteDataType::timestamp_millisecond_datatype(),
678                        false,
679                    ),
680                },
681                ColumnMetadata {
682                    column_id: 1,
683                    semantic_type: SemanticType::Tag,
684                    column_schema: ColumnSchema::new(
685                        "column1".to_string(),
686                        ConcreteDataType::string_datatype(),
687                        false,
688                    ),
689                },
690                ColumnMetadata {
691                    column_id: 2,
692                    semantic_type: SemanticType::Field,
693                    column_schema: ColumnSchema::new(
694                        "column2".to_string(),
695                        ConcreteDataType::float64_datatype(),
696                        false,
697                    ),
698                },
699            ],
700            table_dir: "test_dir".to_string(),
701            path_type: PathType::Bare,
702            engine: METRIC_ENGINE_NAME.to_string(),
703            primary_key: vec![],
704            options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
705                .into_iter()
706                .collect(),
707            partition_expr_json: Some("".to_string()),
708        };
709        MetricEngineInner::verify_region_create_request(&request).unwrap();
710    }
711
712    #[test]
713    fn test_verify_region_create_request_options() {
714        let mut request = RegionCreateRequest {
715            column_metadatas: vec![
716                ColumnMetadata {
717                    column_id: 0,
718                    semantic_type: SemanticType::Timestamp,
719                    column_schema: ColumnSchema::new(
720                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
721                        ConcreteDataType::timestamp_millisecond_datatype(),
722                        false,
723                    ),
724                },
725                ColumnMetadata {
726                    column_id: 1,
727                    semantic_type: SemanticType::Field,
728                    column_schema: ColumnSchema::new(
729                        "val".to_string(),
730                        ConcreteDataType::float64_datatype(),
731                        false,
732                    ),
733                },
734            ],
735            table_dir: "test_dir".to_string(),
736            path_type: PathType::Bare,
737            engine: METRIC_ENGINE_NAME.to_string(),
738            primary_key: vec![],
739            options: HashMap::new(),
740            partition_expr_json: Some("".to_string()),
741        };
742        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
743
744        let mut options = HashMap::new();
745        options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
746        request.options.clone_from(&options);
747        MetricEngineInner::verify_region_create_request(&request).unwrap();
748
749        options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
750        request.options.clone_from(&options);
751        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
752
753        options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
754        request.options = options;
755        MetricEngineInner::verify_region_create_request(&request).unwrap();
756    }
757
758    #[tokio::test]
759    async fn test_create_request_for_physical_regions() {
760        // original request
761        let options: HashMap<_, _> = [
762            ("ttl".to_string(), "60m".to_string()),
763            ("skip_wal".to_string(), "true".to_string()),
764        ]
765        .into_iter()
766        .collect();
767        let request = RegionCreateRequest {
768            engine: METRIC_ENGINE_NAME.to_string(),
769            column_metadatas: vec![
770                ColumnMetadata {
771                    column_id: 0,
772                    semantic_type: SemanticType::Timestamp,
773                    column_schema: ColumnSchema::new(
774                        "timestamp",
775                        ConcreteDataType::timestamp_millisecond_datatype(),
776                        false,
777                    ),
778                },
779                ColumnMetadata {
780                    column_id: 1,
781                    semantic_type: SemanticType::Tag,
782                    column_schema: ColumnSchema::new(
783                        "tag",
784                        ConcreteDataType::string_datatype(),
785                        false,
786                    ),
787                },
788            ],
789            primary_key: vec![0],
790            options,
791            table_dir: "/test_dir".to_string(),
792            path_type: PathType::Bare,
793            partition_expr_json: Some("".to_string()),
794        };
795
796        // set up
797        let env = TestEnv::new().await;
798        let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
799        let engine_inner = engine.inner;
800
801        // check create data region request
802        let data_region_request = engine_inner.create_request_for_data_region(&request);
803        assert_eq!(data_region_request.table_dir, "/test_dir".to_string());
804        assert_eq!(data_region_request.path_type, PathType::Data);
805        assert_eq!(data_region_request.column_metadatas.len(), 4);
806        assert_eq!(
807            data_region_request.primary_key,
808            vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
809        );
810        assert!(data_region_request.options.contains_key("ttl"));
811
812        // check create metadata region request
813        let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
814        assert_eq!(metadata_region_request.table_dir, "/test_dir".to_string());
815        assert_eq!(metadata_region_request.path_type, PathType::Metadata);
816        assert_eq!(
817            metadata_region_request.options.get("ttl").unwrap(),
818            "forever"
819        );
820        assert!(!metadata_region_request.options.contains_key("skip_wal"));
821    }
822
823    #[tokio::test]
824    async fn test_create_logical_regions() {
825        let env = TestEnv::new().await;
826        let engine = env.metric();
827        let physical_region_id1 = RegionId::new(1024, 0);
828        let physical_region_id2 = RegionId::new(1024, 1);
829        let logical_region_id1 = RegionId::new(1025, 0);
830        let logical_region_id2 = RegionId::new(1025, 1);
831        env.create_physical_region(physical_region_id1, "/test_dir1")
832            .await;
833        env.create_physical_region(physical_region_id2, "/test_dir2")
834            .await;
835
836        let region_create_request1 =
837            create_logical_region_request(&["job"], physical_region_id1, "logical1");
838        let region_create_request2 =
839            create_logical_region_request(&["job"], physical_region_id2, "logical2");
840
841        let response = engine
842            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
843                (logical_region_id1, region_create_request1),
844                (logical_region_id2, region_create_request2),
845            ]))
846            .await
847            .unwrap();
848
849        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
850        assert_eq!(manifest_infos.len(), 2);
851        let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
852        assert!(region_ids.contains(&physical_region_id1));
853        assert!(region_ids.contains(&physical_region_id2));
854
855        let column_metadatas =
856            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
857        assert_column_name_and_id(
858            &column_metadatas,
859            &[
860                (greptime_timestamp(), 0),
861                (greptime_value(), 1),
862                ("__table_id", ReservedColumnId::table_id()),
863                ("__tsid", ReservedColumnId::tsid()),
864                ("job", 2),
865            ],
866        );
867    }
868}