common_meta/
snapshot.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod file;
16
17use std::borrow::Cow;
18use std::fmt::{Display, Formatter};
19use std::path::{Path, PathBuf};
20use std::time::Instant;
21
22use common_telemetry::info;
23use file::{Metadata, MetadataContent};
24use futures::{future, TryStreamExt};
25use object_store::ObjectStore;
26use snafu::{OptionExt, ResultExt};
27use strum::Display;
28
29use crate::error::{
30    Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
31    Result, WriteObjectSnafu,
32};
33use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
34use crate::kv_backend::KvBackendRef;
35use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
36use crate::rpc::store::{BatchPutRequest, RangeRequest};
37use crate::rpc::KeyValue;
38use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
39
40/// The format of the backup file.
41#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
42pub enum FileFormat {
43    #[strum(serialize = "fb")]
44    FlexBuffers,
45}
46
47impl TryFrom<&str> for FileFormat {
48    type Error = String;
49
50    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
51        match value.to_lowercase().as_str() {
52            "fb" => Ok(FileFormat::FlexBuffers),
53            _ => Err(format!("Invalid file format: {}", value)),
54        }
55    }
56}
57
58#[derive(Debug, PartialEq, Eq, Display)]
59#[strum(serialize_all = "lowercase")]
60pub enum DataType {
61    Metadata,
62}
63
64impl TryFrom<&str> for DataType {
65    type Error = String;
66
67    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
68        match value.to_lowercase().as_str() {
69            "metadata" => Ok(DataType::Metadata),
70            _ => Err(format!("Invalid data type: {}", value)),
71        }
72    }
73}
74
75#[derive(Debug, PartialEq, Eq)]
76pub struct FileExtension {
77    format: FileFormat,
78    data_type: DataType,
79}
80
81impl FileExtension {
82    pub fn new(format: FileFormat, data_type: DataType) -> Self {
83        Self { format, data_type }
84    }
85}
86
87impl Display for FileExtension {
88    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
89        write!(f, "{}.{}", self.data_type, self.format)
90    }
91}
92
93impl TryFrom<&str> for FileExtension {
94    type Error = Error;
95
96    fn try_from(value: &str) -> Result<Self> {
97        let parts = value.split(".").collect::<Vec<&str>>();
98        if parts.len() != 2 {
99            return InvalidFileExtensionSnafu {
100                reason: format!(
101                    "Extension should be in the format of <datatype>.<format>, got: {}",
102                    value
103                ),
104            }
105            .fail();
106        }
107
108        let data_type = DataType::try_from(parts[0])
109            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
110        let format = FileFormat::try_from(parts[1])
111            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
112        Ok(FileExtension { format, data_type })
113    }
114}
115
116#[derive(Debug, PartialEq, Eq)]
117pub struct FileName {
118    name: String,
119    extension: FileExtension,
120}
121
122impl Display for FileName {
123    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
124        write!(f, "{}.{}", self.name, self.extension)
125    }
126}
127
128impl TryFrom<&str> for FileName {
129    type Error = Error;
130
131    fn try_from(value: &str) -> Result<Self> {
132        let Some((name, extension)) = value.split_once(".") else {
133            return InvalidFileNameSnafu {
134                reason: format!(
135                    "The file name should be in the format of <name>.<extension>, got: {}",
136                    value
137                ),
138            }
139            .fail();
140        };
141        let extension = FileExtension::try_from(extension)?;
142        Ok(Self {
143            name: name.to_string(),
144            extension,
145        })
146    }
147}
148
149impl FileName {
150    fn new(name: String, extension: FileExtension) -> Self {
151        Self { name, extension }
152    }
153}
154
155/// The manager of the metadata snapshot.
156///
157/// It manages the metadata snapshot, including dumping and restoring.
158pub struct MetadataSnapshotManager {
159    kv_backend: KvBackendRef,
160    object_store: ObjectStore,
161}
162
/// Upper bound, in bytes, on the combined key and value size of one put request
/// when restoring metadata; 1MiB by default.
const MAX_REQUEST_SIZE: usize = 1 << 20;
165
166/// Returns true if the key is an internal key.
167fn is_internal_key(kv: &FileKeyValue) -> bool {
168    kv.key.starts_with(ELECTION_KEY.as_bytes()) || kv.key.starts_with(CANDIDATES_ROOT.as_bytes())
169}
170
171impl MetadataSnapshotManager {
172    pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
173        Self {
174            kv_backend,
175            object_store,
176        }
177    }
178
179    /// Restores the metadata from the backup file to the metadata store.
180    pub async fn restore(&self, file_path: &str) -> Result<u64> {
181        let path = Path::new(file_path);
182
183        let file_name = path
184            .file_name()
185            .and_then(|s| s.to_str())
186            .context(InvalidFilePathSnafu { file_path })?;
187
188        let filename = FileName::try_from(file_name)?;
189        let data = self
190            .object_store
191            .read(file_path)
192            .await
193            .context(ReadObjectSnafu { file_path })?;
194        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
195        let metadata_content = document.into_metadata_content()?;
196        let mut req = BatchPutRequest::default();
197        let mut total_request_size = 0;
198        let mut count = 0;
199        let now = Instant::now();
200        for FileKeyValue { key, value } in metadata_content.into_iter() {
201            count += 1;
202            let key_size = key.len();
203            let value_size = value.len();
204            if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
205                self.kv_backend.batch_put(req).await?;
206                req = BatchPutRequest::default();
207                total_request_size = 0;
208            }
209            req.kvs.push(KeyValue { key, value });
210            total_request_size += key_size + value_size;
211        }
212        if !req.kvs.is_empty() {
213            self.kv_backend.batch_put(req).await?;
214        }
215
216        info!(
217            "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
218            file_path,
219            count,
220            now.elapsed()
221        );
222        Ok(count)
223    }
224
225    pub async fn check_target_source_clean(&self) -> Result<bool> {
226        let req = RangeRequest::new().with_range(vec![0], vec![0]);
227        let mut stream = Box::pin(
228            PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
229        );
230        let v = stream.as_mut().try_next().await?;
231        Ok(v.is_none())
232    }
233
234    /// Dumps the metadata to the backup file.
235    pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
236        let format = FileFormat::FlexBuffers;
237        let filename = FileName::new(
238            filename_str.to_string(),
239            FileExtension {
240                format,
241                data_type: DataType::Metadata,
242            },
243        );
244        let file_path_buf = [path, filename.to_string().as_str()]
245            .iter()
246            .collect::<PathBuf>();
247        let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
248            reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
249        })?;
250        let now = Instant::now();
251        let req = RangeRequest::new().with_range(vec![0], vec![0]);
252        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
253            Ok(FileKeyValue {
254                key: kv.key,
255                value: kv.value,
256            })
257        })
258        .into_stream();
259        let keyvalues = stream
260            .try_filter(|f| future::ready(!is_internal_key(f)))
261            .try_collect::<Vec<_>>()
262            .await?;
263        let num_keyvalues = keyvalues.len();
264        let document = Document::new(
265            Metadata::new(),
266            file::Content::Metadata(MetadataContent::new(keyvalues)),
267        );
268        let bytes = document.to_bytes(&format)?;
269        let r = self
270            .object_store
271            .write(file_path, bytes)
272            .await
273            .context(WriteObjectSnafu { file_path })?;
274        info!(
275            "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
276            file_path,
277            num_keyvalues,
278            r.content_length(),
279            now.elapsed()
280        );
281
282        Ok((filename.to_string(), num_keyvalues as u64))
283    }
284
285    fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
286        format!("{} => {}", key, value)
287    }
288
289    pub async fn info(
290        object_store: &ObjectStore,
291        file_path: &str,
292        query_str: &str,
293        limit: Option<usize>,
294    ) -> Result<Vec<String>> {
295        let path = Path::new(file_path);
296
297        let file_name = path
298            .file_name()
299            .and_then(|s| s.to_str())
300            .context(InvalidFilePathSnafu { file_path })?;
301
302        let filename = FileName::try_from(file_name)?;
303        let data = object_store
304            .read(file_path)
305            .await
306            .context(ReadObjectSnafu { file_path })?;
307        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
308        let metadata_content = document.into_metadata_content()?.values();
309        let mut results = Vec::with_capacity(limit.unwrap_or(256));
310        for kv in metadata_content {
311            let key_str = String::from_utf8_lossy(&kv.key);
312            if let Some(prefix) = query_str.strip_suffix('*') {
313                if key_str.starts_with(prefix) {
314                    let value_str = String::from_utf8_lossy(&kv.value);
315                    results.push(Self::format_output(key_str, value_str));
316                }
317            } else if key_str == query_str {
318                let value_str = String::from_utf8_lossy(&kv.value);
319                results.push(Self::format_output(key_str, value_str));
320            }
321            if results.len() == limit.unwrap_or(usize::MAX) {
322                break;
323            }
324        }
325        Ok(results)
326    }
327}
328
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use object_store::services::Fs;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Parsing a well-formed file name round-trips through `Display`; malformed names are
    /// rejected with a descriptive error.
    #[test]
    fn test_file_name() {
        let file_name = FileName::try_from("test.metadata.fb").unwrap();
        assert_eq!(file_name.name, "test");
        assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
        assert_eq!(file_name.extension.data_type, DataType::Metadata);
        assert_eq!(file_name.to_string(), "test.metadata.fb");

        // The format component is missing.
        let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
        assert_eq!(
            invalid_file_name.to_string(),
            "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
        );

        // The format component is not a known format.
        let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
        assert_eq!(
            invalid_file_extension.to_string(),
            "Invalid file extension: Invalid file format: hello"
        );
    }

    /// Builds a manager backed by an in-memory kv backend and a filesystem object store
    /// rooted at `<temp dir>/data`.
    ///
    /// The temporary directory is returned as well so that the caller keeps it alive for
    /// the duration of the test.
    fn test_env(
        prefix: &str,
    ) -> (
        TempDir,
        Arc<MemoryKvBackend<Error>>,
        MetadataSnapshotManager,
    ) {
        let temp_dir = create_temp_dir(prefix);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let temp_path = temp_dir.path();
        let data_path = temp_path.join("data").as_path().display().to_string();
        let builder = Fs::default().root(&data_path);
        let object_store = ObjectStore::new(builder).unwrap().finish();
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        (temp_dir, kv_backend, manager)
    }

    /// Dumps ten key-value pairs, wipes the backend, restores the dump and checks that
    /// every pair is back, and that both operations report the right file name and count.
    #[tokio::test]
    async fn test_dump_and_restore() {
        common_telemetry::init_default_ut_logging();
        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
        let temp_path = temp_dir.path();

        for i in 0..10 {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(format!("test_{}", i).as_bytes().to_vec())
                        .with_value(format!("value_{}", i).as_bytes().to_vec()),
                )
                .await
                .unwrap();
        }
        let dump_path = temp_path.join("snapshot");
        let (file_name, dumped) = manager
            .dump(
                &dump_path.as_path().display().to_string(),
                "metadata_snapshot",
            )
            .await
            .unwrap();
        assert_eq!(file_name, "metadata_snapshot.metadata.fb");
        assert_eq!(dumped, 10);
        // Clean up the kv backend
        kv_backend.clear();

        let restore_path = dump_path
            .join("metadata_snapshot.metadata.fb")
            .as_path()
            .display()
            .to_string();
        let restored = manager.restore(&restore_path).await.unwrap();
        assert_eq!(restored, 10);

        for i in 0..10 {
            let key = format!("test_{}", i);
            let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
            assert_eq!(value.value, format!("value_{}", i).as_bytes());
        }
    }

    /// Restoring from a path that does not exist surfaces the read failure.
    #[tokio::test]
    async fn test_restore_from_nonexistent_file() {
        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
        let restore_path = temp_dir
            .path()
            .join("nonexistent.metadata.fb")
            .as_path()
            .display()
            .to_string();
        let err = manager.restore(&restore_path).await.unwrap_err();
        assert_matches!(err, Error::ReadObject { .. })
    }
}
433}