cli/metadata/
snapshot.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use clap::{Parser, Subcommand};
17use common_error::ext::BoxedError;
18use common_meta::snapshot::MetadataSnapshotManager;
19use object_store::{ObjectStore, Scheme};
20
21use crate::Tool;
22use crate::common::{ObjectStoreConfig, StoreConfig, new_fs_object_store};
23use crate::utils::resolve_relative_path_with_current_dir;
24
25/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
26#[derive(Subcommand)]
27pub enum SnapshotCommand {
28    /// Save a snapshot of the current metadata state to a specified location.
29    Save(SaveCommand),
30    /// Restore metadata from a snapshot.
31    Restore(RestoreCommand),
32    /// Explore metadata from a snapshot.
33    Info(InfoCommand),
34}
35
36impl SnapshotCommand {
37    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
38        match self {
39            SnapshotCommand::Save(cmd) => Ok(Box::new(cmd.build().await?)),
40            SnapshotCommand::Restore(cmd) => Ok(Box::new(cmd.build().await?)),
41            SnapshotCommand::Info(cmd) => Ok(Box::new(cmd.build().await?)),
42        }
43    }
44}
45
46/// Export metadata snapshot tool.
47/// This tool is used to export metadata snapshot from etcd, pg or mysql.
48/// It will dump the metadata snapshot to local file or s3 bucket.
49/// The snapshot file will be in binary format.
50#[derive(Debug, Default, Parser)]
51pub struct SaveCommand {
52    /// The store configuration.
53    #[clap(flatten)]
54    store: StoreConfig,
55    /// The object store configuration.
56    #[clap(flatten)]
57    object_store: ObjectStoreConfig,
58    /// The path of the target snapshot file.
59    #[clap(
60        long,
61        default_value = "metadata_snapshot.metadata.fb",
62        alias = "file_name"
63    )]
64    file_path: String,
65    /// Specifies the root directory used for I/O operations.
66    #[clap(long, default_value = "/", alias = "output_dir")]
67    dir: String,
68}
69
70impl SaveCommand {
71    async fn build(&self) -> Result<MetaSnapshotTool, BoxedError> {
72        let kvbackend = self.store.build().await?;
73        let (object_store, file_path) = build_object_store_and_resolve_file_path(
74            self.object_store.clone(),
75            &self.dir,
76            &self.file_path,
77        )?;
78        let tool = MetaSnapshotTool {
79            inner: MetadataSnapshotManager::new(kvbackend, object_store),
80            file_path,
81        };
82        Ok(tool)
83    }
84}
85
86struct MetaSnapshotTool {
87    inner: MetadataSnapshotManager,
88    file_path: String,
89}
90
91#[async_trait]
92impl Tool for MetaSnapshotTool {
93    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
94        self.inner
95            .dump(&self.file_path)
96            .await
97            .map_err(BoxedError::new)?;
98        Ok(())
99    }
100}
101
102/// Restore metadata from a snapshot file.
103///
104/// This command restores the metadata state from a previously saved snapshot.
105/// The snapshot can be loaded from either a local file system or an S3 bucket,
106/// depending on the provided configuration.
107#[derive(Debug, Default, Parser)]
108pub struct RestoreCommand {
109    /// The store configuration.
110    #[clap(flatten)]
111    store: StoreConfig,
112    /// The object store config.
113    #[clap(flatten)]
114    object_store: ObjectStoreConfig,
115    /// The path of the target snapshot file.
116    #[clap(
117        long,
118        default_value = "metadata_snapshot.metadata.fb",
119        alias = "file_name"
120    )]
121    file_path: String,
122    /// Specifies the root directory used for I/O operations.
123    #[clap(long, default_value = "/", alias = "input_dir")]
124    dir: String,
125    #[clap(long, default_value = "false")]
126    force: bool,
127}
128
129impl RestoreCommand {
130    async fn build(&self) -> Result<MetaRestoreTool, BoxedError> {
131        let kvbackend = self.store.build().await?;
132        let (object_store, file_path) = build_object_store_and_resolve_file_path(
133            self.object_store.clone(),
134            &self.dir,
135            &self.file_path,
136        )
137        .map_err(BoxedError::new)?;
138        let tool = MetaRestoreTool::new(
139            MetadataSnapshotManager::new(kvbackend, object_store),
140            file_path,
141            self.force,
142        );
143        Ok(tool)
144    }
145}
146
147struct MetaRestoreTool {
148    inner: MetadataSnapshotManager,
149    file_path: String,
150    force: bool,
151}
152
153impl MetaRestoreTool {
154    pub fn new(inner: MetadataSnapshotManager, file_path: String, force: bool) -> Self {
155        Self {
156            inner,
157            file_path,
158            force,
159        }
160    }
161}
162
163#[async_trait]
164impl Tool for MetaRestoreTool {
165    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
166        let clean = self
167            .inner
168            .check_target_source_clean()
169            .await
170            .map_err(BoxedError::new)?;
171        if clean {
172            common_telemetry::info!(
173                "The target source is clean, we will restore the metadata snapshot."
174            );
175            self.inner
176                .restore(&self.file_path)
177                .await
178                .map_err(BoxedError::new)?;
179            Ok(())
180        } else if !self.force {
181            common_telemetry::warn!(
182                "The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
183            );
184            Ok(())
185        } else {
186            common_telemetry::info!(
187                "The target source is not clean, We will restore the metadata snapshot with --force."
188            );
189            self.inner
190                .restore(&self.file_path)
191                .await
192                .map_err(BoxedError::new)?;
193            Ok(())
194        }
195    }
196}
197
198/// Explore metadata from a snapshot file.
199///
200/// This command allows filtering the metadata by a specific key and limiting the number of results.
201/// It prints the filtered metadata to the console.
202#[derive(Debug, Default, Parser)]
203pub struct InfoCommand {
204    /// The object store config.
205    #[clap(flatten)]
206    object_store: ObjectStoreConfig,
207    /// The path of the target snapshot file.
208    #[clap(
209        long,
210        default_value = "metadata_snapshot.metadata.fb",
211        alias = "file_name"
212    )]
213    file_path: String,
214    /// Specifies the root directory used for I/O operations.
215    #[clap(long, default_value = "/", alias = "input_dir")]
216    dir: String,
217    /// The query string to filter the metadata.
218    #[clap(long, default_value = "*")]
219    inspect_key: String,
220    /// The limit of the metadata to query.
221    #[clap(long)]
222    limit: Option<usize>,
223}
224
225struct MetaInfoTool {
226    inner: ObjectStore,
227    file_path: String,
228    inspect_key: String,
229    limit: Option<usize>,
230}
231
232#[async_trait]
233impl Tool for MetaInfoTool {
234    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
235        let result = MetadataSnapshotManager::info(
236            &self.inner,
237            &self.file_path,
238            &self.inspect_key,
239            self.limit,
240        )
241        .await
242        .map_err(BoxedError::new)?;
243        for item in result {
244            println!("{}", item);
245        }
246        Ok(())
247    }
248}
249
250impl InfoCommand {
251    async fn build(&self) -> Result<MetaInfoTool, BoxedError> {
252        let (object_store, file_path) = build_object_store_and_resolve_file_path(
253            self.object_store.clone(),
254            &self.dir,
255            &self.file_path,
256        )?;
257        let tool = MetaInfoTool {
258            inner: object_store,
259            file_path,
260            inspect_key: self.inspect_key.clone(),
261            limit: self.limit,
262        };
263        Ok(tool)
264    }
265}
266
267/// Builds the object store and resolves the file path.
268fn build_object_store_and_resolve_file_path(
269    object_store: ObjectStoreConfig,
270    fs_root: &str,
271    file_path: &str,
272) -> Result<(ObjectStore, String), BoxedError> {
273    let object_store = object_store.build().map_err(BoxedError::new)?;
274    let object_store = match object_store {
275        Some(object_store) => object_store,
276        None => new_fs_object_store(fs_root)?,
277    };
278
279    let file_path = if matches!(object_store.info().scheme(), Scheme::Fs) {
280        resolve_relative_path_with_current_dir(file_path).map_err(BoxedError::new)?
281    } else {
282        file_path.to_string()
283    };
284
285    Ok((object_store, file_path))
286}
287
#[cfg(test)]
mod tests {
    use std::env;
    use std::path::Path;
    use std::sync::Arc;
    use std::time::Duration;

