1pub mod file;
16
17use std::borrow::Cow;
18use std::fmt::{Display, Formatter};
19use std::path::{Path, PathBuf};
20use std::time::Instant;
21
22use common_telemetry::info;
23use file::{Metadata, MetadataContent};
24use futures::{future, TryStreamExt};
25use object_store::ObjectStore;
26use snafu::{OptionExt, ResultExt};
27use strum::Display;
28
29use crate::error::{
30 Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
31 Result, WriteObjectSnafu,
32};
33use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
34use crate::kv_backend::KvBackendRef;
35use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
36use crate::rpc::store::{BatchPutRequest, RangeRequest};
37use crate::rpc::KeyValue;
38use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
39
/// The serialization format of a snapshot file.
///
/// The format is rendered (and parsed back by `TryFrom<&str>`) as the last
/// component of a snapshot file name.
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
pub enum FileFormat {
    /// FlexBuffers encoding, written as `fb` in file names.
    #[strum(serialize = "fb")]
    FlexBuffers,
}
46
47impl TryFrom<&str> for FileFormat {
48 type Error = String;
49
50 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
51 match value.to_lowercase().as_str() {
52 "fb" => Ok(FileFormat::FlexBuffers),
53 _ => Err(format!("Invalid file format: {}", value)),
54 }
55 }
56}
57
/// The kind of data stored in a snapshot file.
///
/// Variants are rendered in lowercase and form the first component of a
/// snapshot file extension.
#[derive(Debug, PartialEq, Eq, Display)]
#[strum(serialize_all = "lowercase")]
pub enum DataType {
    /// Metadata key-value pairs, written as `metadata` in file names.
    Metadata,
}
63
64impl TryFrom<&str> for DataType {
65 type Error = String;
66
67 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
68 match value.to_lowercase().as_str() {
69 "metadata" => Ok(DataType::Metadata),
70 _ => Err(format!("Invalid data type: {}", value)),
71 }
72 }
73}
74
/// The extension of a snapshot file, displayed as `<datatype>.<format>`
/// (for example `metadata.fb`).
#[derive(Debug, PartialEq, Eq)]
pub struct FileExtension {
    /// How the file content is serialized.
    format: FileFormat,
    /// What kind of data the file contains.
    data_type: DataType,
}
80
81impl FileExtension {
82 pub fn new(format: FileFormat, data_type: DataType) -> Self {
83 Self { format, data_type }
84 }
85}
86
87impl Display for FileExtension {
88 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89 write!(f, "{}.{}", self.data_type, self.format)
90 }
91}
92
93impl TryFrom<&str> for FileExtension {
94 type Error = Error;
95
96 fn try_from(value: &str) -> Result<Self> {
97 let parts = value.split(".").collect::<Vec<&str>>();
98 if parts.len() != 2 {
99 return InvalidFileExtensionSnafu {
100 reason: format!(
101 "Extension should be in the format of <datatype>.<format>, got: {}",
102 value
103 ),
104 }
105 .fail();
106 }
107
108 let data_type = DataType::try_from(parts[0])
109 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
110 let format = FileFormat::try_from(parts[1])
111 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
112 Ok(FileExtension { format, data_type })
113 }
114}
115
/// The name of a snapshot file, displayed as `<name>.<datatype>.<format>`
/// (for example `test.metadata.fb`).
#[derive(Debug, PartialEq, Eq)]
pub struct FileName {
    /// The part of the file name that precedes the extension.
    name: String,
    /// The `<datatype>.<format>` extension.
    extension: FileExtension,
}
121
122impl Display for FileName {
123 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
124 write!(f, "{}.{}", self.name, self.extension)
125 }
126}
127
128impl TryFrom<&str> for FileName {
129 type Error = Error;
130
131 fn try_from(value: &str) -> Result<Self> {
132 let Some((name, extension)) = value.split_once(".") else {
133 return InvalidFileNameSnafu {
134 reason: format!(
135 "The file name should be in the format of <name>.<extension>, got: {}",
136 value
137 ),
138 }
139 .fail();
140 };
141 let extension = FileExtension::try_from(extension)?;
142 Ok(Self {
143 name: name.to_string(),
144 extension,
145 })
146 }
147}
148
149impl FileName {
150 fn new(name: String, extension: FileExtension) -> Self {
151 Self { name, extension }
152 }
153}
154
/// Dumps the key-value pairs of a kv backend into a snapshot file in an
/// object store, and restores them from such a file.
pub struct MetadataSnapshotManager {
    /// The kv backend that is read on dump and written on restore.
    kv_backend: KvBackendRef,
    /// The object store that holds the snapshot files.
    object_store: ObjectStore,
}
162
/// The maximum accumulated size (key bytes plus value bytes) of a single
/// batch put request issued while restoring: 1 MiB.
const MAX_REQUEST_SIZE: usize = 1024 * 1024;
165
166fn is_internal_key(kv: &FileKeyValue) -> bool {
168 kv.key.starts_with(ELECTION_KEY.as_bytes()) || kv.key.starts_with(CANDIDATES_ROOT.as_bytes())
169}
170
171impl MetadataSnapshotManager {
172 pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
173 Self {
174 kv_backend,
175 object_store,
176 }
177 }
178
179 pub async fn restore(&self, file_path: &str) -> Result<u64> {
181 let path = Path::new(file_path);
182
183 let file_name = path
184 .file_name()
185 .and_then(|s| s.to_str())
186 .context(InvalidFilePathSnafu { file_path })?;
187
188 let filename = FileName::try_from(file_name)?;
189 let data = self
190 .object_store
191 .read(file_path)
192 .await
193 .context(ReadObjectSnafu { file_path })?;
194 let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
195 let metadata_content = document.into_metadata_content()?;
196 let mut req = BatchPutRequest::default();
197 let mut total_request_size = 0;
198 let mut count = 0;
199 let now = Instant::now();
200 for FileKeyValue { key, value } in metadata_content.into_iter() {
201 count += 1;
202 let key_size = key.len();
203 let value_size = value.len();
204 if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
205 self.kv_backend.batch_put(req).await?;
206 req = BatchPutRequest::default();
207 total_request_size = 0;
208 }
209 req.kvs.push(KeyValue { key, value });
210 total_request_size += key_size + value_size;
211 }
212 if !req.kvs.is_empty() {
213 self.kv_backend.batch_put(req).await?;
214 }
215
216 info!(
217 "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
218 file_path,
219 count,
220 now.elapsed()
221 );
222 Ok(count)
223 }
224
225 pub async fn check_target_source_clean(&self) -> Result<bool> {
226 let req = RangeRequest::new().with_range(vec![0], vec![0]);
227 let mut stream = Box::pin(
228 PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
229 );
230 let v = stream.as_mut().try_next().await?;
231 Ok(v.is_none())
232 }
233
234 pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
236 let format = FileFormat::FlexBuffers;
237 let filename = FileName::new(
238 filename_str.to_string(),
239 FileExtension {
240 format,
241 data_type: DataType::Metadata,
242 },
243 );
244 let file_path_buf = [path, filename.to_string().as_str()]
245 .iter()
246 .collect::<PathBuf>();
247 let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
248 reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
249 })?;
250 let now = Instant::now();
251 let req = RangeRequest::new().with_range(vec![0], vec![0]);
252 let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
253 Ok(FileKeyValue {
254 key: kv.key,
255 value: kv.value,
256 })
257 })
258 .into_stream();
259 let keyvalues = stream
260 .try_filter(|f| future::ready(!is_internal_key(f)))
261 .try_collect::<Vec<_>>()
262 .await?;
263 let num_keyvalues = keyvalues.len();
264 let document = Document::new(
265 Metadata::new(),
266 file::Content::Metadata(MetadataContent::new(keyvalues)),
267 );
268 let bytes = document.to_bytes(&format)?;
269 let r = self
270 .object_store
271 .write(file_path, bytes)
272 .await
273 .context(WriteObjectSnafu { file_path })?;
274 info!(
275 "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
276 file_path,
277 num_keyvalues,
278 r.content_length(),
279 now.elapsed()
280 );
281
282 Ok((filename.to_string(), num_keyvalues as u64))
283 }
284
285 fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
286 format!("{} => {}", key, value)
287 }
288
289 pub async fn info(
290 object_store: &ObjectStore,
291 file_path: &str,
292 query_str: &str,
293 limit: Option<usize>,
294 ) -> Result<Vec<String>> {
295 let path = Path::new(file_path);
296
297 let file_name = path
298 .file_name()
299 .and_then(|s| s.to_str())
300 .context(InvalidFilePathSnafu { file_path })?;
301
302 let filename = FileName::try_from(file_name)?;
303 let data = object_store
304 .read(file_path)
305 .await
306 .context(ReadObjectSnafu { file_path })?;
307 let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
308 let metadata_content = document.into_metadata_content()?.values();
309 let mut results = Vec::with_capacity(limit.unwrap_or(256));
310 for kv in metadata_content {
311 let key_str = String::from_utf8_lossy(&kv.key);
312 if let Some(prefix) = query_str.strip_suffix('*') {
313 if key_str.starts_with(prefix) {
314 let value_str = String::from_utf8_lossy(&kv.value);
315 results.push(Self::format_output(key_str, value_str));
316 }
317 } else if key_str == query_str {
318 let value_str = String::from_utf8_lossy(&kv.value);
319 results.push(Self::format_output(key_str, value_str));
320 }
321 if results.len() == limit.unwrap_or(usize::MAX) {
322 break;
323 }
324 }
325 Ok(results)
326 }
327}
328
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use object_store::services::Fs;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Valid names round-trip through parsing and display; malformed names
    /// are rejected with a descriptive message.
    #[test]
    fn test_file_name() {
        let parsed = FileName::try_from("test.metadata.fb").unwrap();
        assert_eq!(parsed.name, "test");
        assert_eq!(parsed.extension.format, FileFormat::FlexBuffers);
        assert_eq!(parsed.extension.data_type, DataType::Metadata);
        assert_eq!(parsed.to_string(), "test.metadata.fb");

        // The format component is missing.
        let missing_format = FileName::try_from("test.metadata").unwrap_err();
        assert_eq!(
            missing_format.to_string(),
            "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
        );

        // The format component is unknown.
        let unknown_format = FileName::try_from("test.metadata.hello").unwrap_err();
        assert_eq!(
            unknown_format.to_string(),
            "Invalid file extension: Invalid file format: hello"
        );
    }

    /// Creates a temporary directory, an in-memory kv backend and a manager
    /// whose object store is rooted at `<temp dir>/data`.
    fn test_env(
        prefix: &str,
    ) -> (
        TempDir,
        Arc<MemoryKvBackend<Error>>,
        MetadataSnapshotManager,
    ) {
        let temp_dir = create_temp_dir(prefix);
        let data_path = temp_dir.path().join("data").display().to_string();
        let object_store = ObjectStore::new(Fs::default().root(&data_path))
            .unwrap()
            .finish();
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        (temp_dir, kv_backend, manager)
    }

    /// Pairs dumped to a snapshot file can be restored into an emptied backend.
    #[tokio::test]
    async fn test_dump_and_restore() {
        common_telemetry::init_default_ut_logging();
        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
        let dump_path = temp_dir.path().join("snapshot");

        let key_of = |i: usize| format!("test_{}", i);
        let value_of = |i: usize| format!("value_{}", i);

        for i in 0..10 {
            let request = PutRequest::new()
                .with_key(key_of(i).into_bytes())
                .with_value(value_of(i).into_bytes());
            kv_backend.put(request).await.unwrap();
        }

        manager
            .dump(&dump_path.display().to_string(), "metadata_snapshot")
            .await
            .unwrap();

        // Empty the backend so that the assertions below can only be
        // satisfied by data coming from the snapshot file.
        kv_backend.clear();

        let restore_path = dump_path
            .join("metadata_snapshot.metadata.fb")
            .display()
            .to_string();
        manager.restore(&restore_path).await.unwrap();

        for i in 0..10 {
            let restored = kv_backend
                .get(key_of(i).as_bytes())
                .await
                .unwrap()
                .unwrap();
            assert_eq!(restored.value, value_of(i).as_bytes());
        }
    }

    /// Restoring from a path that does not exist reports a read error.
    #[tokio::test]
    async fn test_restore_from_nonexistent_file() {
        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
        let missing_path = temp_dir
            .path()
            .join("nonexistent.metadata.fb")
            .display()
            .to_string();
        let err = manager.restore(&missing_path).await.unwrap_err();
        assert_matches!(err, Error::ReadObject { .. });
    }
}