1pub mod file;
16
17use std::fmt::{Display, Formatter};
18use std::path::{Path, PathBuf};
19use std::time::Instant;
20
21use common_telemetry::info;
22use file::{Metadata, MetadataContent};
23use futures::TryStreamExt;
24use object_store::ObjectStore;
25use snafu::{OptionExt, ResultExt};
26use strum::Display;
27
28use crate::error::{
29 Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
30 Result, WriteObjectSnafu,
31};
32use crate::kv_backend::KvBackendRef;
33use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
34use crate::rpc::store::{BatchPutRequest, RangeRequest};
35use crate::rpc::KeyValue;
36use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
37
38#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
40pub enum FileFormat {
41 #[strum(serialize = "fb")]
42 FlexBuffers,
43}
44
45impl TryFrom<&str> for FileFormat {
46 type Error = String;
47
48 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
49 match value.to_lowercase().as_str() {
50 "fb" => Ok(FileFormat::FlexBuffers),
51 _ => Err(format!("Invalid file format: {}", value)),
52 }
53 }
54}
55
56#[derive(Debug, PartialEq, Eq, Display)]
57#[strum(serialize_all = "lowercase")]
58pub enum DataType {
59 Metadata,
60}
61
62impl TryFrom<&str> for DataType {
63 type Error = String;
64
65 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
66 match value.to_lowercase().as_str() {
67 "metadata" => Ok(DataType::Metadata),
68 _ => Err(format!("Invalid data type: {}", value)),
69 }
70 }
71}
72
73#[derive(Debug, PartialEq, Eq)]
74pub struct FileExtension {
75 format: FileFormat,
76 data_type: DataType,
77}
78
79impl FileExtension {
80 pub fn new(format: FileFormat, data_type: DataType) -> Self {
81 Self { format, data_type }
82 }
83}
84
85impl Display for FileExtension {
86 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
87 write!(f, "{}.{}", self.data_type, self.format)
88 }
89}
90
91impl TryFrom<&str> for FileExtension {
92 type Error = Error;
93
94 fn try_from(value: &str) -> Result<Self> {
95 let parts = value.split(".").collect::<Vec<&str>>();
96 if parts.len() != 2 {
97 return InvalidFileExtensionSnafu {
98 reason: format!(
99 "Extension should be in the format of <datatype>.<format>, got: {}",
100 value
101 ),
102 }
103 .fail();
104 }
105
106 let data_type = DataType::try_from(parts[0])
107 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
108 let format = FileFormat::try_from(parts[1])
109 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
110 Ok(FileExtension { format, data_type })
111 }
112}
113
114#[derive(Debug, PartialEq, Eq)]
115pub struct FileName {
116 name: String,
117 extension: FileExtension,
118}
119
120impl Display for FileName {
121 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122 write!(f, "{}.{}", self.name, self.extension)
123 }
124}
125
126impl TryFrom<&str> for FileName {
127 type Error = Error;
128
129 fn try_from(value: &str) -> Result<Self> {
130 let Some((name, extension)) = value.split_once(".") else {
131 return InvalidFileNameSnafu {
132 reason: format!(
133 "The file name should be in the format of <name>.<extension>, got: {}",
134 value
135 ),
136 }
137 .fail();
138 };
139 let extension = FileExtension::try_from(extension)?;
140 Ok(Self {
141 name: name.to_string(),
142 extension,
143 })
144 }
145}
146
147impl FileName {
148 fn new(name: String, extension: FileExtension) -> Self {
149 Self { name, extension }
150 }
151}
152
153pub struct MetadataSnapshotManager {
157 kv_backend: KvBackendRef,
158 object_store: ObjectStore,
159}
160
/// Size budget (1 MiB) for the summed key and value byte lengths accumulated
/// into one `batch_put` request while restoring a snapshot.
const MAX_REQUEST_SIZE: usize = 1 << 20;
163
164impl MetadataSnapshotManager {
165 pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
166 Self {
167 kv_backend,
168 object_store,
169 }
170 }
171
172 pub async fn restore(&self, file_path: &str) -> Result<u64> {
174 let path = Path::new(file_path);
175
176 let file_name = path
177 .file_name()
178 .and_then(|s| s.to_str())
179 .context(InvalidFilePathSnafu { file_path })?;
180
181 let filename = FileName::try_from(file_name)?;
182 let data = self
183 .object_store
184 .read(file_path)
185 .await
186 .context(ReadObjectSnafu { file_path })?;
187 let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
188 let metadata_content = document.into_metadata_content()?;
189 let mut req = BatchPutRequest::default();
190 let mut total_request_size = 0;
191 let mut count = 0;
192 let now = Instant::now();
193 for FileKeyValue { key, value } in metadata_content.into_iter() {
194 count += 1;
195 let key_size = key.len();
196 let value_size = value.len();
197 if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
198 self.kv_backend.batch_put(req).await?;
199 req = BatchPutRequest::default();
200 total_request_size = 0;
201 }
202 req.kvs.push(KeyValue { key, value });
203 total_request_size += key_size + value_size;
204 }
205 if !req.kvs.is_empty() {
206 self.kv_backend.batch_put(req).await?;
207 }
208
209 info!(
210 "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
211 file_path,
212 count,
213 now.elapsed()
214 );
215 Ok(count)
216 }
217
218 pub async fn check_target_source_clean(&self) -> Result<bool> {
219 let req = RangeRequest::new().with_range(vec![0], vec![0]);
220 let mut stream = Box::pin(
221 PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
222 );
223 let v = stream.as_mut().try_next().await?;
224 Ok(v.is_none())
225 }
226
227 pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
229 let format = FileFormat::FlexBuffers;
230 let filename = FileName::new(
231 filename_str.to_string(),
232 FileExtension {
233 format,
234 data_type: DataType::Metadata,
235 },
236 );
237 let file_path_buf = [path, filename.to_string().as_str()]
238 .iter()
239 .collect::<PathBuf>();
240 let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
241 reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
242 })?;
243 let now = Instant::now();
244 let req = RangeRequest::new().with_range(vec![0], vec![0]);
245 let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
246 Ok(FileKeyValue {
247 key: kv.key,
248 value: kv.value,
249 })
250 })
251 .into_stream();
252 let keyvalues = stream.try_collect::<Vec<_>>().await?;
253 let num_keyvalues = keyvalues.len();
254 let document = Document::new(
255 Metadata::new(),
256 file::Content::Metadata(MetadataContent::new(keyvalues)),
257 );
258 let bytes = document.to_bytes(&format)?;
259 let r = self
260 .object_store
261 .write(file_path, bytes)
262 .await
263 .context(WriteObjectSnafu { file_path })?;
264 info!(
265 "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
266 file_path,
267 num_keyvalues,
268 r.content_length(),
269 now.elapsed()
270 );
271
272 Ok((filename.to_string(), num_keyvalues as u64))
273 }
274}
275
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use object_store::services::Fs;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Parses and re-renders a valid `<name>.<datatype>.<format>` file name,
    /// then checks the exact error messages for malformed extensions.
    #[test]
    fn test_file_name() {
        let file_name = FileName::try_from("test.metadata.fb").unwrap();
        assert_eq!(file_name.name, "test");
        assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
        assert_eq!(file_name.extension.data_type, DataType::Metadata);
        assert_eq!(file_name.to_string(), "test.metadata.fb");

        // Only one component after the name: the extension parser rejects it.
        let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
        assert_eq!(
            invalid_file_name.to_string(),
            "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
        );

        // Correct shape, but `hello` is not a known format.
        let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
        assert_eq!(
            invalid_file_extension.to_string(),
            "Invalid file extension: Invalid file format: hello"
        );
    }

    /// Builds a test fixture from `prefix`. It consists of:
    /// - a temp directory;
    /// - an in-memory kv backend;
    /// - a `MetadataSnapshotManager` over that backend, whose object store is
    ///   an `Fs` service rooted at `<temp_dir>/data`.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the whole
    /// test (presumably it cleans up the directory on drop; not verified here).
    fn test_env(
        prefix: &str,
    ) -> (
        TempDir,
        Arc<MemoryKvBackend<Error>>,
        MetadataSnapshotManager,
    ) {
        let temp_dir = create_temp_dir(prefix);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let temp_path = temp_dir.path();
        let data_path = temp_path.join("data").as_path().display().to_string();
        let builder = Fs::default().root(&data_path);
        let object_store = ObjectStore::new(builder).unwrap().finish();
        // The manager shares the same backend handle the test inspects.
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        (temp_dir, kv_backend, manager)
    }

    /// Seeds ten pairs, dumps them, clears the backend, restores from the
    /// dumped file and checks that every pair came back with its value.
    #[tokio::test]
    async fn test_dump_and_restore() {
        common_telemetry::init_default_ut_logging();
        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
        let temp_path = temp_dir.path();

        for i in 0..10 {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(format!("test_{}", i).as_bytes().to_vec())
                        .with_value(format!("value_{}", i).as_bytes().to_vec()),
                )
                .await
                .unwrap();
        }
        let dump_path = temp_path.join("snapshot");
        manager
            .dump(
                &dump_path.as_path().display().to_string(),
                "metadata_snapshot",
            )
            .await
            .unwrap();
        // Empty the backend so the assertions below only see restored data.
        kv_backend.clear();

        // `dump` appends the `metadata.fb` extension to the given base name.
        let restore_path = dump_path
            .join("metadata_snapshot.metadata.fb")
            .as_path()
            .display()
            .to_string();
        manager.restore(&restore_path).await.unwrap();

        for i in 0..10 {
            let key = format!("test_{}", i);
            let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
            assert_eq!(value.value, format!("value_{}", i).as_bytes());
        }
    }

    /// Restoring from a file that was never written surfaces `Error::ReadObject`.
    #[tokio::test]
    async fn test_restore_from_nonexistent_file() {
        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
        let restore_path = temp_dir
            .path()
            .join("nonexistent.metadata.fb")
            .as_path()
            .display()
            .to_string();
        let err = manager.restore(&restore_path).await.unwrap_err();
        assert_matches!(err, Error::ReadObject { .. })
    }
}