    use clap::Parser;
    use common_meta::kv_backend::KvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::rpc::store::PutRequest;
    use object_store::ObjectStore;

    use super::*;
    use crate::metadata::snapshot::RestoreCommand;

    /// Converts a filesystem path into a `raftengine://` URL.
    ///
    /// Backslashes are replaced by forward slashes and a leading `/` is added
    /// when missing, so that Windows-style paths also form a parsable URL.
    fn create_raftengine_url(path: &std::path::Path) -> String {
        let mut path = path.to_string_lossy().replace('\\', "/");
        if !path.starts_with('/') {
            path = format!("/{}", path);
        }
        format!("raftengine://{}", path)
    }

    /// A relative snapshot file name must be resolved against the current
    /// directory by every command builder that goes through
    /// `build_object_store_and_resolve_file_path`.
    #[tokio::test]
    async fn test_cmd_resolve_file_path() {
        common_telemetry::init_default_ut_logging();
        let file_name = "metadata_snapshot.metadata.fb";
        let expected = env::current_dir()
            .unwrap()
            .join(file_name)
            .to_string_lossy()
            .to_string();

        // Restore command.
        let cmd = RestoreCommand::parse_from([
            "",
            "--file_name",
            file_name,
            "--backend",
            "memory-store",
            "--store-addrs",
            "memory://",
        ]);
        let tool = cmd.build().await.unwrap();
        assert_eq!(tool.file_path, expected);

        // Save command.
        let cmd = SaveCommand::parse_from([
            "",
            "--file_name",
            file_name,
            "--backend",
            "memory-store",
            "--store-addrs",
            "memory://",
        ]);
        let tool = cmd.build().await.unwrap();
        assert_eq!(tool.file_path, expected);

        // Info command; it has no kv store options.
        let cmd = InfoCommand::parse_from(["", "--file_name", file_name]);
        let tool = cmd.build().await.unwrap();
        assert_eq!(tool.file_path, expected);
    }

