metric_engine/engine/
create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use snafu::{ensure, OptionExt, ResultExt};
27use store_api::metadata::ColumnMetadata;
28use store_api::metric_engine_consts::{
29    ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
30    DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
31    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
32    METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
33    METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
34};
35use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{AffectedRows, PathType, RegionCreateRequest, RegionRequest};
38use store_api::storage::consts::ReservedColumnId;
39use store_api::storage::RegionId;
40
41use crate::engine::create::extract_new_columns::extract_new_columns;
42use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
43use crate::engine::MetricEngineInner;
44use crate::error::{
45    ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
46    InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
47    MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
48    Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
49};
50use crate::metrics::PHYSICAL_REGION_COUNT;
51use crate::utils::{
52    self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
53    to_metadata_region_id,
54};
55
/// Granularity of the bloom-filter skipping index built on the internal table id column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1_024;
/// False positive rate of the bloom-filter skipping index built on the internal table id column.
const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
58
59impl MetricEngineInner {
60    pub async fn create_regions(
61        &self,
62        mut requests: Vec<(RegionId, RegionCreateRequest)>,
63        extension_return_value: &mut HashMap<String, Vec<u8>>,
64    ) -> Result<AffectedRows> {
65        if requests.is_empty() {
66            return Ok(0);
67        }
68
69        for (_, request) in requests.iter() {
70            Self::verify_region_create_request(request)?;
71        }
72
73        let first_request = &requests.first().unwrap().1;
74        if first_request.is_physical_table() {
75            ensure!(
76                requests.len() == 1,
77                UnexpectedRequestSnafu {
78                    reason: "Physical table must be created with single request".to_string(),
79                }
80            );
81            let (region_id, request) = requests.pop().unwrap();
82            self.create_physical_region(region_id, request, extension_return_value)
83                .await?;
84
85            return Ok(0);
86        } else if first_request
87            .options
88            .contains_key(LOGICAL_TABLE_METADATA_KEY)
89        {
90            if requests.len() == 1 {
91                let request = &requests.first().unwrap().1;
92                let physical_region_id = parse_physical_region_id(request)?;
93                let mut manifest_infos = Vec::with_capacity(1);
94                self.create_logical_regions(physical_region_id, requests, extension_return_value)
95                    .await?;
96                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
97                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
98            } else {
99                let grouped_requests =
100                    group_create_logical_region_requests_by_physical_region_id(requests)?;
101                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
102                for (physical_region_id, requests) in grouped_requests {
103                    self.create_logical_regions(
104                        physical_region_id,
105                        requests,
106                        extension_return_value,
107                    )
108                    .await?;
109                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
110                }
111                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
112            }
113        } else {
114            return MissingRegionOptionSnafu {}.fail();
115        }
116
117        Ok(0)
118    }
119
120    /// Initialize a physical metric region at given region id.
121    async fn create_physical_region(
122        &self,
123        region_id: RegionId,
124        request: RegionCreateRequest,
125        extension_return_value: &mut HashMap<String, Vec<u8>>,
126    ) -> Result<()> {
127        let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
128        let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
129
130        // create metadata region
131        let create_metadata_region_request = self.create_request_for_metadata_region(&request);
132        self.mito
133            .handle_request(
134                metadata_region_id,
135                RegionRequest::Create(create_metadata_region_request),
136            )
137            .await
138            .with_context(|_| CreateMitoRegionSnafu {
139                region_type: METADATA_REGION_SUBDIR,
140            })?;
141
142        // create data region
143        let create_data_region_request = self.create_request_for_data_region(&request);
144        let physical_columns = create_data_region_request
145            .column_metadatas
146            .iter()
147            .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
148            .collect::<HashMap<_, _>>();
149        let time_index_unit = create_data_region_request
150            .column_metadatas
151            .iter()
152            .find_map(|metadata| {
153                if metadata.semantic_type == SemanticType::Timestamp {
154                    metadata
155                        .column_schema
156                        .data_type
157                        .as_timestamp()
158                        .map(|data_type| data_type.unit())
159                } else {
160                    None
161                }
162            })
163            .context(UnexpectedRequestSnafu {
164                reason: "No time index column found",
165            })?;
166        let response = self
167            .mito
168            .handle_request(
169                data_region_id,
170                RegionRequest::Create(create_data_region_request),
171            )
172            .await
173            .with_context(|_| CreateMitoRegionSnafu {
174                region_type: DATA_REGION_SUBDIR,
175            })?;
176        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
177            PhysicalRegionNotFoundSnafu {
178                region_id: data_region_id,
179            },
180        )?;
181        extension_return_value.extend(response.extensions);
182
183        info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
184        PHYSICAL_REGION_COUNT.inc();
185
186        // remember this table
187        self.state.write().unwrap().add_physical_region(
188            data_region_id,
189            physical_columns,
190            primary_key_encoding,
191            physical_region_options,
192            time_index_unit,
193        );
194
195        Ok(())
196    }
197
198    /// Create multiple logical regions on the same physical region.
199    async fn create_logical_regions(
200        &self,
201        physical_region_id: RegionId,
202        requests: Vec<(RegionId, RegionCreateRequest)>,
203        extension_return_value: &mut HashMap<String, Vec<u8>>,
204    ) -> Result<()> {
205        let data_region_id = utils::to_data_region_id(physical_region_id);
206
207        let unit = self
208            .state
209            .read()
210            .unwrap()
211            .physical_region_time_index_unit(physical_region_id)
212            .context(PhysicalRegionNotFoundSnafu {
213                region_id: data_region_id,
214            })?;
215        // Checks the time index unit of each request.
216        for (_, request) in &requests {
217            // Safety: verify_region_create_request() ensures that the request is valid.
218            let time_index_column = request
219                .column_metadatas
220                .iter()
221                .find(|col| col.semantic_type == SemanticType::Timestamp)
222                .unwrap();
223            let request_unit = time_index_column
224                .column_schema
225                .data_type
226                .as_timestamp()
227                .unwrap()
228                .unit();
229            ensure!(
230                request_unit == unit,
231                UnexpectedRequestSnafu {
232                    reason: format!(
233                        "Metric has differenttime unit ({:?}) than the physical region ({:?})",
234                        request_unit, unit
235                    ),
236                }
237            );
238        }
239
240        // Filters out the requests that the logical region already exists
241        let requests = {
242            let state = self.state.read().unwrap();
243            let mut skipped = Vec::with_capacity(requests.len());
244            let mut kept_requests = Vec::with_capacity(requests.len());
245
246            for (region_id, request) in requests {
247                if state.is_logical_region_exist(region_id) {
248                    skipped.push(region_id);
249                } else {
250                    kept_requests.push((region_id, request));
251                }
252            }
253
254            // log skipped regions
255            if !skipped.is_empty() {
256                info!(
257                    "Skipped creating logical regions {skipped:?} because they already exist",
258                    skipped = skipped
259                );
260            }
261            kept_requests
262        };
263
264        // Finds new columns to add to physical region
265        let mut new_column_names = HashSet::new();
266        let mut new_columns = Vec::new();
267
268        let index_option = {
269            let state = &self.state.read().unwrap();
270            let region_state = state
271                .physical_region_states()
272                .get(&data_region_id)
273                .with_context(|| PhysicalRegionNotFoundSnafu {
274                    region_id: data_region_id,
275                })?;
276            let physical_columns = region_state.physical_columns();
277
278            extract_new_columns(
279                &requests,
280                physical_columns,
281                &mut new_column_names,
282                &mut new_columns,
283            )?;
284            region_state.options().index
285        };
286
287        // TODO(weny): we dont need to pass a mutable new_columns here.
288        self.data_region
289            .add_columns(data_region_id, new_columns, index_option)
290            .await?;
291
292        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
293        let physical_schema_map = physical_columns
294            .iter()
295            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
296            .collect::<HashMap<_, _>>();
297        let logical_regions = requests
298            .iter()
299            .map(|(region_id, _)| (*region_id))
300            .collect::<Vec<_>>();
301        let logical_region_columns = requests.iter().map(|(region_id, request)| {
302            (
303                *region_id,
304                request
305                    .column_metadatas
306                    .iter()
307                    .map(|metadata| {
308                        // Safety: previous steps ensure the physical region exist
309                        let column_metadata = *physical_schema_map
310                            .get(metadata.column_schema.name.as_str())
311                            .unwrap();
312                        (metadata.column_schema.name.as_str(), column_metadata)
313                    })
314                    .collect::<HashMap<_, _>>(),
315            )
316        });
317
318        let new_add_columns = new_column_names.iter().map(|name| {
319            // Safety: previous steps ensure the physical region exist
320            let column_metadata = *physical_schema_map.get(name).unwrap();
321            (name.to_string(), column_metadata.column_id)
322        });
323
324        extension_return_value.insert(
325            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
326            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
327        );
328
329        // Writes logical regions metadata to metadata region
330        self.metadata_region
331            .add_logical_regions(physical_region_id, true, logical_region_columns)
332            .await?;
333
334        {
335            let mut state = self.state.write().unwrap();
336            state.add_physical_columns(data_region_id, new_add_columns);
337            state.add_logical_regions(physical_region_id, logical_regions.clone());
338        }
339        for logical_region_id in logical_regions {
340            self.metadata_region
341                .open_logical_region(logical_region_id)
342                .await;
343        }
344
345        Ok(())
346    }
347
348    /// Check if
349    /// - internal columns are not occupied
350    /// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or
351    ///   [LOGICAL_TABLE_METADATA_KEY])
352    fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
353        request.validate().context(InvalidMetadataSnafu)?;
354
355        let name_to_index = request
356            .column_metadatas
357            .iter()
358            .enumerate()
359            .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
360            .collect::<HashMap<String, usize>>();
361
362        // check if internal columns are not occupied
363        ensure!(
364            !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
365            InternalColumnOccupiedSnafu {
366                column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
367            }
368        );
369        ensure!(
370            !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
371            InternalColumnOccupiedSnafu {
372                column: DATA_SCHEMA_TSID_COLUMN_NAME,
373            }
374        );
375
376        // check if required table option is present
377        ensure!(
378            request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
379            MissingRegionOptionSnafu {}
380        );
381        ensure!(
382            !(request.is_physical_table()
383                && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
384            ConflictRegionOptionSnafu {}
385        );
386
387        // check if only one field column is declared, and all tag columns are string
388        let mut field_col: Option<&ColumnMetadata> = None;
389        for col in &request.column_metadatas {
390            match col.semantic_type {
391                SemanticType::Tag => ensure!(
392                    col.column_schema.data_type == ConcreteDataType::string_datatype(),
393                    ColumnTypeMismatchSnafu {
394                        expect: ConcreteDataType::string_datatype(),
395                        actual: col.column_schema.data_type.clone(),
396                    }
397                ),
398                SemanticType::Field => {
399                    if field_col.is_some() {
400                        MultipleFieldColumnSnafu {
401                            previous: field_col.unwrap().column_schema.name.clone(),
402                            current: col.column_schema.name.clone(),
403                        }
404                        .fail()?;
405                    }
406                    field_col = Some(col)
407                }
408                SemanticType::Timestamp => {}
409            }
410        }
411        let field_col = field_col.context(NoFieldColumnSnafu)?;
412
413        // make sure the field column is float64 type
414        ensure!(
415            field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
416            ColumnTypeMismatchSnafu {
417                expect: ConcreteDataType::float64_datatype(),
418                actual: field_col.column_schema.data_type.clone(),
419            }
420        );
421
422        Ok(())
423    }
424
425    /// Build data region id and metadata region id from the given region id.
426    ///
427    /// Return value: (data_region_id, metadata_region_id)
428    fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
429        (
430            to_data_region_id(region_id),
431            to_metadata_region_id(region_id),
432        )
433    }
434
435    /// Build [RegionCreateRequest] for metadata region
436    ///
437    /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
438    pub fn create_request_for_metadata_region(
439        &self,
440        request: &RegionCreateRequest,
441    ) -> RegionCreateRequest {
442        // ts TIME INDEX DEFAULT 0
443        let timestamp_column_metadata = ColumnMetadata {
444            column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
445            semantic_type: SemanticType::Timestamp,
446            column_schema: ColumnSchema::new(
447                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
448                ConcreteDataType::timestamp_millisecond_datatype(),
449                false,
450            )
451            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
452                Value::Timestamp(Timestamp::new_millisecond(0)),
453            )))
454            .unwrap(),
455        };
456        // key STRING PRIMARY KEY
457        let key_column_metadata = ColumnMetadata {
458            column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
459            semantic_type: SemanticType::Tag,
460            column_schema: ColumnSchema::new(
461                METADATA_SCHEMA_KEY_COLUMN_NAME,
462                ConcreteDataType::string_datatype(),
463                false,
464            ),
465        };
466        // val STRING
467        let value_column_metadata = ColumnMetadata {
468            column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
469            semantic_type: SemanticType::Field,
470            column_schema: ColumnSchema::new(
471                METADATA_SCHEMA_VALUE_COLUMN_NAME,
472                ConcreteDataType::string_datatype(),
473                true,
474            ),
475        };
476
477        let options = region_options_for_metadata_region(&request.options);
478        RegionCreateRequest {
479            engine: MITO_ENGINE_NAME.to_string(),
480            column_metadatas: vec![
481                timestamp_column_metadata,
482                key_column_metadata,
483                value_column_metadata,
484            ],
485            primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
486            options,
487            table_dir: request.table_dir.clone(),
488            path_type: PathType::Metadata,
489            partition_expr_json: Some("".to_string()),
490        }
491    }
492
493    /// Convert [RegionCreateRequest] for data region.
494    ///
495    /// All tag columns in the original request will be converted to value columns.
496    /// Those columns real semantic type is stored in metadata region.
497    ///
498    /// This will also add internal columns to the request.
499    pub fn create_request_for_data_region(
500        &self,
501        request: &RegionCreateRequest,
502    ) -> RegionCreateRequest {
503        let mut data_region_request = request.clone();
504        let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
505
506        data_region_request.table_dir = request.table_dir.clone();
507        data_region_request.path_type = PathType::Data;
508
509        // change nullability for tag columns
510        data_region_request
511            .column_metadatas
512            .iter_mut()
513            .for_each(|metadata| {
514                if metadata.semantic_type == SemanticType::Tag {
515                    metadata.column_schema.set_nullable();
516                    primary_key.push(metadata.column_id);
517                }
518            });
519
520        // add internal columns
521        let [table_id_col, tsid_col] = Self::internal_column_metadata();
522        data_region_request.column_metadatas.push(table_id_col);
523        data_region_request.column_metadatas.push(tsid_col);
524        data_region_request.primary_key = primary_key;
525
526        // set data region options
527        set_data_region_options(
528            &mut data_region_request.options,
529            self.config.experimental_sparse_primary_key_encoding,
530        );
531
532        data_region_request
533    }
534
535    /// Generate internal column metadata.
536    ///
537    /// Return `[table_id_col, tsid_col]`
538    fn internal_column_metadata() -> [ColumnMetadata; 2] {
539        // Safety: BloomFilter is a valid skipping index type
540        let metric_name_col = ColumnMetadata {
541            column_id: ReservedColumnId::table_id(),
542            semantic_type: SemanticType::Tag,
543            column_schema: ColumnSchema::new(
544                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
545                ConcreteDataType::uint32_datatype(),
546                false,
547            )
548            .with_skipping_options(SkippingIndexOptions::new_unchecked(
549                DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
550                DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
551                datatypes::schema::SkippingIndexType::BloomFilter,
552            ))
553            .unwrap(),
554        };
555        let tsid_col = ColumnMetadata {
556            column_id: ReservedColumnId::tsid(),
557            semantic_type: SemanticType::Tag,
558            column_schema: ColumnSchema::new(
559                DATA_SCHEMA_TSID_COLUMN_NAME,
560                ConcreteDataType::uint64_datatype(),
561                false,
562            )
563            .with_inverted_index(false),
564        };
565        [metric_name_col, tsid_col]
566    }
567}
568
569/// Groups the create logical region requests by physical region id.
570fn group_create_logical_region_requests_by_physical_region_id(
571    requests: Vec<(RegionId, RegionCreateRequest)>,
572) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
573    let mut result = HashMap::with_capacity(requests.len());
574    for (region_id, request) in requests {
575        let physical_region_id = parse_physical_region_id(&request)?;
576        result
577            .entry(physical_region_id)
578            .or_insert_with(Vec::new)
579            .push((region_id, request));
580    }
581
582    Ok(result)
583}
584
585/// Parses the physical region id from the request.
586fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
587    let physical_region_id_raw = request
588        .options
589        .get(LOGICAL_TABLE_METADATA_KEY)
590        .ok_or(MissingRegionOptionSnafu {}.build())?;
591
592    let physical_region_id: RegionId = physical_region_id_raw
593        .parse::<u64>()
594        .with_context(|_| ParseRegionIdSnafu {
595            raw: physical_region_id_raw,
596        })?
597        .into();
598
599    Ok(physical_region_id)
600}
601
602/// Creates the region options for metadata region in metric engine.
603pub(crate) fn region_options_for_metadata_region(
604    original: &HashMap<String, String>,
605) -> HashMap<String, String> {
606    let mut metadata_region_options = HashMap::new();
607    metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
608
609    if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
610        metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.to_string());
611    }
612
613    metadata_region_options
614}
615
616#[cfg(test)]
617mod test {
618    use common_meta::ddl::test_util::assert_column_name_and_id;
619    use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
620    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
621    use store_api::region_request::BatchRegionDdlRequest;
622
623    use super::*;
624    use crate::config::EngineConfig;
625    use crate::engine::MetricEngine;
626    use crate::test_util::{create_logical_region_request, TestEnv};
627
628    #[test]
629    fn test_verify_region_create_request() {
630        // internal column is occupied
631        let request = RegionCreateRequest {
632            column_metadatas: vec![
633                ColumnMetadata {
634                    column_id: 0,
635                    semantic_type: SemanticType::Timestamp,
636                    column_schema: ColumnSchema::new(
637                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
638                        ConcreteDataType::timestamp_millisecond_datatype(),
639                        false,
640                    ),
641                },
642                ColumnMetadata {
643                    column_id: 1,
644                    semantic_type: SemanticType::Tag,
645                    column_schema: ColumnSchema::new(
646                        DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
647                        ConcreteDataType::uint32_datatype(),
648                        false,
649                    ),
650                },
651            ],
652            table_dir: "test_dir".to_string(),
653            path_type: PathType::Bare,
654            engine: METRIC_ENGINE_NAME.to_string(),
655            primary_key: vec![],
656            options: HashMap::new(),
657            partition_expr_json: Some("".to_string()),
658        };
659        let result = MetricEngineInner::verify_region_create_request(&request);
660        assert!(result.is_err());
661        assert_eq!(
662            result.unwrap_err().to_string(),
663            "Internal column __table_id is reserved".to_string()
664        );
665
666        // valid request
667        let request = RegionCreateRequest {
668            column_metadatas: vec![
669                ColumnMetadata {
670                    column_id: 0,
671                    semantic_type: SemanticType::Timestamp,
672                    column_schema: ColumnSchema::new(
673                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
674                        ConcreteDataType::timestamp_millisecond_datatype(),
675                        false,
676                    ),
677                },
678                ColumnMetadata {
679                    column_id: 1,
680                    semantic_type: SemanticType::Tag,
681                    column_schema: ColumnSchema::new(
682                        "column1".to_string(),
683                        ConcreteDataType::string_datatype(),
684                        false,
685                    ),
686                },
687                ColumnMetadata {
688                    column_id: 2,
689                    semantic_type: SemanticType::Field,
690                    column_schema: ColumnSchema::new(
691                        "column2".to_string(),
692                        ConcreteDataType::float64_datatype(),
693                        false,
694                    ),
695                },
696            ],
697            table_dir: "test_dir".to_string(),
698            path_type: PathType::Bare,
699            engine: METRIC_ENGINE_NAME.to_string(),
700            primary_key: vec![],
701            options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
702                .into_iter()
703                .collect(),
704            partition_expr_json: Some("".to_string()),
705        };
706        MetricEngineInner::verify_region_create_request(&request).unwrap();
707    }
708
709    #[test]
710    fn test_verify_region_create_request_options() {
711        let mut request = RegionCreateRequest {
712            column_metadatas: vec![
713                ColumnMetadata {
714                    column_id: 0,
715                    semantic_type: SemanticType::Timestamp,
716                    column_schema: ColumnSchema::new(
717                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
718                        ConcreteDataType::timestamp_millisecond_datatype(),
719                        false,
720                    ),
721                },
722                ColumnMetadata {
723                    column_id: 1,
724                    semantic_type: SemanticType::Field,
725                    column_schema: ColumnSchema::new(
726                        "val".to_string(),
727                        ConcreteDataType::float64_datatype(),
728                        false,
729                    ),
730                },
731            ],
732            table_dir: "test_dir".to_string(),
733            path_type: PathType::Bare,
734            engine: METRIC_ENGINE_NAME.to_string(),
735            primary_key: vec![],
736            options: HashMap::new(),
737            partition_expr_json: Some("".to_string()),
738        };
739        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
740
741        let mut options = HashMap::new();
742        options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
743        request.options.clone_from(&options);
744        MetricEngineInner::verify_region_create_request(&request).unwrap();
745
746        options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
747        request.options.clone_from(&options);
748        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
749
750        options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
751        request.options = options;
752        MetricEngineInner::verify_region_create_request(&request).unwrap();
753    }
754
755    #[tokio::test]
756    async fn test_create_request_for_physical_regions() {
757        // original request
758        let options: HashMap<_, _> = [
759            ("ttl".to_string(), "60m".to_string()),
760            ("skip_wal".to_string(), "true".to_string()),
761        ]
762        .into_iter()
763        .collect();
764        let request = RegionCreateRequest {
765            engine: METRIC_ENGINE_NAME.to_string(),
766            column_metadatas: vec![
767                ColumnMetadata {
768                    column_id: 0,
769                    semantic_type: SemanticType::Timestamp,
770                    column_schema: ColumnSchema::new(
771                        "timestamp",
772                        ConcreteDataType::timestamp_millisecond_datatype(),
773                        false,
774                    ),
775                },
776                ColumnMetadata {
777                    column_id: 1,
778                    semantic_type: SemanticType::Tag,
779                    column_schema: ColumnSchema::new(
780                        "tag",
781                        ConcreteDataType::string_datatype(),
782                        false,
783                    ),
784                },
785            ],
786            primary_key: vec![0],
787            options,
788            table_dir: "/test_dir".to_string(),
789            path_type: PathType::Bare,
790            partition_expr_json: Some("".to_string()),
791        };
792
793        // set up
794        let env = TestEnv::new().await;
795        let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
796        let engine_inner = engine.inner;
797
798        // check create data region request
799        let data_region_request = engine_inner.create_request_for_data_region(&request);
800        assert_eq!(data_region_request.table_dir, "/test_dir".to_string());
801        assert_eq!(data_region_request.path_type, PathType::Data);
802        assert_eq!(data_region_request.column_metadatas.len(), 4);
803        assert_eq!(
804            data_region_request.primary_key,
805            vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
806        );
807        assert!(data_region_request.options.contains_key("ttl"));
808
809        // check create metadata region request
810        let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
811        assert_eq!(metadata_region_request.table_dir, "/test_dir".to_string());
812        assert_eq!(metadata_region_request.path_type, PathType::Metadata);
813        assert_eq!(
814            metadata_region_request.options.get("ttl").unwrap(),
815            "forever"
816        );
817        assert!(!metadata_region_request.options.contains_key("skip_wal"));
818    }
819
820    #[tokio::test]
821    async fn test_create_logical_regions() {
822        let env = TestEnv::new().await;
823        let engine = env.metric();
824        let physical_region_id1 = RegionId::new(1024, 0);
825        let physical_region_id2 = RegionId::new(1024, 1);
826        let logical_region_id1 = RegionId::new(1025, 0);
827        let logical_region_id2 = RegionId::new(1025, 1);
828        env.create_physical_region(physical_region_id1, "/test_dir1")
829            .await;
830        env.create_physical_region(physical_region_id2, "/test_dir2")
831            .await;
832
833        let region_create_request1 =
834            create_logical_region_request(&["job"], physical_region_id1, "logical1");
835        let region_create_request2 =
836            create_logical_region_request(&["job"], physical_region_id2, "logical2");
837
838        let response = engine
839            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
840                (logical_region_id1, region_create_request1),
841                (logical_region_id2, region_create_request2),
842            ]))
843            .await
844            .unwrap();
845
846        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
847        assert_eq!(manifest_infos.len(), 2);
848        let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
849        assert!(region_ids.contains(&physical_region_id1));
850        assert!(region_ids.contains(&physical_region_id2));
851
852        let column_metadatas =
853            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
854        assert_column_name_and_id(
855            &column_metadatas,
856            &[
857                ("greptime_timestamp", 0),
858                ("greptime_value", 1),
859                ("__table_id", ReservedColumnId::table_id()),
860                ("__tsid", ReservedColumnId::tsid()),
861                ("job", 2),
862            ],
863        );
864    }
865}