metric_engine/engine/
create.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15mod extract_new_columns;
16
17use std::collections::{HashMap, HashSet};
18
19use api::v1::SemanticType;
20use common_telemetry::info;
21use common_time::{Timestamp, FOREVER};
22use datatypes::data_type::ConcreteDataType;
23use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
24use datatypes::value::Value;
25use mito2::engine::MITO_ENGINE_NAME;
26use object_store::util::join_dir;
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::{
30    ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
31    DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
32    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33    METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
34    METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY};
37use store_api::region_engine::RegionEngine;
38use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
39use store_api::storage::consts::ReservedColumnId;
40use store_api::storage::RegionId;
41
42use crate::engine::create::extract_new_columns::extract_new_columns;
43use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
44use crate::engine::MetricEngineInner;
45use crate::error::{
46    ColumnTypeMismatchSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
47    InternalColumnOccupiedSnafu, InvalidMetadataSnafu, MissingRegionOptionSnafu,
48    MultipleFieldColumnSnafu, NoFieldColumnSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu,
49    Result, SerializeColumnMetadataSnafu, UnexpectedRequestSnafu,
50};
51use crate::metrics::PHYSICAL_REGION_COUNT;
52use crate::utils::{
53    self, append_manifest_info, encode_manifest_info_to_extensions, to_data_region_id,
54    to_metadata_region_id,
55};
56
57const DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY: u32 = 1024;
58const DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE: f64 = 0.01;
59
60impl MetricEngineInner {
61    pub async fn create_regions(
62        &self,
63        mut requests: Vec<(RegionId, RegionCreateRequest)>,
64        extension_return_value: &mut HashMap<String, Vec<u8>>,
65    ) -> Result<AffectedRows> {
66        if requests.is_empty() {
67            return Ok(0);
68        }
69
70        for (_, request) in requests.iter() {
71            Self::verify_region_create_request(request)?;
72        }
73
74        let first_request = &requests.first().unwrap().1;
75        if first_request.is_physical_table() {
76            ensure!(
77                requests.len() == 1,
78                UnexpectedRequestSnafu {
79                    reason: "Physical table must be created with single request".to_string(),
80                }
81            );
82            let (region_id, request) = requests.pop().unwrap();
83            self.create_physical_region(region_id, request, extension_return_value)
84                .await?;
85
86            return Ok(0);
87        } else if first_request
88            .options
89            .contains_key(LOGICAL_TABLE_METADATA_KEY)
90        {
91            if requests.len() == 1 {
92                let request = &requests.first().unwrap().1;
93                let physical_region_id = parse_physical_region_id(request)?;
94                let mut manifest_infos = Vec::with_capacity(1);
95                self.create_logical_regions(physical_region_id, requests, extension_return_value)
96                    .await?;
97                append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
98                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
99            } else {
100                let grouped_requests =
101                    group_create_logical_region_requests_by_physical_region_id(requests)?;
102                let mut manifest_infos = Vec::with_capacity(grouped_requests.len());
103                for (physical_region_id, requests) in grouped_requests {
104                    self.create_logical_regions(
105                        physical_region_id,
106                        requests,
107                        extension_return_value,
108                    )
109                    .await?;
110                    append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
111                }
112                encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
113            }
114        } else {
115            return MissingRegionOptionSnafu {}.fail();
116        }
117
118        Ok(0)
119    }
120
121    /// Initialize a physical metric region at given region id.
122    async fn create_physical_region(
123        &self,
124        region_id: RegionId,
125        request: RegionCreateRequest,
126        extension_return_value: &mut HashMap<String, Vec<u8>>,
127    ) -> Result<()> {
128        let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
129        let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
130
131        // create metadata region
132        let create_metadata_region_request = self.create_request_for_metadata_region(&request);
133        self.mito
134            .handle_request(
135                metadata_region_id,
136                RegionRequest::Create(create_metadata_region_request),
137            )
138            .await
139            .with_context(|_| CreateMitoRegionSnafu {
140                region_type: METADATA_REGION_SUBDIR,
141            })?;
142
143        // create data region
144        let create_data_region_request = self.create_request_for_data_region(&request);
145        let physical_columns = create_data_region_request
146            .column_metadatas
147            .iter()
148            .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
149            .collect::<HashMap<_, _>>();
150        let time_index_unit = create_data_region_request
151            .column_metadatas
152            .iter()
153            .find_map(|metadata| {
154                if metadata.semantic_type == SemanticType::Timestamp {
155                    metadata
156                        .column_schema
157                        .data_type
158                        .as_timestamp()
159                        .map(|data_type| data_type.unit())
160                } else {
161                    None
162                }
163            })
164            .context(UnexpectedRequestSnafu {
165                reason: "No time index column found",
166            })?;
167        let response = self
168            .mito
169            .handle_request(
170                data_region_id,
171                RegionRequest::Create(create_data_region_request),
172            )
173            .await
174            .with_context(|_| CreateMitoRegionSnafu {
175                region_type: DATA_REGION_SUBDIR,
176            })?;
177        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
178            PhysicalRegionNotFoundSnafu {
179                region_id: data_region_id,
180            },
181        )?;
182        extension_return_value.extend(response.extensions);
183
184        info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
185        PHYSICAL_REGION_COUNT.inc();
186
187        // remember this table
188        self.state.write().unwrap().add_physical_region(
189            data_region_id,
190            physical_columns,
191            primary_key_encoding,
192            physical_region_options,
193            time_index_unit,
194        );
195
196        Ok(())
197    }
198
199    /// Create multiple logical regions on the same physical region.
200    async fn create_logical_regions(
201        &self,
202        physical_region_id: RegionId,
203        requests: Vec<(RegionId, RegionCreateRequest)>,
204        extension_return_value: &mut HashMap<String, Vec<u8>>,
205    ) -> Result<()> {
206        let data_region_id = utils::to_data_region_id(physical_region_id);
207
208        let unit = self
209            .state
210            .read()
211            .unwrap()
212            .physical_region_time_index_unit(physical_region_id)
213            .context(PhysicalRegionNotFoundSnafu {
214                region_id: data_region_id,
215            })?;
216        // Checks the time index unit of each request.
217        for (_, request) in &requests {
218            // Safety: verify_region_create_request() ensures that the request is valid.
219            let time_index_column = request
220                .column_metadatas
221                .iter()
222                .find(|col| col.semantic_type == SemanticType::Timestamp)
223                .unwrap();
224            let request_unit = time_index_column
225                .column_schema
226                .data_type
227                .as_timestamp()
228                .unwrap()
229                .unit();
230            ensure!(
231                request_unit == unit,
232                UnexpectedRequestSnafu {
233                    reason: format!(
234                        "Metric has differenttime unit ({:?}) than the physical region ({:?})",
235                        request_unit, unit
236                    ),
237                }
238            );
239        }
240
241        // Filters out the requests that the logical region already exists
242        let requests = {
243            let state = self.state.read().unwrap();
244            let mut skipped = Vec::with_capacity(requests.len());
245            let mut kept_requests = Vec::with_capacity(requests.len());
246
247            for (region_id, request) in requests {
248                if state.is_logical_region_exist(region_id) {
249                    skipped.push(region_id);
250                } else {
251                    kept_requests.push((region_id, request));
252                }
253            }
254
255            // log skipped regions
256            if !skipped.is_empty() {
257                info!(
258                    "Skipped creating logical regions {skipped:?} because they already exist",
259                    skipped = skipped
260                );
261            }
262            kept_requests
263        };
264
265        // Finds new columns to add to physical region
266        let mut new_column_names = HashSet::new();
267        let mut new_columns = Vec::new();
268
269        let index_option = {
270            let state = &self.state.read().unwrap();
271            let region_state = state
272                .physical_region_states()
273                .get(&data_region_id)
274                .with_context(|| PhysicalRegionNotFoundSnafu {
275                    region_id: data_region_id,
276                })?;
277            let physical_columns = region_state.physical_columns();
278
279            extract_new_columns(
280                &requests,
281                physical_columns,
282                &mut new_column_names,
283                &mut new_columns,
284            )?;
285            region_state.options().index
286        };
287
288        // TODO(weny): we dont need to pass a mutable new_columns here.
289        self.data_region
290            .add_columns(data_region_id, new_columns, index_option)
291            .await?;
292
293        let physical_columns = self.data_region.physical_columns(data_region_id).await?;
294        let physical_schema_map = physical_columns
295            .iter()
296            .map(|metadata| (metadata.column_schema.name.as_str(), metadata))
297            .collect::<HashMap<_, _>>();
298        let logical_regions = requests
299            .iter()
300            .map(|(region_id, _)| (*region_id))
301            .collect::<Vec<_>>();
302        let logical_region_columns = requests.iter().map(|(region_id, request)| {
303            (
304                *region_id,
305                request
306                    .column_metadatas
307                    .iter()
308                    .map(|metadata| {
309                        // Safety: previous steps ensure the physical region exist
310                        let column_metadata = *physical_schema_map
311                            .get(metadata.column_schema.name.as_str())
312                            .unwrap();
313                        (metadata.column_schema.name.as_str(), column_metadata)
314                    })
315                    .collect::<HashMap<_, _>>(),
316            )
317        });
318
319        let new_add_columns = new_column_names.iter().map(|name| {
320            // Safety: previous steps ensure the physical region exist
321            let column_metadata = *physical_schema_map.get(name).unwrap();
322            (name.to_string(), column_metadata.column_id)
323        });
324
325        extension_return_value.insert(
326            ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
327            ColumnMetadata::encode_list(&physical_columns).context(SerializeColumnMetadataSnafu)?,
328        );
329
330        // Writes logical regions metadata to metadata region
331        self.metadata_region
332            .add_logical_regions(physical_region_id, true, logical_region_columns)
333            .await?;
334
335        {
336            let mut state = self.state.write().unwrap();
337            state.add_physical_columns(data_region_id, new_add_columns);
338            state.add_logical_regions(physical_region_id, logical_regions.clone());
339        }
340        for logical_region_id in logical_regions {
341            self.metadata_region
342                .open_logical_region(logical_region_id)
343                .await;
344        }
345
346        Ok(())
347    }
348
349    /// Check if
350    /// - internal columns are not occupied
351    /// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or
352    ///   [LOGICAL_TABLE_METADATA_KEY])
353    fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
354        request.validate().context(InvalidMetadataSnafu)?;
355
356        let name_to_index = request
357            .column_metadatas
358            .iter()
359            .enumerate()
360            .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
361            .collect::<HashMap<String, usize>>();
362
363        // check if internal columns are not occupied
364        ensure!(
365            !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
366            InternalColumnOccupiedSnafu {
367                column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
368            }
369        );
370        ensure!(
371            !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
372            InternalColumnOccupiedSnafu {
373                column: DATA_SCHEMA_TSID_COLUMN_NAME,
374            }
375        );
376
377        // check if required table option is present
378        ensure!(
379            request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
380            MissingRegionOptionSnafu {}
381        );
382        ensure!(
383            !(request.is_physical_table()
384                && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
385            ConflictRegionOptionSnafu {}
386        );
387
388        // check if only one field column is declared, and all tag columns are string
389        let mut field_col: Option<&ColumnMetadata> = None;
390        for col in &request.column_metadatas {
391            match col.semantic_type {
392                SemanticType::Tag => ensure!(
393                    col.column_schema.data_type == ConcreteDataType::string_datatype(),
394                    ColumnTypeMismatchSnafu {
395                        expect: ConcreteDataType::string_datatype(),
396                        actual: col.column_schema.data_type.clone(),
397                    }
398                ),
399                SemanticType::Field => {
400                    if field_col.is_some() {
401                        MultipleFieldColumnSnafu {
402                            previous: field_col.unwrap().column_schema.name.clone(),
403                            current: col.column_schema.name.clone(),
404                        }
405                        .fail()?;
406                    }
407                    field_col = Some(col)
408                }
409                SemanticType::Timestamp => {}
410            }
411        }
412        let field_col = field_col.context(NoFieldColumnSnafu)?;
413
414        // make sure the field column is float64 type
415        ensure!(
416            field_col.column_schema.data_type == ConcreteDataType::float64_datatype(),
417            ColumnTypeMismatchSnafu {
418                expect: ConcreteDataType::float64_datatype(),
419                actual: field_col.column_schema.data_type.clone(),
420            }
421        );
422
423        Ok(())
424    }
425
426    /// Build data region id and metadata region id from the given region id.
427    ///
428    /// Return value: (data_region_id, metadata_region_id)
429    fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
430        (
431            to_data_region_id(region_id),
432            to_metadata_region_id(region_id),
433        )
434    }
435
436    /// Build [RegionCreateRequest] for metadata region
437    ///
438    /// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
439    pub fn create_request_for_metadata_region(
440        &self,
441        request: &RegionCreateRequest,
442    ) -> RegionCreateRequest {
443        // ts TIME INDEX DEFAULT 0
444        let timestamp_column_metadata = ColumnMetadata {
445            column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
446            semantic_type: SemanticType::Timestamp,
447            column_schema: ColumnSchema::new(
448                METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
449                ConcreteDataType::timestamp_millisecond_datatype(),
450                false,
451            )
452            .with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
453                Value::Timestamp(Timestamp::new_millisecond(0)),
454            )))
455            .unwrap(),
456        };
457        // key STRING PRIMARY KEY
458        let key_column_metadata = ColumnMetadata {
459            column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
460            semantic_type: SemanticType::Tag,
461            column_schema: ColumnSchema::new(
462                METADATA_SCHEMA_KEY_COLUMN_NAME,
463                ConcreteDataType::string_datatype(),
464                false,
465            ),
466        };
467        // val STRING
468        let value_column_metadata = ColumnMetadata {
469            column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
470            semantic_type: SemanticType::Field,
471            column_schema: ColumnSchema::new(
472                METADATA_SCHEMA_VALUE_COLUMN_NAME,
473                ConcreteDataType::string_datatype(),
474                true,
475            ),
476        };
477
478        // concat region dir
479        let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
480
481        let options = region_options_for_metadata_region(&request.options);
482        RegionCreateRequest {
483            engine: MITO_ENGINE_NAME.to_string(),
484            column_metadatas: vec![
485                timestamp_column_metadata,
486                key_column_metadata,
487                value_column_metadata,
488            ],
489            primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
490            options,
491            region_dir: metadata_region_dir,
492        }
493    }
494
495    /// Convert [RegionCreateRequest] for data region.
496    ///
497    /// All tag columns in the original request will be converted to value columns.
498    /// Those columns real semantic type is stored in metadata region.
499    ///
500    /// This will also add internal columns to the request.
501    pub fn create_request_for_data_region(
502        &self,
503        request: &RegionCreateRequest,
504    ) -> RegionCreateRequest {
505        let mut data_region_request = request.clone();
506        let mut primary_key = vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
507
508        // concat region dir
509        data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
510
511        // change nullability for tag columns
512        data_region_request
513            .column_metadatas
514            .iter_mut()
515            .for_each(|metadata| {
516                if metadata.semantic_type == SemanticType::Tag {
517                    metadata.column_schema.set_nullable();
518                    primary_key.push(metadata.column_id);
519                }
520            });
521
522        // add internal columns
523        let [table_id_col, tsid_col] = Self::internal_column_metadata();
524        data_region_request.column_metadatas.push(table_id_col);
525        data_region_request.column_metadatas.push(tsid_col);
526        data_region_request.primary_key = primary_key;
527
528        // set data region options
529        set_data_region_options(
530            &mut data_region_request.options,
531            self.config.experimental_sparse_primary_key_encoding,
532        );
533
534        data_region_request
535    }
536
537    /// Generate internal column metadata.
538    ///
539    /// Return `[table_id_col, tsid_col]`
540    fn internal_column_metadata() -> [ColumnMetadata; 2] {
541        // Safety: BloomFilter is a valid skipping index type
542        let metric_name_col = ColumnMetadata {
543            column_id: ReservedColumnId::table_id(),
544            semantic_type: SemanticType::Tag,
545            column_schema: ColumnSchema::new(
546                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
547                ConcreteDataType::uint32_datatype(),
548                false,
549            )
550            .with_skipping_options(SkippingIndexOptions::new_unchecked(
551                DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY,
552                DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE,
553                datatypes::schema::SkippingIndexType::BloomFilter,
554            ))
555            .unwrap(),
556        };
557        let tsid_col = ColumnMetadata {
558            column_id: ReservedColumnId::tsid(),
559            semantic_type: SemanticType::Tag,
560            column_schema: ColumnSchema::new(
561                DATA_SCHEMA_TSID_COLUMN_NAME,
562                ConcreteDataType::uint64_datatype(),
563                false,
564            )
565            .with_inverted_index(false),
566        };
567        [metric_name_col, tsid_col]
568    }
569}
570
571/// Groups the create logical region requests by physical region id.
572fn group_create_logical_region_requests_by_physical_region_id(
573    requests: Vec<(RegionId, RegionCreateRequest)>,
574) -> Result<HashMap<RegionId, Vec<(RegionId, RegionCreateRequest)>>> {
575    let mut result = HashMap::with_capacity(requests.len());
576    for (region_id, request) in requests {
577        let physical_region_id = parse_physical_region_id(&request)?;
578        result
579            .entry(physical_region_id)
580            .or_insert_with(Vec::new)
581            .push((region_id, request));
582    }
583
584    Ok(result)
585}
586
587/// Parses the physical region id from the request.
588fn parse_physical_region_id(request: &RegionCreateRequest) -> Result<RegionId> {
589    let physical_region_id_raw = request
590        .options
591        .get(LOGICAL_TABLE_METADATA_KEY)
592        .ok_or(MissingRegionOptionSnafu {}.build())?;
593
594    let physical_region_id: RegionId = physical_region_id_raw
595        .parse::<u64>()
596        .with_context(|_| ParseRegionIdSnafu {
597            raw: physical_region_id_raw,
598        })?
599        .into();
600
601    Ok(physical_region_id)
602}
603
604/// Creates the region options for metadata region in metric engine.
605pub(crate) fn region_options_for_metadata_region(
606    original: &HashMap<String, String>,
607) -> HashMap<String, String> {
608    let mut metadata_region_options = HashMap::new();
609    metadata_region_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
610
611    if let Some(wal_options) = original.get(WAL_OPTIONS_KEY) {
612        metadata_region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.to_string());
613    }
614
615    metadata_region_options
616}
617
618#[cfg(test)]
619mod test {
620    use common_meta::ddl::test_util::assert_column_name_and_id;
621    use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
622    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
623    use store_api::region_request::BatchRegionDdlRequest;
624
625    use super::*;
626    use crate::config::EngineConfig;
627    use crate::engine::MetricEngine;
628    use crate::test_util::{create_logical_region_request, TestEnv};
629
630    #[test]
631    fn test_verify_region_create_request() {
632        // internal column is occupied
633        let request = RegionCreateRequest {
634            column_metadatas: vec![
635                ColumnMetadata {
636                    column_id: 0,
637                    semantic_type: SemanticType::Timestamp,
638                    column_schema: ColumnSchema::new(
639                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
640                        ConcreteDataType::timestamp_millisecond_datatype(),
641                        false,
642                    ),
643                },
644                ColumnMetadata {
645                    column_id: 1,
646                    semantic_type: SemanticType::Tag,
647                    column_schema: ColumnSchema::new(
648                        DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
649                        ConcreteDataType::uint32_datatype(),
650                        false,
651                    ),
652                },
653            ],
654            region_dir: "test_dir".to_string(),
655            engine: METRIC_ENGINE_NAME.to_string(),
656            primary_key: vec![],
657            options: HashMap::new(),
658        };
659        let result = MetricEngineInner::verify_region_create_request(&request);
660        assert!(result.is_err());
661        assert_eq!(
662            result.unwrap_err().to_string(),
663            "Internal column __table_id is reserved".to_string()
664        );
665
666        // valid request
667        let request = RegionCreateRequest {
668            column_metadatas: vec![
669                ColumnMetadata {
670                    column_id: 0,
671                    semantic_type: SemanticType::Timestamp,
672                    column_schema: ColumnSchema::new(
673                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
674                        ConcreteDataType::timestamp_millisecond_datatype(),
675                        false,
676                    ),
677                },
678                ColumnMetadata {
679                    column_id: 1,
680                    semantic_type: SemanticType::Tag,
681                    column_schema: ColumnSchema::new(
682                        "column1".to_string(),
683                        ConcreteDataType::string_datatype(),
684                        false,
685                    ),
686                },
687                ColumnMetadata {
688                    column_id: 2,
689                    semantic_type: SemanticType::Field,
690                    column_schema: ColumnSchema::new(
691                        "column2".to_string(),
692                        ConcreteDataType::float64_datatype(),
693                        false,
694                    ),
695                },
696            ],
697            region_dir: "test_dir".to_string(),
698            engine: METRIC_ENGINE_NAME.to_string(),
699            primary_key: vec![],
700            options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
701                .into_iter()
702                .collect(),
703        };
704        MetricEngineInner::verify_region_create_request(&request).unwrap();
705    }
706
707    #[test]
708    fn test_verify_region_create_request_options() {
709        let mut request = RegionCreateRequest {
710            column_metadatas: vec![
711                ColumnMetadata {
712                    column_id: 0,
713                    semantic_type: SemanticType::Timestamp,
714                    column_schema: ColumnSchema::new(
715                        METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
716                        ConcreteDataType::timestamp_millisecond_datatype(),
717                        false,
718                    ),
719                },
720                ColumnMetadata {
721                    column_id: 1,
722                    semantic_type: SemanticType::Field,
723                    column_schema: ColumnSchema::new(
724                        "val".to_string(),
725                        ConcreteDataType::float64_datatype(),
726                        false,
727                    ),
728                },
729            ],
730            region_dir: "test_dir".to_string(),
731            engine: METRIC_ENGINE_NAME.to_string(),
732            primary_key: vec![],
733            options: HashMap::new(),
734        };
735        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
736
737        let mut options = HashMap::new();
738        options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
739        request.options.clone_from(&options);
740        MetricEngineInner::verify_region_create_request(&request).unwrap();
741
742        options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
743        request.options.clone_from(&options);
744        MetricEngineInner::verify_region_create_request(&request).unwrap_err();
745
746        options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
747        request.options = options;
748        MetricEngineInner::verify_region_create_request(&request).unwrap();
749    }
750
751    #[tokio::test]
752    async fn test_create_request_for_physical_regions() {
753        // original request
754        let options: HashMap<_, _> = [
755            ("ttl".to_string(), "60m".to_string()),
756            ("skip_wal".to_string(), "true".to_string()),
757        ]
758        .into_iter()
759        .collect();
760        let request = RegionCreateRequest {
761            engine: METRIC_ENGINE_NAME.to_string(),
762            column_metadatas: vec![
763                ColumnMetadata {
764                    column_id: 0,
765                    semantic_type: SemanticType::Timestamp,
766                    column_schema: ColumnSchema::new(
767                        "timestamp",
768                        ConcreteDataType::timestamp_millisecond_datatype(),
769                        false,
770                    ),
771                },
772                ColumnMetadata {
773                    column_id: 1,
774                    semantic_type: SemanticType::Tag,
775                    column_schema: ColumnSchema::new(
776                        "tag",
777                        ConcreteDataType::string_datatype(),
778                        false,
779                    ),
780                },
781            ],
782            primary_key: vec![0],
783            options,
784            region_dir: "/test_dir".to_string(),
785        };
786
787        // set up
788        let env = TestEnv::new().await;
789        let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap();
790        let engine_inner = engine.inner;
791
792        // check create data region request
793        let data_region_request = engine_inner.create_request_for_data_region(&request);
794        assert_eq!(
795            data_region_request.region_dir,
796            "/test_dir/data/".to_string()
797        );
798        assert_eq!(data_region_request.column_metadatas.len(), 4);
799        assert_eq!(
800            data_region_request.primary_key,
801            vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1]
802        );
803        assert!(data_region_request.options.contains_key("ttl"));
804
805        // check create metadata region request
806        let metadata_region_request = engine_inner.create_request_for_metadata_region(&request);
807        assert_eq!(
808            metadata_region_request.region_dir,
809            "/test_dir/metadata/".to_string()
810        );
811        assert_eq!(
812            metadata_region_request.options.get("ttl").unwrap(),
813            "forever"
814        );
815        assert!(!metadata_region_request.options.contains_key("skip_wal"));
816    }
817
818    #[tokio::test]
819    async fn test_create_logical_regions() {
820        let env = TestEnv::new().await;
821        let engine = env.metric();
822        let physical_region_id1 = RegionId::new(1024, 0);
823        let physical_region_id2 = RegionId::new(1024, 1);
824        let logical_region_id1 = RegionId::new(1025, 0);
825        let logical_region_id2 = RegionId::new(1025, 1);
826        env.create_physical_region(physical_region_id1, "/test_dir1")
827            .await;
828        env.create_physical_region(physical_region_id2, "/test_dir2")
829            .await;
830
831        let region_create_request1 =
832            create_logical_region_request(&["job"], physical_region_id1, "logical1");
833        let region_create_request2 =
834            create_logical_region_request(&["job"], physical_region_id2, "logical2");
835
836        let response = engine
837            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
838                (logical_region_id1, region_create_request1),
839                (logical_region_id2, region_create_request2),
840            ]))
841            .await
842            .unwrap();
843
844        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
845        assert_eq!(manifest_infos.len(), 2);
846        let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
847        assert!(region_ids.contains(&physical_region_id1));
848        assert!(region_ids.contains(&physical_region_id2));
849
850        let column_metadatas =
851            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
852        assert_column_name_and_id(
853            &column_metadatas,
854            &[
855                ("greptime_timestamp", 0),
856                ("greptime_value", 1),
857                ("__table_id", ReservedColumnId::table_id()),
858                ("__tsid", ReservedColumnId::tsid()),
859                ("job", 2),
860            ],
861        );
862    }
863}