common_meta/
snapshot.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod file;
16
17use std::fmt::{Display, Formatter};
18use std::path::{Path, PathBuf};
19use std::time::Instant;
20
21use common_telemetry::info;
22use file::{Metadata, MetadataContent};
23use futures::TryStreamExt;
24use object_store::ObjectStore;
25use snafu::{OptionExt, ResultExt};
26use strum::Display;
27
28use crate::error::{
29    Error, InvalidFileExtensionSnafu, InvalidFileNameSnafu, InvalidFilePathSnafu, ReadObjectSnafu,
30    Result, WriteObjectSnafu,
31};
32use crate::kv_backend::KvBackendRef;
33use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
34use crate::rpc::store::{BatchPutRequest, RangeRequest};
35use crate::rpc::KeyValue;
36use crate::snapshot::file::{Document, KeyValue as FileKeyValue};
37
38/// The format of the backup file.
39#[derive(Debug, PartialEq, Eq, Display, Clone, Copy)]
40pub enum FileFormat {
41    #[strum(serialize = "fb")]
42    FlexBuffers,
43}
44
45impl TryFrom<&str> for FileFormat {
46    type Error = String;
47
48    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
49        match value.to_lowercase().as_str() {
50            "fb" => Ok(FileFormat::FlexBuffers),
51            _ => Err(format!("Invalid file format: {}", value)),
52        }
53    }
54}
55
56#[derive(Debug, PartialEq, Eq, Display)]
57#[strum(serialize_all = "lowercase")]
58pub enum DataType {
59    Metadata,
60}
61
62impl TryFrom<&str> for DataType {
63    type Error = String;
64
65    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
66        match value.to_lowercase().as_str() {
67            "metadata" => Ok(DataType::Metadata),
68            _ => Err(format!("Invalid data type: {}", value)),
69        }
70    }
71}
72
73#[derive(Debug, PartialEq, Eq)]
74pub struct FileExtension {
75    format: FileFormat,
76    data_type: DataType,
77}
78
79impl FileExtension {
80    pub fn new(format: FileFormat, data_type: DataType) -> Self {
81        Self { format, data_type }
82    }
83}
84
85impl Display for FileExtension {
86    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
87        write!(f, "{}.{}", self.data_type, self.format)
88    }
89}
90
91impl TryFrom<&str> for FileExtension {
92    type Error = Error;
93
94    fn try_from(value: &str) -> Result<Self> {
95        let parts = value.split(".").collect::<Vec<&str>>();
96        if parts.len() != 2 {
97            return InvalidFileExtensionSnafu {
98                reason: format!(
99                    "Extension should be in the format of <datatype>.<format>, got: {}",
100                    value
101                ),
102            }
103            .fail();
104        }
105
106        let data_type = DataType::try_from(parts[0])
107            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
108        let format = FileFormat::try_from(parts[1])
109            .map_err(|e| InvalidFileExtensionSnafu { reason: e }.build())?;
110        Ok(FileExtension { format, data_type })
111    }
112}
113
114#[derive(Debug, PartialEq, Eq)]
115pub struct FileName {
116    name: String,
117    extension: FileExtension,
118}
119
120impl Display for FileName {
121    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122        write!(f, "{}.{}", self.name, self.extension)
123    }
124}
125
126impl TryFrom<&str> for FileName {
127    type Error = Error;
128
129    fn try_from(value: &str) -> Result<Self> {
130        let Some((name, extension)) = value.split_once(".") else {
131            return InvalidFileNameSnafu {
132                reason: format!(
133                    "The file name should be in the format of <name>.<extension>, got: {}",
134                    value
135                ),
136            }
137            .fail();
138        };
139        let extension = FileExtension::try_from(extension)?;
140        Ok(Self {
141            name: name.to_string(),
142            extension,
143        })
144    }
145}
146
147impl FileName {
148    fn new(name: String, extension: FileExtension) -> Self {
149        Self { name, extension }
150    }
151}
152
153/// The manager of the metadata snapshot.
154///
155/// It manages the metadata snapshot, including dumping and restoring.
156pub struct MetadataSnapshotManager {
157    kv_backend: KvBackendRef,
158    object_store: ObjectStore,
159}
160
/// Upper bound, in bytes, on the accumulated key and value sizes placed into
/// one batch put request while restoring metadata: 1 MiB.
const MAX_REQUEST_SIZE: usize = 1 << 20;
163
164impl MetadataSnapshotManager {
165    pub fn new(kv_backend: KvBackendRef, object_store: ObjectStore) -> Self {
166        Self {
167            kv_backend,
168            object_store,
169        }
170    }
171
172    /// Restores the metadata from the backup file to the metadata store.
173    pub async fn restore(&self, file_path: &str) -> Result<u64> {
174        let path = Path::new(file_path);
175
176        let file_name = path
177            .file_name()
178            .and_then(|s| s.to_str())
179            .context(InvalidFilePathSnafu { file_path })?;
180
181        let filename = FileName::try_from(file_name)?;
182        let data = self
183            .object_store
184            .read(file_path)
185            .await
186            .context(ReadObjectSnafu { file_path })?;
187        let document = Document::from_slice(&filename.extension.format, &data.to_bytes())?;
188        let metadata_content = document.into_metadata_content()?;
189        let mut req = BatchPutRequest::default();
190        let mut total_request_size = 0;
191        let mut count = 0;
192        let now = Instant::now();
193        for FileKeyValue { key, value } in metadata_content.into_iter() {
194            count += 1;
195            let key_size = key.len();
196            let value_size = value.len();
197            if total_request_size + key_size + value_size > MAX_REQUEST_SIZE {
198                self.kv_backend.batch_put(req).await?;
199                req = BatchPutRequest::default();
200                total_request_size = 0;
201            }
202            req.kvs.push(KeyValue { key, value });
203            total_request_size += key_size + value_size;
204        }
205        if !req.kvs.is_empty() {
206            self.kv_backend.batch_put(req).await?;
207        }
208
209        info!(
210            "Restored metadata from {} successfully, total {} key-value pairs, elapsed {:?}",
211            file_path,
212            count,
213            now.elapsed()
214        );
215        Ok(count)
216    }
217
218    pub async fn check_target_source_clean(&self) -> Result<bool> {
219        let req = RangeRequest::new().with_range(vec![0], vec![0]);
220        let mut stream = Box::pin(
221            PaginationStream::new(self.kv_backend.clone(), req, 1, Result::Ok).into_stream(),
222        );
223        let v = stream.as_mut().try_next().await?;
224        Ok(v.is_none())
225    }
226
227    /// Dumps the metadata to the backup file.
228    pub async fn dump(&self, path: &str, filename_str: &str) -> Result<(String, u64)> {
229        let format = FileFormat::FlexBuffers;
230        let filename = FileName::new(
231            filename_str.to_string(),
232            FileExtension {
233                format,
234                data_type: DataType::Metadata,
235            },
236        );
237        let file_path_buf = [path, filename.to_string().as_str()]
238            .iter()
239            .collect::<PathBuf>();
240        let file_path = file_path_buf.to_str().context(InvalidFileNameSnafu {
241            reason: format!("Invalid file path: {}, filename: {}", path, filename_str),
242        })?;
243        let now = Instant::now();
244        let req = RangeRequest::new().with_range(vec![0], vec![0]);
245        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
246            Ok(FileKeyValue {
247                key: kv.key,
248                value: kv.value,
249            })
250        })
251        .into_stream();
252        let keyvalues = stream.try_collect::<Vec<_>>().await?;
253        let num_keyvalues = keyvalues.len();
254        let document = Document::new(
255            Metadata::new(),
256            file::Content::Metadata(MetadataContent::new(keyvalues)),
257        );
258        let bytes = document.to_bytes(&format)?;
259        let r = self
260            .object_store
261            .write(file_path, bytes)
262            .await
263            .context(WriteObjectSnafu { file_path })?;
264        info!(
265            "Dumped metadata to {} successfully, total {} key-value pairs, file size {} bytes, elapsed {:?}",
266            file_path,
267            num_keyvalues,
268            r.content_length(),
269            now.elapsed()
270        );
271
272        Ok((filename.to_string(), num_keyvalues as u64))
273    }
274}
275
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use object_store::services::Fs;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::KvBackend;
    use crate::rpc::store::PutRequest;

    /// Number of key-value pairs written by the dump/restore round-trip test.
    const NUM_KVS: u64 = 10;

    /// File-name parsing accepts `<name>.<datatype>.<format>` and reports
    /// malformed or unknown extensions.
    #[test]
    fn test_file_name() {
        let file_name = FileName::try_from("test.metadata.fb").unwrap();
        assert_eq!(file_name.name, "test");
        assert_eq!(file_name.extension.format, FileFormat::FlexBuffers);
        assert_eq!(file_name.extension.data_type, DataType::Metadata);
        assert_eq!(file_name.to_string(), "test.metadata.fb");

        let invalid_file_name = FileName::try_from("test.metadata").unwrap_err();
        assert_eq!(
            invalid_file_name.to_string(),
            "Invalid file extension: Extension should be in the format of <datatype>.<format>, got: metadata"
        );

        let invalid_file_extension = FileName::try_from("test.metadata.hello").unwrap_err();
        assert_eq!(
            invalid_file_extension.to_string(),
            "Invalid file extension: Invalid file format: hello"
        );
    }

    /// Builds a temp dir, an in-memory kv backend and a manager whose object
    /// store is a local file system rooted at `<temp dir>/data`.
    ///
    /// The `TempDir` is returned so that it outlives the test body.
    fn test_env(
        prefix: &str,
    ) -> (
        TempDir,
        Arc<MemoryKvBackend<Error>>,
        MetadataSnapshotManager,
    ) {
        let temp_dir = create_temp_dir(prefix);
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let temp_path = temp_dir.path();
        let data_path = temp_path.join("data").as_path().display().to_string();
        let builder = Fs::default().root(&data_path);
        let object_store = ObjectStore::new(builder).unwrap().finish();
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        (temp_dir, kv_backend, manager)
    }

    /// Dumping, clearing the backend and restoring brings back every pair, and
    /// both operations report the expected file name and pair counts.
    #[tokio::test]
    async fn test_dump_and_restore() {
        common_telemetry::init_default_ut_logging();
        let (temp_dir, kv_backend, manager) = test_env("test_dump_and_restore");
        let temp_path = temp_dir.path();

        for i in 0..NUM_KVS {
            kv_backend
                .put(
                    PutRequest::new()
                        .with_key(format!("test_{}", i).as_bytes().to_vec())
                        .with_value(format!("value_{}", i).as_bytes().to_vec()),
                )
                .await
                .unwrap();
        }
        let dump_path = temp_path.join("snapshot");
        let (dumped_file_name, num_dumped) = manager
            .dump(
                &dump_path.as_path().display().to_string(),
                "metadata_snapshot",
            )
            .await
            .unwrap();
        assert_eq!(dumped_file_name, "metadata_snapshot.metadata.fb");
        assert_eq!(num_dumped, NUM_KVS);
        // Clean up the kv backend
        kv_backend.clear();

        let restore_path = dump_path
            .join("metadata_snapshot.metadata.fb")
            .as_path()
            .display()
            .to_string();
        let num_restored = manager.restore(&restore_path).await.unwrap();
        assert_eq!(num_restored, NUM_KVS);

        for i in 0..NUM_KVS {
            let key = format!("test_{}", i);
            let value = kv_backend.get(key.as_bytes()).await.unwrap().unwrap();
            assert_eq!(value.value, format!("value_{}", i).as_bytes());
        }
    }

    /// Restoring from a path that does not exist surfaces a read error.
    #[tokio::test]
    async fn test_restore_from_nonexistent_file() {
        let (temp_dir, _kv_backend, manager) = test_env("test_restore_from_nonexistent_file");
        let restore_path = temp_dir
            .path()
            .join("nonexistent.metadata.fb")
            .as_path()
            .display()
            .to_string();
        let err = manager.restore(&restore_path).await.unwrap_err();
        assert_matches!(err, Error::ReadObject { .. });
    }
}