file_engine/
manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use serde::{Deserialize, Serialize};
21use snafu::{ensure, OptionExt, ResultExt};
22use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
23use store_api::storage::{ColumnId, RegionId};
24
25use crate::error::{
26    CheckObjectSnafu, DecodeJsonSnafu, DeleteRegionManifestSnafu, EncodeJsonSnafu,
27    InvalidMetadataSnafu, LoadRegionManifestSnafu, ManifestExistsSnafu, MissingRequiredFieldSnafu,
28    ParseFileFormatSnafu, Result, StoreRegionManifestSnafu,
29};
30use crate::FileOptions;
31
/// Builds the object-store key of the file-engine manifest for a region.
///
/// `region_dir` is joined verbatim with `manifest/_file_manifest`; no path
/// separator is inserted, so the caller must pass a directory that already
/// ends with `/`.
#[inline]
fn region_manifest_path(region_dir: &str) -> String {
    const MANIFEST_SUFFIX: &str = "manifest/_file_manifest";
    [region_dir, MANIFEST_SUFFIX].concat()
}
36
/// Manifest describing a file region.
///
/// It is serialized to and from JSON (via `serde_json`) and stored as a single
/// object under the region directory.
#[derive(Debug, Serialize, Deserialize)]
pub struct FileRegionManifest {
    /// Id of the region this manifest belongs to.
    pub region_id: RegionId,
    /// Column metadata entries; `metadata()` pushes them into the region
    /// metadata builder in this order.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Column ids handed to the region metadata builder as the primary key.
    pub primary_key: Vec<ColumnId>,
    /// String key/value options. The table location, the JSON-encoded file
    /// options and the file format are all read from this map.
    pub options: HashMap<String, String>,
}
44
45impl FileRegionManifest {
46    pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
47        let path = &region_manifest_path(region_dir);
48        let exist = object_store
49            .exists(path)
50            .await
51            .context(CheckObjectSnafu { path })?;
52        ensure!(!exist, ManifestExistsSnafu { path });
53
54        let bs = self.encode()?;
55        object_store
56            .write(path, bs)
57            .await
58            .context(StoreRegionManifestSnafu {
59                region_id: self.region_id,
60            })?;
61
62        Ok(())
63    }
64
65    pub async fn load(
66        region_id: RegionId,
67        region_dir: &str,
68        object_store: &ObjectStore,
69    ) -> Result<Self> {
70        let path = &region_manifest_path(region_dir);
71        let bs = object_store
72            .read(path)
73            .await
74            .context(LoadRegionManifestSnafu { region_id })?
75            .to_vec();
76        Self::decode(bs.as_slice())
77    }
78
79    pub async fn delete(
80        region_id: RegionId,
81        region_dir: &str,
82        object_store: &ObjectStore,
83    ) -> Result<()> {
84        let path = &region_manifest_path(region_dir);
85        object_store
86            .delete(path)
87            .await
88            .context(DeleteRegionManifestSnafu { region_id })
89    }
90
91    pub fn metadata(&self) -> Result<RegionMetadataRef> {
92        let mut builder = RegionMetadataBuilder::new(self.region_id);
93        for column in &self.column_metadatas {
94            builder.push_column_metadata(column.clone());
95        }
96        builder.primary_key(self.primary_key.clone());
97        let metadata = builder.build().context(InvalidMetadataSnafu)?;
98
99        Ok(Arc::new(metadata))
100    }
101
102    pub fn url(&self) -> Result<String> {
103        self.get_option(table::requests::FILE_TABLE_LOCATION_KEY)
104    }
105
106    pub fn file_options(&self) -> Result<FileOptions> {
107        let encoded_opts = self.get_option(table::requests::FILE_TABLE_META_KEY)?;
108        serde_json::from_str(&encoded_opts).context(DecodeJsonSnafu)
109    }
110
111    pub fn format(&self) -> Result<Format> {
112        Format::try_from(&self.options).context(ParseFileFormatSnafu)
113    }
114
115    fn encode(&self) -> Result<Vec<u8>> {
116        serde_json::to_vec(self).context(EncodeJsonSnafu)
117    }
118
119    fn decode(src: &[u8]) -> Result<Self> {
120        serde_json::from_slice(src).context(DecodeJsonSnafu)
121    }
122
123    fn get_option(&self, name: &str) -> Result<String> {
124        self.options
125            .get(name)
126            .cloned()
127            .context(MissingRequiredFieldSnafu { name })
128    }
129}