1pub mod file;
16
17use std::borrow::Cow;
18use std::fmt::{Display, Formatter};
19use std::path::{Path, PathBuf};
20use std::time::Instant;
21
22use common_telemetry::info;
23use file::{Metadata, MetadataContent};
24use futures::TryStreamExt;
25use object_store::ObjectStore;
26use snafu::{OptionExt, ResultExt};
27use strum::Display;
28
29use crate::error::{
30 Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
31 Result, WriteObjectSnafu,
32};
33use crate::kv_backend::KvBackendRef;
34use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
35use crate::rpc::store::{BatchPutRequest, RangeRequest};
36use crate::rpc::KeyValue;
37use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
38
39#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
41pub enum FileFormat {
42 #[strum(serialize = "fb")]
43 FlexBuffers,
44}
45
46impl TryFrom<&str> for FileFormat {
47 type Error = String;
48
49 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
50 match value.to_lowercase().as_str() {
51 "fb" => Ok(FileFormat::FlexBuffers),
52 _ => Err(format!("Invalid file format: {}", value)),
53 }
54 }
55}
56
57#[derive(Debug, PartialEq, Eq, Display)]
58#[strum(serialize_all = "lowercase")]
59pub enum DataType {
60 Metadata,
61}
62
63impl TryFrom<&str> for DataType {
64 type Error = String;
65
66 fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
67 match value.to_lowercase().as_str() {
68 "metadata" => Ok(DataType::Metadata),
69 _ => Err(format!("Invalid data type: {}", value)),
70 }
71 }
72}
73
74#[derive(Debug, PartialEq, Eq)]
75pub struct FileExtension {
76 format: FileFormat,
77 data_type: DataType,
78}
79
80impl FileExtension {
81 pub fn new(format: FileFormat, data_type: DataType) -> Self {
82 Self { format, data_type }
83 }
84}
85
86impl Display for FileExtension {
87 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
88 write!(f, "{}.{}", self.data_type, self.format)
89 }
90}
91
92impl TryFrom<&str> for FileExtension {
93 type Error = Error;
94
95 fn try_from(value: &str) -> Result<Self> {
96 let parts = value.split(".").collect::<Vec<&str>>();
97 if parts.len() != 2 {
98 return InvalidFileExtensionSnafu {
99 reason: format!(
100 "Extension should be in the format of <datatype>.<format>, got: {}",
101 value
102 ),
103 }
104 .fail();
105 }
106
107 let data_type = DataType::try_from(parts[0])
108 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
109 let format = FileFormat::try_from(parts[1])
110 .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
111 Ok(FileExtension { format, data_type })
112 }
113}
114
115#[derive(Debug, PartialEq, Eq)]
116pub struct FileName {
117 name: String,
118 extension: FileExtension,
119}
120
121impl Display for FileName {
122 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123 write!(f, "{}.{}", self.name, self.extension)
124 }
125}
126
127impl TryFrom<&str> for FileName {
128 type Error = Error;
129
130 fn try_from(value: &str) -> Result<Self> {
131 let Some((name, extension)) = value.split_once(".") else {
132 return InvalidFileNameSnafu {
133 reason: format!(
134 "The file name should be in the format of <name>.<extension>, got: {}",
135 value
136 ),
137 }
138 .fail();
139 };
140 let extension = FileExtension::try_from(extension)?;
141 Ok(Self {
142 name: name.to_string(),
143 extension,
144 })
145 }
146}
147
148impl FileName {
149 fn new(name: String, extension: FileExtension) -> Self {
150 Self { name, extension }
151 }
152}
153
154pub struct MetadataSnapshotManager {
158 kv_backend: KvBackendRef,
159 object_store: ObjectStore,
160}
161
162const MAX_REQUEST_SIZE: usize = 1024 * 1024;
164
165impl MetadataSnapshotManager {
166 pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
167 Self {
168 kv_backend,
169 object_store,
170 }
171 }
172
173 pub async fn restore(&self, file_path: &str) -> Result<u64> {
175 let path = Path::new(file_path);
176
177 let file_name = path
178 .file_name()
179 .and_then(|s| s.to_str())
180 .context(InvalidFilePathSnafu { file_path })?;
181
182 let filename = FileName::try_from(file_name)?;
183 let data = self
184 .object_store
185 .read(file_path)
186 .await
187 .context(ReadObjectSnafu { file_path })?;
188 let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
189 let metadata_content = document.into_metadata_content()?;
190 let mut req = BatchPutRequest::default();
191 let mut total_request_size = 0;
192 let mut count = 0;
193 let now = Instant::now();
194 for FileKeyValue { key, value } in metadata_content.into_iter() {
195 count += 1;
196 let key_size = key.len();
197 let value_size = value.len();
198 if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
199 self.kv_backend.batch_put(req).await?;
200 req = BatchPutRequest::default();
201 total_request_size = 0;
202 }
203 req.kvs.push(KeyValue { key, value });
204 total_request_size += key_size + value_size;
205 }
206 if !req.kvs.is_empty() {
207 self.kv_backend.batch_put(req).await?;
208 }
209
210 info!(
211 "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
212 file_path,
213 count,
214 now.elapsed()
215 );
216 Ok(count)
217 }
218
219 pub async fn check_target_source_clean(&self) -> Result<bool> {
220 let req = RangeRequest::new().with_range(vec![0], vec![0]);
221 let mut stream = Box::pin(
222 PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
223 );
224 let v = stream.as_mut().try_next().await?;
225 Ok(v.is_none())
226 }
227
228 pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
230 let format = FileFormat::FlexBuffers;
231 let filename = FileName::new(
232 filename_str.to_string(),
233 FileExtension {
234 format,
235 data_type: DataType::Metadata,
236 },
237 );
238 let file_path_buf = [path, filename.to_string().as_str()]
239 .iter()
240 .collect::<PathBuf>();
241 let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
242 reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
243 })?;
244 let now = Instant::now();
245 let req = RangeRequest::new().with_range(vec![0], vec![0]);
246 let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
247 Ok(FileKeyValue {
248 key: kv.key,
249 value: kv.value,
250 })
251 })
252 .into_stream();
253 let keyvalues = stream.try_collect::<Vec<_>>().await?;
254 let num_keyvalues = keyvalues.len();
255 let document = Document::new(
256 Metadata::new(),
257 file::Content::Metadata(MetadataContent::new(keyvalues)),
258 );
259 let bytes = document.to_bytes(&format)?;
260 let r = self
261 .object_store
262 .write(file_path, bytes)
263 .await
264 .context(WriteObjectSnafu { file_path })?;
265 info!(
266 "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
267 file_path,
268 num_keyvalues,
269 r.content_length(),
270 now.elapsed()
271 );
272
273 Ok((filename.to_string(), num_keyvalues as u64))
274 }
275
276 fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
277 format!("{} => {}", key, value)
278 }
279
280 pub async fn info(
281 object_store: &ObjectStore,
282 file_path: &str,
283 query_str: &str,
284 limit: Option<usize>,
285 ) -> Result<Vec<String>> {
286 let path = Path::new(file_path);
287
288 let file_name = path
289 .file_name()
290 .and_then(|s| s.to_str())
291 .context(InvalidFilePathSnafu { file_path })?;
292
293 let filename = FileName::try_from(file_name)?;
294 let data = object_store
295 .read(file_path)
296 .await
297 .context(ReadObjectSnafu { file_path })?;
298 let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
299 let metadata_content = document.into_metadata_content()?.values();
300 let mut results = Vec::with_capacity(limit.unwrap_or(256));
301 for kv in metadata_content {
302 let key_str = String::from_utf8_lossy(&kv.key);
303 if let Some(prefix) = query_str.strip_suffix('*') {
304 if key_str.starts_with(prefix) {
305 let value_str = String::from_utf8_lossy(&kv.value);
306 results.push(Self::format_output(key_str, value_str));
307 }
308 } else if key_str == query_str {
309 let value_str = String::from_utf8_lossy(&kv.value);
310 results.push(Self::format_output(key_str, value_str));
311 }
312 if results.len() == limit.unwrap_or(usize::MAX) {
313 break;
314 }
315 }
316 Ok(results)
317 }
318}
319
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use object_store::services::Fs;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Parses a well-formed name, then checks the error text produced for malformed ones.
    #[test]
    fn test_file_name() {
        let parsed = FileName::try_from("test.metadata.fb").unwrap();
        assert_eq!(parsed.name, "test");
        assert_eq!(parsed.extension.format, FileFormat::FlexBuffers);
        assert_eq!(parsed.extension.data_type, DataType::Metadata);
        assert_eq!(parsed.to_string(), "test.metadata.fb");

        let bad_inputs = [
            (
                "test.metadata",
                "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata",
            ),
            (
                "test.metadata.hello",
                "Invalid file extension: Invalid file format: hello",
            ),
        ];
        for (input, expected_message) in bad_inputs {
            let err = FileName::try_from(input).unwrap_err();
            assert_eq!(err.to_string(), expected_message);
        }
    }

    /// Builds an isolated fixture: a temp dir (kept alive by the caller), an in-memory KV
    /// backend, and a manager whose object store is rooted at `<temp_dir>/data`.
    fn test_env(
        prefix: &str,
    ) -> (
        TempDir,
        Arc<MemoryKvBackend<Error>>,
        MetadataSnapshotManager,
    ) {
        let temp_dir = create_temp_dir(prefix);
        let data_root = temp_dir.path().join("data").display().to_string();
        let object_store = ObjectStore::new(Fs::default().root(&data_root))
            .unwrap()
            .finish();
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        (temp_dir, kv_backend, manager)
    }

    /// Dumps ten pairs, wipes the backend, restores from the dump and checks every value.
    #[tokio::test]
    async fn test_dump_and_restore() {
        common_telemetry::init_default_ut_logging();
        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
        let snapshot_dir = temp_dir.path().join("snapshot");

        for i in 0..10 {
            let put = PutRequest::new()
                .with_key(format!("test_{}", i).as_bytes().to_vec())
                .with_value(format!("value_{}", i).as_bytes().to_vec());
            kv_backend.put(put).await.unwrap();
        }

        manager
            .dump(&snapshot_dir.display().to_string(), "metadata_snapshot")
            .await
            .unwrap();
        kv_backend.clear();

        let snapshot_file = snapshot_dir
            .join("metadata_snapshot.metadata.fb")
            .display()
            .to_string();
        manager.restore(&snapshot_file).await.unwrap();

        for i in 0..10 {
            let key = format!("test_{}", i);
            let restored = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
            assert_eq!(restored.value, format!("value_{}", i).as_bytes());
        }
    }

    /// Restoring a file that does not exist must surface a `ReadObject` error.
    #[tokio::test]
    async fn test_restore_from_nonexistent_file() {
        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
        let missing_file = temp_dir
            .path()
            .join("nonexistent.metadata.fb")
            .display()
            .to_string();
        let err = manager.restore(&missing_file).await.unwrap_err();
        assert_matches!(err, Error::ReadObject { .. });
    }
}