    /// Dumps a snapshot containing the single entry `test => test` to
    /// `file_path` in `object_store`.
    async fn setup_backup_file(object_store: ObjectStore, file_path: &str) {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        // Put some data into the kv backend
        kv_backend
            .put(
                PutRequest::new()
                    .with_key(b"test".to_vec())
                    .with_value(b"test".to_vec()),
            )
            .await
            .unwrap();
        manager.dump(file_path).await.unwrap();
    }

    #[tokio::test]
    async fn test_restore_raft_engine_store() {
        common_telemetry::init_default_ut_logging();
        let temp_dir = tempfile::tempdir().unwrap();
        let root = temp_dir.path().display().to_string();
        let object_store = new_fs_object_store(&root).unwrap();
        setup_backup_file(object_store, "/backup/metadata_snapshot.metadata.fb").await;
        let metadata_path = temp_dir.path().join("metadata");
        {
            let cmd = RestoreCommand::parse_from([
                "",
                "--file_name",
                temp_dir
                    .path()
                    .join("backup")
                    .join("metadata_snapshot.metadata.fb")
                    .to_str()
                    .unwrap(),
                "--backend",
                "raft-engine-store",
                "--store-addrs",
                &create_raftengine_url(&metadata_path),
            ]);
            let tool = cmd.build().await.unwrap();
            tool.do_work().await.unwrap();
        }
        // Waits for the raft engine to release the file lock.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let kv = standalone::build_metadata_kvbackend(
            metadata_path.display().to_string(),
            Default::default(),
        )
        .unwrap();

        let value = kv.get(b"test").await.unwrap().unwrap().value;
        assert_eq!(value, b"test");
    }

    #[tokio::test]
    async fn test_save_raft_engine_store() {
        common_telemetry::init_default_ut_logging();
        let temp_dir = tempfile::tempdir().unwrap();
        let root = temp_dir.path().display().to_string();
        let metadata_path = temp_dir.path().join("metadata");
        {
            let kv = standalone::build_metadata_kvbackend(
                metadata_path.to_string_lossy().to_string(),
                Default::default(),
            )
            .unwrap();
            kv.put(
                PutRequest::new()
                    .with_key(b"test".to_vec())
                    .with_value(b"test".to_vec()),
            )
            .await
            .unwrap();
        }
        // Waits for the raft engine to release the file lock.
        tokio::time::sleep(Duration::from_secs(1)).await;
        {
            let cmd = SaveCommand::parse_from([
                "",
                "--file_name",
                temp_dir
                    .path()
                    .join("backup")
                    .join("metadata_snapshot.metadata.fb")
                    .to_str()
                    .unwrap(),
                "--backend",
                "raft-engine-store",
                "--store-addrs",
                &create_raftengine_url(&metadata_path),
            ]);
            let tool = cmd.build().await.unwrap();
            tool.do_work().await.unwrap();
        }

        // Reads the snapshot file from the object store.
        let object_store = new_fs_object_store(&root).unwrap();
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        manager
            .restore("/backup/metadata_snapshot.metadata.fb")
            .await
            .unwrap();
        let value = kv_backend.get(b"test").await.unwrap().unwrap().value;
        assert_eq!(value, b"test");
    }

    /// A Windows-style path must still produce a URL whose path component
    /// starts with `/`.
    #[test]
    fn test_path() {
        let path = "C:\\Users\\user\\AppData\\Local\\Temp\\.tmpuPiVuB\\metadata";
        let path = Path::new(path);
        let url = create_raftengine_url(path);
        assert_eq!(
            url,
            "raftengine:///C:/Users/user/AppData/Local/Temp/.tmpuPiVuB/metadata"
        );
        let url = url::Url::parse(&url).unwrap();
        assert_eq!(
            url.path(),
            "/C:/Users/user/AppData/Local/Temp/.tmpuPiVuB/metadata"
        );
    }
}