file_engine/
manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use serde::{Deserialize, Serialize};
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
23use store_api::storage::{ColumnId, RegionId};
24
25use crate::FileOptions;
26use crate::error::{
27    CheckObjectSnafu, DecodeJsonSnafu, DeleteRegionManifestSnafu, EncodeJsonSnafu,
28    InvalidMetadataSnafu, LoadRegionManifestSnafu, ManifestExistsSnafu, MissingRequiredFieldSnafu,
29    ParseFileFormatSnafu, Result, StoreRegionManifestSnafu,
30};
31
32#[inline]
33fn region_manifest_path(region_dir: &str) -> String {
34    format!("{region_dir}manifest/_file_manifest")
35}
36
37#[derive(Debug, Serialize, Deserialize)]
38pub struct FileRegionManifest {
39    pub region_id: RegionId,
40    pub column_metadatas: Vec<ColumnMetadata>,
41    pub primary_key: Vec<ColumnId>,
42    pub options: HashMap<String, String>,
43}
44
45impl FileRegionManifest {
46    pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
47        let path = &region_manifest_path(region_dir);
48        let exist = object_store
49            .exists(path)
50            .await
51            .context(CheckObjectSnafu { path })?;
52        ensure!(!exist, ManifestExistsSnafu { path });
53
54        let bs = self.encode()?;
55        object_store
56            .write(path, bs)
57            .await
58            .context(StoreRegionManifestSnafu {
59                region_id: self.region_id,
60            })?;
61
62        Ok(())
63    }
64
65    pub async fn load(
66        region_id: RegionId,
67        region_dir: &str,
68        object_store: &ObjectStore,
69    ) -> Result<Self> {
70        let path = &region_manifest_path(region_dir);
71        let bs = object_store
72            .read(path)
73            .await
74            .context(LoadRegionManifestSnafu { region_id })?
75            .to_vec();
76        Self::decode(bs.as_slice())
77    }
78
79    pub async fn delete(
80        region_id: RegionId,
81        region_dir: &str,
82        object_store: &ObjectStore,
83    ) -> Result<()> {
84        let path = &region_manifest_path(region_dir);
85        object_store
86            .delete(path)
87            .await
88            .context(DeleteRegionManifestSnafu { region_id })
89    }
90
91    pub fn metadata(&self) -> Result<RegionMetadataRef> {
92        let mut builder = RegionMetadataBuilder::new(self.region_id);
93        for column in &self.column_metadatas {
94            builder.push_column_metadata(column.clone());
95        }
96        builder.primary_key(self.primary_key.clone());
97        let metadata = builder
98            .build_without_validation()
99            .context(InvalidMetadataSnafu)?;
100
101        Ok(Arc::new(metadata))
102    }
103
104    pub fn url(&self) -> Result<String> {
105        self.get_option(table::requests::FILE_TABLE_LOCATION_KEY)
106    }
107
108    pub fn file_options(&self) -> Result<FileOptions> {
109        let encoded_opts = self.get_option(table::requests::FILE_TABLE_META_KEY)?;
110        serde_json::from_str(&encoded_opts).context(DecodeJsonSnafu)
111    }
112
113    pub fn format(&self) -> Result<Format> {
114        Format::try_from(&self.options).context(ParseFileFormatSnafu)
115    }
116
117    fn encode(&self) -> Result<Vec<u8>> {
118        serde_json::to_vec(self).context(EncodeJsonSnafu)
119    }
120
121    fn decode(src: &[u8]) -> Result<Self> {
122        serde_json::from_slice(src).context(DecodeJsonSnafu)
123    }
124
125    fn get_option(&self, name: &str) -> Result<String> {
126        self.options
127            .get(name)
128            .cloned()
129            .context(MissingRequiredFieldSnafu { name })
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use api::v1::SemanticType;
136    use datatypes::prelude::ConcreteDataType;
137    use datatypes::schema::ColumnSchema;
138
139    use super::*;
140
141    #[test]
142    fn metadata_allows_internal_column_name() {
143        let manifest = FileRegionManifest {
144            region_id: RegionId::new(1, 0),
145            column_metadatas: vec![
146                ColumnMetadata {
147                    column_schema: ColumnSchema::new(
148                        "__primary_key",
149                        ConcreteDataType::string_datatype(),
150                        false,
151                    ),
152                    semantic_type: SemanticType::Tag,
153                    column_id: 1,
154                },
155                ColumnMetadata {
156                    column_schema: ColumnSchema::new(
157                        "ts",
158                        ConcreteDataType::timestamp_millisecond_datatype(),
159                        false,
160                    ),
161                    semantic_type: SemanticType::Timestamp,
162                    column_id: 2,
163                },
164            ],
165            primary_key: vec![1],
166            options: HashMap::default(),
167        };
168
169        let metadata = manifest.metadata().unwrap();
170        assert!(
171            metadata
172                .column_metadatas
173                .iter()
174                .any(|c| c.column_schema.name == "__primary_key")
175        );
176    }
177}