common_meta/
snapshot.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod file;
16
17use std::borrow::Cow;
18use std::fmt::{Display, Formatter};
19use std::path::{Path, PathBuf};
20use std::time::Instant;
21
22use common_telemetry::info;
23use file::{Metadata, MetadataContent};
24use futures::TryStreamExt;
25use object_store::ObjectStore;
26use snafu::{OptionExt, ResultExt};
27use strum::Display;
28
29use crate::error::{
30    Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
31    Result, WriteObjectSnafu,
32};
33use crate::kv_backend::KvBackendRef;
34use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
35use crate::rpc::store::{BatchPutRequest, RangeRequest};
36use crate::rpc::KeyValue;
37use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
38
/// The format of the backup file.
///
/// The derived `Display` output of each variant is the string given in its
/// `strum(serialize = ...)` attribute; that string is what appears as the
/// last component of a backup file's extension.
#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
pub enum FileFormat {
    /// FlexBuffers encoding, spelled `fb` in file names.
    #[strum(serialize = "fb")]
    FlexBuffers,
}
45
46impl TryFrom<&str> for FileFormat {
47    type Error = String;
48
49    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
50        match value.to_lowercase().as_str() {
51            "fb" => Ok(FileFormat::FlexBuffers),
52            _ => Err(format!("Invalid file format: {}", value)),
53        }
54    }
55}
56
57#[derive(Debug, PartialEq, Eq, Display)]
58#[strum(serialize_all = "lowercase")]
59pub enum DataType {
60    Metadata,
61}
62
63impl TryFrom<&str> for DataType {
64    type Error = String;
65
66    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
67        match value.to_lowercase().as_str() {
68            "metadata" => Ok(DataType::Metadata),
69            _ => Err(format!("Invalid data type: {}", value)),
70        }
71    }
72}
73
/// The extension of a backup file.
///
/// Its `Display` form is `<datatype>.<format>`, e.g. `metadata.fb`.
#[derive(Debug, PartialEq, Eq)]
pub struct FileExtension {
    /// How the file contents are encoded.
    format: FileFormat,
    /// What kind of data the file holds.
    data_type: DataType,
}
79
80impl FileExtension {
81    pub fn new(format: FileFormat, data_type: DataType) -> Self {
82        Self { format, data_type }
83    }
84}
85
86impl Display for FileExtension {
87    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
88        write!(f, "{}.{}", self.data_type, self.format)
89    }
90}
91
92impl TryFrom<&str> for FileExtension {
93    type Error = Error;
94
95    fn try_from(value: &str) -> Result<Self> {
96        let parts = value.split(".").collect::<Vec<&str>>();
97        if parts.len() != 2 {
98            return InvalidFileExtensionSnafu {
99                reason: format!(
100                    "Extension should be in the format of <datatype>.<format>, got: {}",
101                    value
102                ),
103            }
104            .fail();
105        }
106
107        let data_type = DataType::try_from(parts[0])
108            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
109        let format = FileFormat::try_from(parts[1])
110            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
111        Ok(FileExtension { format, data_type })
112    }
113}
114
/// The name of a backup file.
///
/// Its `Display` form is `<name>.<extension>`, e.g. `test.metadata.fb`.
#[derive(Debug, PartialEq, Eq)]
pub struct FileName {
    /// The base name, without the extension.
    name: String,
    /// The `<datatype>.<format>` extension.
    extension: FileExtension,
}
120
121impl Display for FileName {
122    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123        write!(f, "{}.{}", self.name, self.extension)
124    }
125}
126
127impl TryFrom<&str> for FileName {
128    type Error = Error;
129
130    fn try_from(value: &str) -> Result<Self> {
131        let Some((name, extension)) = value.split_once(".") else {
132            return InvalidFileNameSnafu {
133                reason: format!(
134                    "The file name should be in the format of <name>.<extension>, got: {}",
135                    value
136                ),
137            }
138            .fail();
139        };
140        let extension = FileExtension::try_from(extension)?;
141        Ok(Self {
142            name: name.to_string(),
143            extension,
144        })
145    }
146}
147
148impl FileName {
149    fn new(name: String, extension: FileExtension) -> Self {
150        Self { name, extension }
151    }
152}
153
/// The manager of the metadata snapshot.
///
/// It manages the metadata snapshot, including dumping and restoring.
pub struct MetadataSnapshotManager {
    /// The metadata store whose key-value pairs are dumped and restored.
    kv_backend: KvBackendRef,
    /// The object store that backup files are written to and read from.
    object_store: ObjectStore,
}
161
/// Size threshold, in bytes of keys plus values, for a single batched put
/// request when restoring metadata; 1 MiB.
const MAX_REQUEST_SIZE: usize = 1 << 20;
164
165impl MetadataSnapshotManager {
166    pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
167        Self {
168            kv_backend,
169            object_store,
170        }
171    }
172
173    /// Restores the metadata from the backup file to the metadata store.
174    pub async fn restore(&self, file_path: &str) -> Result<u64> {
175        let path = Path::new(file_path);
176
177        let file_name = path
178            .file_name()
179            .and_then(|s| s.to_str())
180            .context(InvalidFilePathSnafu { file_path })?;
181
182        let filename = FileName::try_from(file_name)?;
183        let data = self
184            .object_store
185            .read(file_path)
186            .await
187            .context(ReadObjectSnafu { file_path })?;
188        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
189        let metadata_content = document.into_metadata_content()?;
190        let mut req = BatchPutRequest::default();
191        let mut total_request_size = 0;
192        let mut count = 0;
193        let now = Instant::now();
194        for FileKeyValue { key, value } in metadata_content.into_iter() {
195            count += 1;
196            let key_size = key.len();
197            let value_size = value.len();
198            if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
199                self.kv_backend.batch_put(req).await?;
200                req = BatchPutRequest::default();
201                total_request_size = 0;
202            }
203            req.kvs.push(KeyValue { key, value });
204            total_request_size += key_size + value_size;
205        }
206        if !req.kvs.is_empty() {
207            self.kv_backend.batch_put(req).await?;
208        }
209
210        info!(
211            "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
212            file_path,
213            count,
214            now.elapsed()
215        );
216        Ok(count)
217    }
218
219    pub async fn check_target_source_clean(&self) -> Result<bool> {
220        let req = RangeRequest::new().with_range(vec![0], vec![0]);
221        let mut stream = Box::pin(
222            PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
223        );
224        let v = stream.as_mut().try_next().await?;
225        Ok(v.is_none())
226    }
227
228    /// Dumps the metadata to the backup file.
229    pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
230        let format = FileFormat::FlexBuffers;
231        let filename = FileName::new(
232            filename_str.to_string(),
233            FileExtension {
234                format,
235                data_type: DataType::Metadata,
236            },
237        );
238        let file_path_buf = [path, filename.to_string().as_str()]
239            .iter()
240            .collect::<PathBuf>();
241        let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
242            reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
243        })?;
244        let now = Instant::now();
245        let req = RangeRequest::new().with_range(vec![0], vec![0]);
246        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
247            Ok(FileKeyValue {
248                key: kv.key,
249                value: kv.value,
250            })
251        })
252        .into_stream();
253        let keyvalues = stream.try_collect::<Vec<_>>().await?;
254        let num_keyvalues = keyvalues.len();
255        let document = Document::new(
256            Metadata::new(),
257            file::Content::Metadata(MetadataContent::new(keyvalues)),
258        );
259        let bytes = document.to_bytes(&format)?;
260        let r = self
261            .object_store
262            .write(file_path, bytes)
263            .await
264            .context(WriteObjectSnafu { file_path })?;
265        info!(
266            "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
267            file_path,
268            num_keyvalues,
269            r.content_length(),
270            now.elapsed()
271        );
272
273        Ok((filename.to_string(), num_keyvalues as u64))
274    }
275
276    fn format_output(key: Cow<'_, str>, value: Cow<'_, str>) -> String {
277        format!("{} => {}", key, value)
278    }
279
280    pub async fn info(
281        object_store: &ObjectStore,
282        file_path: &str,
283        query_str: &str,
284        limit: Option<usize>,
285    ) -> Result<Vec<String>> {
286        let path = Path::new(file_path);
287
288        let file_name = path
289            .file_name()
290            .and_then(|s| s.to_str())
291            .context(InvalidFilePathSnafu { file_path })?;
292
293        let filename = FileName::try_from(file_name)?;
294        let data = object_store
295            .read(file_path)
296            .await
297            .context(ReadObjectSnafu { file_path })?;
298        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
299        let metadata_content = document.into_metadata_content()?.values();
300        let mut results = Vec::with_capacity(limit.unwrap_or(256));
301        for kv in metadata_content {
302            let key_str = String::from_utf8_lossy(&kv.key);
303            if let Some(prefix) = query_str.strip_suffix('*') {
304                if key_str.starts_with(prefix) {
305                    let value_str = String::from_utf8_lossy(&kv.value);
306                    results.push(Self::format_output(key_str, value_str));
307                }
308            } else if key_str == query_str {
309                let value_str = String::from_utf8_lossy(&kv.value);
310                results.push(Self::format_output(key_str, value_str));
311            }
312            if results.len() == limit.unwrap_or(usize::MAX) {
313                break;
314            }
315        }
316        Ok(results)
317    }
318}
319
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use object_store::services::Fs;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Parsing accepts `<name>.<datatype>.<format>` and reports precise errors
    /// for malformed extensions.
    #[test]
    fn test_file_name() {
        // A well-formed name parses into its parts and renders back unchanged.
        let raw = "test.metadata.fb";
        let parsed = FileName::try_from(raw).unwrap();
        assert_eq!(parsed.name, "test");
        assert_eq!(parsed.extension.format, FileFormat::FlexBuffers);
        assert_eq!(parsed.extension.data_type, DataType::Metadata);
        assert_eq!(parsed.to_string(), raw);

        // Rejected names, each paired with the expected error message.
        let rejected = [
            (
                "test.metadata",
                "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata",
            ),
            (
                "test.metadata.hello",
                "Invalid file extension: Invalid file format: hello",
            ),
        ];
        for (input, expected) in rejected {
            let err = FileName::try_from(input).unwrap_err();
            assert_eq!(err.to_string(), expected);
        }
    }

    /// Builds a temp dir, an in-memory kv backend and a manager whose object
    /// store is rooted at `<temp dir>/data`.
    fn test_env(
        prefix: &str,
    ) -> (
        TempDir,
        Arc<MemoryKvBackend<Error>>,
        MetadataSnapshotManager,
    ) {
        let temp_dir = create_temp_dir(prefix);
        let data_path = temp_dir
            .path()
            .join("data")
            .as_path()
            .display()
            .to_string();
        let object_store = ObjectStore::new(Fs::default().root(&data_path))
            .unwrap()
            .finish();
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        (temp_dir, kv_backend, manager)
    }

    /// Dumping, clearing the backend and restoring brings every pair back.
    #[tokio::test]
    async fn test_dump_and_restore() {
        common_telemetry::init_default_ut_logging();
        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");

        // The fixture: ten `test_<i>` keys with matching `value_<i>` values.
        let entries: Vec<(String, String)> = (0..10)
            .map(|i| (format!("test_{}", i), format!("value_{}", i)))
            .collect();
        for (key, value) in &entries {
            let put = PutRequest::new()
                .with_key(key.as_bytes().to_vec())
                .with_value(value.as_bytes().to_vec());
            kv_backend.put(put).await.unwrap();
        }

        let dump_path = temp_dir.path().join("snapshot");
        let dump_dir = dump_path.as_path().display().to_string();
        manager.dump(&dump_dir, "metadata_snapshot").await.unwrap();

        // Clean up the kv backend
        kv_backend.clear();

        let restore_path = dump_path
            .join("metadata_snapshot.metadata.fb")
            .as_path()
            .display()
            .to_string();
        manager.restore(&restore_path).await.unwrap();

        for (key, value) in &entries {
            let restored = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
            assert_eq!(restored.value, value.as_bytes());
        }
    }

    /// Restoring from a missing file surfaces a read-object error.
    #[tokio::test]
    async fn test_restore_from_nonexistent_file() {
        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
        let missing_file = temp_dir.path().join("nonexistent.metadata.fb");
        let restore_path = missing_file.as_path().display().to_string();
        let err = manager.restore(&restore_path).await.unwrap_err();
        assert_matches!(err, Error::ReadObject { .. })
    }
}