1pub mod file;
16
17use std::borrow::Cow;
18use std::fmt::{Display, Formatter};
19use std::path::Path;
20use std::time::Instant;
21
22use common_telemetry::info;
23use file::{Metadata, MetadataContent};
24use futures::{TryStreamExt, future};
25use object_store::ObjectStore;
26use snafu::{OptionExt, ResultExt, ensure};
27use strum::Display;
28
29use crate::error::{
30 Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
31 Result, UnexpectedSnafu, WriteObjectSnafu,
32};
33use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
34use crate::kv_backend::KvBackendRef;
35use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
36use crate::rpc::KeyValue;
37use crate::rpc::store::{BatchPutRequest, RangeRequest};
38use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
39
40#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
42pub enum FileFormat {
43 #[strum(serialize = "fb")]
44 FlexBuffers,
45}
46
47impl TryFrom<&str> for FileFormat {
48 type Error = String;
49
50 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
51 match value.to_lowercase().as_str() {
52 "fb" => Ok(FileFormat::FlexBuffers),
53 _ => Err(format!("Invalid file format: {}", value)),
54 }
55 }
56}
57
58#[derive(Debug, PartialEq, Eq, Display)]
59#[strum(serialize_all = "lowercase")]
60pub enum DataType {
61 Metadata,
62}
63
64impl TryFrom<&str> for DataType {
65 type Error = String;
66
67 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
68 match value.to_lowercase().as_str() {
69 "metadata" => Ok(DataType::Metadata),
70 _ => Err(format!("Invalid data type: {}", value)),
71 }
72 }
73}
74
75#[derive(Debug, PartialEq, Eq)]
76pub struct FileExtension {
77 format: FileFormat,
78 data_type: DataType,
79}
80
81impl FileExtension {
82 pub fn new(format: FileFormat, data_type: DataType) -> Self {
83 Self { format, data_type }
84 }
85}
86
87impl Display for FileExtension {
88 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89 write!(f, "{}.{}", self.data_type, self.format)
90 }
91}
92
93impl TryFrom<&str> for FileExtension {
94 type Error = Error;
95
96 fn try_from(value: &str) -> Result<Self> {
97 let parts = value.split(".").collect::<Vec<&str>>();
98 if parts.len() != 2 {
99 return InvalidFileExtensionSnafu {
100 reason: format!(
101 "Extension should be in the format of <datatype>.<format>, got: {}",
102 value
103 ),
104 }
105 .fail();
106 }
107
108 let data_type = DataType::try_from(parts[0])
109 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
110 let format = FileFormat::try_from(parts[1])
111 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
112 Ok(FileExtension { format, data_type })
113 }
114}
115
116#[derive(Debug, PartialEq, Eq)]
117pub struct FileName {
118 name: String,
119 extension: FileExtension,
120}
121
122impl Display for FileName {
123 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
124 write!(f, "{}.{}", self.name, self.extension)
125 }
126}
127
128impl TryFrom<&str> for FileName {
129 type Error = Error;
130
131 fn try_from(value: &str) -> Result<Self> {
132 let Some((name, extension)) = value.split_once(".") else {
133 return InvalidFileNameSnafu {
134 reason: format!(
135 "The file name should be in the format of <name>.<extension>, got: {}",
136 value
137 ),
138 }
139 .fail();
140 };
141 let extension = FileExtension::try_from(extension)?;
142 Ok(Self {
143 name: name.to_string(),
144 extension,
145 })
146 }
147}
148
149impl FileName {
150 fn new(name: String, extension: FileExtension) -> Self {
151 Self { name, extension }
152 }
153}
154
155pub struct MetadataSnapshotManager {
159 kv_backend: KvBackendRef,
160 object_store: ObjectStore,
161}
162
/// Upper bound (1 MiB) on the accumulated key + value bytes of one
/// `batch_put` request issued while restoring a snapshot.
const MAX_REQUEST_SIZE: usize = 1 << 20;
165
166fn is_internal_key(kv: &FileKeyValue) -> bool {
168 kv.key.starts_with(ELECTION_KEY.as_bytes()) || kv.key.starts_with(CANDIDATES_ROOT.as_bytes())
169}
170
171impl MetadataSnapshotManager {
172 pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
173 Self {
174 kv_backend,
175 object_store,
176 }
177 }
178
179 pub async fn restore(&self, file_path: &str) -> Result<u64> {
181 let path = Path::new(file_path);
182
183 let file_name = path
184 .file_name()
185 .and_then(|s| s.to_str())
186 .context(InvalidFilePathSnafu { file_path })?;
187
188 let filename = FileName::try_from(file_name)?;
189 let data = self
190 .object_store
191 .read(file_path)
192 .await
193 .context(ReadObjectSnafu { file_path })?;
194 let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
195 let metadata_content = document.into_metadata_content()?;
196 let mut req = BatchPutRequest::default();
197 let mut total_request_size = 0;
198 let mut count = 0;
199 let now = Instant::now();
200 for FileKeyValue { key, value } in metadata_content.into_iter() {
201 count += 1;
202 let key_size = key.len();
203 let value_size = value.len();
204 if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
205 self.kv_backend.batch_put(req).await?;
206 req = BatchPutRequest::default();
207 total_request_size = 0;
208 }
209 req.kvs.push(KeyValue { key, value });
210 total_request_size += key_size + value_size;
211 }
212 if !req.kvs.is_empty() {
213 self.kv_backend.batch_put(req).await?;
214 }
215
216 info!(
217 "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
218 file_path,
219 count,
220 now.elapsed()
221 );
222 Ok(count)
223 }
224
225 pub async fn check_target_source_clean(&self) -> Result<bool> {
226 let req = RangeRequest::new().with_range(vec![0], vec![0]);
227 let mut stream = Box::pin(
228 PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
229 );
230 let v = stream.as_mut().try_next().await?;
231 Ok(v.is_none())
232 }
233
234 pub async fn dump(&self, file_path_str: &str) -> Result<(String, u64)> {
236 let format = FileFormat::FlexBuffers;
237
238 let path = Path::new(file_path_str)
239 .parent()
240 .context(InvalidFilePathSnafu {
241 file_path: file_path_str,
242 })?;
243 let raw_file_name = Path::new(file_path_str)
244 .file_name()
245 .and_then(|s| s.to_str())
246 .context(InvalidFilePathSnafu {
247 file_path: file_path_str,
248 })?;
249 let parsed_file_name = FileName::try_from(raw_file_name).unwrap_or_else(|_| {
250 FileName::new(
251 raw_file_name.to_string(),
252 FileExtension {
253 format,
254 data_type: DataType::Metadata,
255 },
256 )
257 });
258 let file_path = path.join(parsed_file_name.to_string());
259 let file_path = file_path.to_str().context(InvalidFilePathSnafu {
260 file_path: file_path_str,
261 })?;
262
263 ensure!(
265 !self
266 .object_store
267 .exists(file_path)
268 .await
269 .context(ReadObjectSnafu { file_path })?,
270 UnexpectedSnafu {
271 err_msg: format!(
272 "The file '{}' already exists. Please choose a different name or remove the existing file before proceeding.",
273 file_path
274 ),
275 }
276 );
277 let now = Instant::now();
278 let req = RangeRequest::new().with_range(vec![0], vec![0]);
279 let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
280 Ok(FileKeyValue {
281 key: kv.key,
282 value: kv.value,
283 })
284 })
285 .into_stream();
286 let keyvalues = stream
287 .try_filter(|f| future::ready(!is_internal_key(f)))
288 .try_collect::<Vec<_>>()
289 .await?;
290 let num_keyvalues = keyvalues.len();
291 let document = Document::new(
292 Metadata::new(),
293 file::Content::Metadata(MetadataContent::new(keyvalues)),
294 );
295 let bytes = document.to_bytes(&format)?;
296 let r = self
297 .object_store
298 .write(file_path, bytes)
299 .await
300 .context(WriteObjectSnafu { file_path })?;
301 info!(
302 "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
303 file_path,
304 num_keyvalues,
305 r.content_length(),
306 now.elapsed()
307 );
308
309 Ok((parsed_file_name.to_string(), num_keyvalues as u64))
310 }
311
312 fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
313 format!("{} => {}", key, value)
314 }
315
316 pub async fn info(
317 object_store: &ObjectStore,
318 file_path: &str,
319 query_str: &str,
320 limit: Option<usize>,
321 ) -> Result<Vec<String>> {
322 let path = Path::new(file_path);
323
324 let file_name = path
325 .file_name()
326 .and_then(|s| s.to_str())
327 .context(InvalidFilePathSnafu { file_path })?;
328
329 let filename = FileName::try_from(file_name)?;
330 let data = object_store
331 .read(file_path)
332 .await
333 .context(ReadObjectSnafu { file_path })?;
334 let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
335 let metadata_content = document.into_metadata_content()?.values();
336 let mut results = Vec::with_capacity(limit.unwrap_or(256));
337 for kv in metadata_content {
338 let key_str = String::from_utf8_lossy(&kv.key);
339 if let Some(prefix) = query_str.strip_suffix('*') {
340 if key_str.starts_with(prefix) {
341 let value_str = String::from_utf8_lossy(&kv.value);
342 results.push(Self::format_output(key_str, value_str));
343 }
344 } else if key_str == query_str {
345 let value_str = String::from_utf8_lossy(&kv.value);
346 results.push(Self::format_output(key_str, value_str));
347 }
348 if results.len() == limit.unwrap_or(usize::MAX) {
349 break;
350 }
351 }
352 Ok(results)
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use std::assert_matches::assert_matches;
359 use std::sync::Arc;
360
361 use common_test_util::temp_dir::{TempDir, create_temp_dir};
362 use object_store::services::Fs;
363
364 use super::*;
365 use crate::kv_backend::KvBackend;
366 use crate::kv_backend::memory::MemoryKvBackend;
367 use crate::rpc::store::PutRequest;
368
369 #[test]
370 fn test_file_name() {
371 let file_name = FileName::try_from("test.metadata.fb").unwrap();
372 assert_eq!(file_name.name, "test");
373 assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
374 assert_eq!(file_name.extension.data_type, DataType::Metadata);
375 assert_eq!(file_name.to_string(), "test.metadata.fb");
376
377 let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
378 assert_eq!(
379 invalid_file_name.to_string(),
380 "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
381 );
382
383 let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
384 assert_eq!(
385 invalid_file_extension.to_string(),
386 "Invalid file extension: Invalid file format: hello"
387 );
388 }
389
390 fn test_env(
391 prefix: &str,
392 ) -> (
393 TempDir,
394 Arc<MemoryKvBackend<Error>>,
395 MetadataSnapshotManager,
396 ) {
397 let temp_dir = create_temp_dir(prefix);
398 let kv_backend = Arc::new(MemoryKvBackend::default());
399 let temp_path = temp_dir.path();
400 let data_path = temp_path.join("data").as_path().display().to_string();
401 let builder = Fs::default().root(&data_path);
402 let object_store = ObjectStore::new(builder).unwrap().finish();
403 let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
404 (temp_dir, kv_backend, manager)
405 }
406
407 #[tokio::test]
408 async fn test_dump_and_restore() {
409 common_telemetry::init_default_ut_logging();
410 let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
411 let temp_path = temp_dir.path();
412
413 for i in 0..10 {
414 kv_backend
415 .put(
416 PutRequest::new()
417 .with_key(format!("test_{}", i).as_bytes().to_vec())
418 .with_value(format!("value_{}", i).as_bytes().to_vec()),
419 )
420 .await
421 .unwrap();
422 }
423 let dump_path = temp_path.join("snapshot");
424 manager
425 .dump(&format!(
426 "{}/metadata_snapshot",
427 &dump_path.as_path().display().to_string()
428 ))
429 .await
430 .unwrap();
431 kv_backend.clear();
433 let err = manager
434 .dump(&format!(
435 "{}/metadata_snapshot.metadata.fb",
436 &dump_path.as_path().display().to_string()
437 ))
438 .await
439 .unwrap_err();
440 assert_matches!(err, Error::Unexpected { .. });
441 assert!(err.to_string().contains("already exists"));
442
443 let restore_path = dump_path
444 .join("metadata_snapshot.metadata.fb")
445 .as_path()
446 .display()
447 .to_string();
448 manager.restore(&restore_path).await.unwrap();
449
450 for i in 0..10 {
451 let key = format!("test_{}", i);
452 let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
453 assert_eq!(value.value, format!("value_{}", i).as_bytes());
454 }
455 }
456
457 #[tokio::test]
458 async fn test_restore_from_nonexistent_file() {
459 let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
460 let restore_path = temp_dir
461 .path()
462 .join("nonexistent.metadata.fb")
463 .as_path()
464 .display()
465 .to_string();
466 let err = manager.restore(&restore_path).await.unwrap_err();
467 assert_matches!(err, Error::ReadObject { .. })
468 }
469}