common_meta/
snapshot.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod file;
16
17use std::borrow::Cow;
18use std::fmt::{Display, Formatter};
19use std::path::Path;
20use std::time::Instant;
21
22use common_telemetry::info;
23use file::{Metadata, MetadataContent};
24use futures::{TryStreamExt, future};
25use object_store::ObjectStore;
26use snafu::{OptionExt, ResultExt, ensure};
27use strum::Display;
28
29use crate::error::{
30    Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
31    Result, UnexpectedSnafu, WriteObjectSnafu,
32};
33use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
34use crate::kv_backend::KvBackendRef;
35use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
36use crate::rpc::KeyValue;
37use crate::rpc::store::{BatchPutRequest, RangeRequest};
38use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
39
40/// The format of the backup file.
41#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
42pub enum FileFormat {
43    #[strum(serialize = "fb")]
44    FlexBuffers,
45}
46
47impl TryFrom<&str> for FileFormat {
48    type Error = String;
49
50    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
51        match value.to_lowercase().as_str() {
52            "fb" => Ok(FileFormat::FlexBuffers),
53            _ => Err(format!("Invalid file format: {}", value)),
54        }
55    }
56}
57
58#[derive(Debug, PartialEq, Eq, Display)]
59#[strum(serialize_all = "lowercase")]
60pub enum DataType {
61    Metadata,
62}
63
64impl TryFrom<&str> for DataType {
65    type Error = String;
66
67    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
68        match value.to_lowercase().as_str() {
69            "metadata" => Ok(DataType::Metadata),
70            _ => Err(format!("Invalid data type: {}", value)),
71        }
72    }
73}
74
75#[derive(Debug, PartialEq, Eq)]
76pub struct FileExtension {
77    format: FileFormat,
78    data_type: DataType,
79}
80
81impl FileExtension {
82    pub fn new(format: FileFormat, data_type: DataType) -> Self {
83        Self { format, data_type }
84    }
85}
86
87impl Display for FileExtension {
88    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89        write!(f, "{}.{}", self.data_type, self.format)
90    }
91}
92
93impl TryFrom<&str> for FileExtension {
94    type Error = Error;
95
96    fn try_from(value: &str) -> Result<Self> {
97        let parts = value.split(".").collect::<Vec<&str>>();
98        if parts.len() != 2 {
99            return InvalidFileExtensionSnafu {
100                reason: format!(
101                    "Extension should be in the format of <datatype>.<format>, got: {}",
102                    value
103                ),
104            }
105            .fail();
106        }
107
108        let data_type = DataType::try_from(parts[0])
109            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
110        let format = FileFormat::try_from(parts[1])
111            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
112        Ok(FileExtension { format, data_type })
113    }
114}
115
116#[derive(Debug, PartialEq, Eq)]
117pub struct FileName {
118    name: String,
119    extension: FileExtension,
120}
121
122impl Display for FileName {
123    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
124        write!(f, "{}.{}", self.name, self.extension)
125    }
126}
127
128impl TryFrom<&str> for FileName {
129    type Error = Error;
130
131    fn try_from(value: &str) -> Result<Self> {
132        let Some((name, extension)) = value.split_once(".") else {
133            return InvalidFileNameSnafu {
134                reason: format!(
135                    "The file name should be in the format of <name>.<extension>, got: {}",
136                    value
137                ),
138            }
139            .fail();
140        };
141        let extension = FileExtension::try_from(extension)?;
142        Ok(Self {
143            name: name.to_string(),
144            extension,
145        })
146    }
147}
148
149impl FileName {
150    fn new(name: String, extension: FileExtension) -> Self {
151        Self { name, extension }
152    }
153}
154
155/// The manager of the metadata snapshot.
156///
157/// It manages the metadata snapshot, including dumping and restoring.
158pub struct MetadataSnapshotManager {
159    kv_backend: KvBackendRef,
160    object_store: ObjectStore,
161}
162
/// Soft limit, in bytes (1 MiB), on the summed key and value sizes of the pairs
/// batched into a single `batch_put` request while restoring.
const MAX_REQUEST_SIZE: usize = 1 << 20;
165
166/// Returns true if the key is an internal key.
167fn is_internal_key(kv: &FileKeyValue) -> bool {
168    kv.key.starts_with(ELECTION_KEY.as_bytes()) || kv.key.starts_with(CANDIDATES_ROOT.as_bytes())
169}
170
171impl MetadataSnapshotManager {
172    pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
173        Self {
174            kv_backend,
175            object_store,
176        }
177    }
178
179    /// Restores the metadata from the backup file to the metadata store.
180    pub async fn restore(&self, file_path: &str) -> Result<u64> {
181        let path = Path::new(file_path);
182
183        let file_name = path
184            .file_name()
185            .and_then(|s| s.to_str())
186            .context(InvalidFilePathSnafu { file_path })?;
187
188        let filename = FileName::try_from(file_name)?;
189        let data = self
190            .object_store
191            .read(file_path)
192            .await
193            .context(ReadObjectSnafu { file_path })?;
194        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
195        let metadata_content = document.into_metadata_content()?;
196        let mut req = BatchPutRequest::default();
197        let mut total_request_size = 0;
198        let mut count = 0;
199        let now = Instant::now();
200        for FileKeyValue { key, value } in metadata_content.into_iter() {
201            count += 1;
202            let key_size = key.len();
203            let value_size = value.len();
204            if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
205                self.kv_backend.batch_put(req).await?;
206                req = BatchPutRequest::default();
207                total_request_size = 0;
208            }
209            req.kvs.push(KeyValue { key, value });
210            total_request_size += key_size + value_size;
211        }
212        if !req.kvs.is_empty() {
213            self.kv_backend.batch_put(req).await?;
214        }
215
216        info!(
217            "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
218            file_path,
219            count,
220            now.elapsed()
221        );
222        Ok(count)
223    }
224
225    pub async fn check_target_source_clean(&self) -> Result<bool> {
226        let req = RangeRequest::new().with_range(vec![0], vec![0]);
227        let mut stream = Box::pin(
228            PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
229        );
230        let v = stream.as_mut().try_next().await?;
231        Ok(v.is_none())
232    }
233
234    /// Dumps the metadata to the backup file.
235    pub async fn dump(&self, file_path_str: &str) -> Result<(String, u64)> {
236        let format = FileFormat::FlexBuffers;
237
238        let path = Path::new(file_path_str)
239            .parent()
240            .context(InvalidFilePathSnafu {
241                file_path: file_path_str,
242            })?;
243        let raw_file_name = Path::new(file_path_str)
244            .file_name()
245            .and_then(|s| s.to_str())
246            .context(InvalidFilePathSnafu {
247                file_path: file_path_str,
248            })?;
249        let parsed_file_name = FileName::try_from(raw_file_name).unwrap_or_else(|_| {
250            FileName::new(
251                raw_file_name.to_string(),
252                FileExtension {
253                    format,
254                    data_type: DataType::Metadata,
255                },
256            )
257        });
258        let file_path = path.join(parsed_file_name.to_string());
259        let file_path = file_path.to_str().context(InvalidFilePathSnafu {
260            file_path: file_path_str,
261        })?;
262
263        // Ensure the file does not exist
264        ensure!(
265            !self
266                .object_store
267                .exists(file_path)
268                .await
269                .context(ReadObjectSnafu { file_path })?,
270            UnexpectedSnafu {
271                err_msg: format!(
272                    "The file '{}' already exists. Please choose a different name or remove the existing file before proceeding.",
273                    file_path
274                ),
275            }
276        );
277        let now = Instant::now();
278        let req = RangeRequest::new().with_range(vec![0], vec![0]);
279        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
280            Ok(FileKeyValue {
281                key: kv.key,
282                value: kv.value,
283            })
284        })
285        .into_stream();
286        let keyvalues = stream
287            .try_filter(|f| future::ready(!is_internal_key(f)))
288            .try_collect::<Vec<_>>()
289            .await?;
290        let num_keyvalues = keyvalues.len();
291        let document = Document::new(
292            Metadata::new(),
293            file::Content::Metadata(MetadataContent::new(keyvalues)),
294        );
295        let bytes = document.to_bytes(&format)?;
296        let r = self
297            .object_store
298            .write(file_path, bytes)
299            .await
300            .context(WriteObjectSnafu { file_path })?;
301        info!(
302            "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
303            file_path,
304            num_keyvalues,
305            r.content_length(),
306            now.elapsed()
307        );
308
309        Ok((parsed_file_name.to_string(), num_keyvalues as u64))
310    }
311
312    fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
313        format!("{} => {}", key, value)
314    }
315
316    pub async fn info(
317        object_store: &ObjectStore,
318        file_path: &str,
319        query_str: &str,
320        limit: Option<usize>,
321    ) -> Result<Vec<String>> {
322        let path = Path::new(file_path);
323
324        let file_name = path
325            .file_name()
326            .and_then(|s| s.to_str())
327            .context(InvalidFilePathSnafu { file_path })?;
328
329        let filename = FileName::try_from(file_name)?;
330        let data = object_store
331            .read(file_path)
332            .await
333            .context(ReadObjectSnafu { file_path })?;
334        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
335        let metadata_content = document.into_metadata_content()?.values();
336        let mut results = Vec::with_capacity(limit.unwrap_or(256));
337        for kv in metadata_content {
338            let key_str = String::from_utf8_lossy(&kv.key);
339            if let Some(prefix) = query_str.strip_suffix('*') {
340                if key_str.starts_with(prefix) {
341                    let value_str = String::from_utf8_lossy(&kv.value);
342                    results.push(Self::format_output(key_str, value_str));
343                }
344            } else if key_str == query_str {
345                let value_str = String::from_utf8_lossy(&kv.value);
346                results.push(Self::format_output(key_str, value_str));
347            }
348            if results.len() == limit.unwrap_or(usize::MAX) {
349                break;
350            }
351        }
352        Ok(results)
353    }
354}
355
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_test_util::temp_dir::{TempDir, create_temp_dir};
    use object_store::services::Fs;

    use super::*;
    use crate::kv_backend::KvBackend;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::rpc::store::PutRequest;

    /// Checks `FileName` parsing and `Display` round-tripping, plus the error messages
    /// for a missing format component and for an unknown format.
    #[test]
    fn test_file_name() {
        let file_name = FileName::try_from("test.metadata.fb").unwrap();
        assert_eq!(file_name.name, "test");
        assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
        assert_eq!(file_name.extension.data_type, DataType::Metadata);
        assert_eq!(file_name.to_string(), "test.metadata.fb");

        // Only one dot: the extension `metadata` lacks the `.<format>` part.
        let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
        assert_eq!(
            invalid_file_name.to_string(),
            "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
        );

        // Well-formed extension, but `hello` is not a known file format.
        let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
        assert_eq!(
            invalid_file_extension.to_string(),
            "Invalid file extension: Invalid file format: hello"
        );
    }

    /// Builds a test environment: a temp directory, an in-memory kv backend, and a
    /// `MetadataSnapshotManager` over that backend and a filesystem object store rooted
    /// at `<temp_dir>/data`.
    ///
    /// NOTE(review): `TempDir` is returned so callers can hold it for the whole test;
    /// presumably the directory is removed when it is dropped — confirm against
    /// `common_test_util`.
    fn test_env(
        prefix: &str,
    ) -> (
        TempDir,
        Arc<MemoryKvBackend<Error>>,
        MetadataSnapshotManager,
    ) {
        let temp_dir = create_temp_dir(prefix);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let temp_path = temp_dir.path();
        let data_path = temp_path.join("data").as_path().display().to_string();
        let builder = Fs::default().root(&data_path);
        let object_store = ObjectStore::new(builder).unwrap().finish();
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        (temp_dir, kv_backend, manager)
    }

    /// Covers the full cycle: dump, refuse to overwrite an existing file, restore into an
    /// emptied backend, and verify every pair came back.
    #[tokio::test]
    async fn test_dump_and_restore() {
        common_telemetry::init_default_ut_logging();
        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
        let temp_path = temp_dir.path();

        // Seed the backend with `test_{i}` => `value_{i}` for i in 0..10.
        for i in 0..10 {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(format!("test_{}", i).as_bytes().to_vec())
                        .with_value(format!("value_{}", i).as_bytes().to_vec()),
                )
                .await
                .unwrap();
        }
        let dump_path = temp_path.join("snapshot");
        // The name has no extension, so `dump` writes `metadata_snapshot.metadata.fb`.
        manager
            .dump(&format!(
                "{}/metadata_snapshot",
                &dump_path.as_path().display().to_string()
            ))
            .await
            .unwrap();
        // Clean up the kv backend
        kv_backend.clear();
        // Dumping to the file produced above must be rejected as already existing.
        let err = manager
            .dump(&format!(
                "{}/metadata_snapshot.metadata.fb",
                &dump_path.as_path().display().to_string()
            ))
            .await
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(err.to_string().contains("already exists"));

        // Restore from the first dump into the emptied backend.
        let restore_path = dump_path
            .join("metadata_snapshot.metadata.fb")
            .as_path()
            .display()
            .to_string();
        manager.restore(&restore_path).await.unwrap();

        // Every seeded pair must be readable again with its original value.
        for i in 0..10 {
            let key = format!("test_{}", i);
            let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
            assert_eq!(value.value, format!("value_{}", i).as_bytes());
        }
    }

    /// Restoring from a path that does not exist surfaces a `ReadObject` error.
    #[tokio::test]
    async fn test_restore_from_nonexistent_file() {
        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
        let restore_path = temp_dir
            .path()
            .join("nonexistent.metadata.fb")
            .as_path()
            .display()
            .to_string();
        let err = manager.restore(&restore_path).await.unwrap_err();
        assert_matches!(err, Error::ReadObject { .. })
    }
}