1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use serde::{Deserialize, Serialize};
21use snafu::{ensure, OptionExt, ResultExt};
22use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
23use store_api::storage::{ColumnId, RegionId};
24
25use crate::error::{
26 CheckObjectSnafu, DecodeJsonSnafu, DeleteRegionManifestSnafu, EncodeJsonSnafu,
27 InvalidMetadataSnafu, LoadRegionManifestSnafu, ManifestExistsSnafu, MissingRequiredFieldSnafu,
28 ParseFileFormatSnafu, Result, StoreRegionManifestSnafu,
29};
30use crate::FileOptions;
31
32#[inline]
33fn region_manifest_path(region_dir: &str) -> String {
34 format!("{region_dir}manifest/_file_manifest")
35}
36
37#[derive(Debug, Serialize, Deserialize)]
38pub struct FileRegionManifest {
39 pub region_id: RegionId,
40 pub column_metadatas: Vec<ColumnMetadata>,
41 pub primary_key: Vec<ColumnId>,
42 pub options: HashMap<String, String>,
43}
44
45impl FileRegionManifest {
46 pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
47 let path = ®ion_manifest_path(region_dir);
48 let exist = object_store
49 .exists(path)
50 .await
51 .context(CheckObjectSnafu { path })?;
52 ensure!(!exist, ManifestExistsSnafu { path });
53
54 let bs = self.encode()?;
55 object_store
56 .write(path, bs)
57 .await
58 .context(StoreRegionManifestSnafu {
59 region_id: self.region_id,
60 })?;
61
62 Ok(())
63 }
64
65 pub async fn load(
66 region_id: RegionId,
67 region_dir: &str,
68 object_store: &ObjectStore,
69 ) -> Result<Self> {
70 let path = ®ion_manifest_path(region_dir);
71 let bs = object_store
72 .read(path)
73 .await
74 .context(LoadRegionManifestSnafu { region_id })?
75 .to_vec();
76 Self::decode(bs.as_slice())
77 }
78
79 pub async fn delete(
80 region_id: RegionId,
81 region_dir: &str,
82 object_store: &ObjectStore,
83 ) -> Result<()> {
84 let path = ®ion_manifest_path(region_dir);
85 object_store
86 .delete(path)
87 .await
88 .context(DeleteRegionManifestSnafu { region_id })
89 }
90
91 pub fn metadata(&self) -> Result<RegionMetadataRef> {
92 let mut builder = RegionMetadataBuilder::new(self.region_id);
93 for column in &self.column_metadatas {
94 builder.push_column_metadata(column.clone());
95 }
96 builder.primary_key(self.primary_key.clone());
97 let metadata = builder.build().context(InvalidMetadataSnafu)?;
98
99 Ok(Arc::new(metadata))
100 }
101
102 pub fn url(&self) -> Result<String> {
103 self.get_option(table::requests::FILE_TABLE_LOCATION_KEY)
104 }
105
106 pub fn file_options(&self) -> Result<FileOptions> {
107 let encoded_opts = self.get_option(table::requests::FILE_TABLE_META_KEY)?;
108 serde_json::from_str(&encoded_opts).context(DecodeJsonSnafu)
109 }
110
111 pub fn format(&self) -> Result<Format> {
112 Format::try_from(&self.options).context(ParseFileFormatSnafu)
113 }
114
115 fn encode(&self) -> Result<Vec<u8>> {
116 serde_json::to_vec(self).context(EncodeJsonSnafu)
117 }
118
119 fn decode(src: &[u8]) -> Result<Self> {
120 serde_json::from_slice(src).context(DecodeJsonSnafu)
121 }
122
123 fn get_option(&self, name: &str) -> Result<String> {
124 self.options
125 .get(name)
126 .cloned()
127 .context(MissingRequiredFieldSnafu { name })
128 }
129}