cli/metadata/
snapshot.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::path::Path;
16
17use async_trait::async_trait;
18use clap::{Parser, Subcommand};
19use common_base::secrets::{ExposeSecret, SecretString};
20use common_error::ext::BoxedError;
21use common_meta::snapshot::MetadataSnapshotManager;
22use object_store::services::{Fs, S3};
23use object_store::ObjectStore;
24use snafu::{OptionExt, ResultExt};
25
26use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
27use crate::metadata::common::StoreConfig;
28use crate::Tool;
29
30/// Subcommand for metadata snapshot operations, including saving snapshots, restoring from snapshots, and viewing snapshot information.
31#[derive(Subcommand)]
32pub enum SnapshotCommand {
33    /// Save a snapshot of the current metadata state to a specified location.
34    Save(SaveCommand),
35    /// Restore metadata from a snapshot.
36    Restore(RestoreCommand),
37    /// Explore metadata from a snapshot.
38    Info(InfoCommand),
39}
40
41impl SnapshotCommand {
42    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
43        match self {
44            SnapshotCommand::Save(cmd) => cmd.build().await,
45            SnapshotCommand::Restore(cmd) => cmd.build().await,
46            SnapshotCommand::Info(cmd) => cmd.build().await,
47        }
48    }
49}
50
// TODO(qtang): Abstract a generic s3 config for export import meta snapshot restore
//
// Command-line options that select an S3 bucket as the snapshot location.
// This struct is flattened into the save, restore and info commands. When
// `s3` is false, `build` returns `None` and the other options are not read.
#[derive(Debug, Default, Parser)]
struct S3Config {
    /// Whether to read/write the snapshot from/to s3 instead of the local file system. Default is false.
    #[clap(long, default_value = "false")]
    s3: bool,
    /// The s3 bucket name.
    #[clap(long)]
    s3_bucket: Option<String>,
    /// The s3 region.
    #[clap(long)]
    s3_region: Option<String>,
    /// The s3 access key.
    #[clap(long)]
    s3_access_key: Option<SecretString>,
    /// The s3 secret key.
    #[clap(long)]
    s3_secret_key: Option<SecretString>,
    /// The s3 endpoint. We will automatically use the default s3 decided by the region if not set.
    #[clap(long)]
    s3_endpoint: Option<String>,
}
73
74impl S3Config {
75    pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
76        if !self.s3 {
77            Ok(None)
78        } else {
79            if self.s3_region.is_none()
80                || self.s3_access_key.is_none()
81                || self.s3_secret_key.is_none()
82                || self.s3_bucket.is_none()
83            {
84                return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
85            }
86            // Safety, unwrap is safe because we have checked the options above.
87            let mut config = S3::default()
88                .bucket(self.s3_bucket.as_ref().unwrap())
89                .region(self.s3_region.as_ref().unwrap())
90                .access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
91                .secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
92
93            if !root.is_empty() && root != "." {
94                config = config.root(root);
95            }
96
97            if let Some(endpoint) = &self.s3_endpoint {
98                config = config.endpoint(endpoint);
99            }
100            Ok(Some(
101                ObjectStore::new(config)
102                    .context(OpenDalSnafu)
103                    .map_err(BoxedError::new)?
104                    .finish(),
105            ))
106        }
107    }
108}
109
/// Export metadata snapshot tool.
/// This tool is used to export metadata snapshot from etcd, pg or mysql.
/// It will dump the metadata snapshot to local file or s3 bucket.
/// The snapshot file will be in binary format.
#[derive(Debug, Default, Parser)]
pub struct SaveCommand {
    /// The store configuration.
    #[clap(flatten)]
    store: StoreConfig,
    /// The s3 config.
    #[clap(flatten)]
    s3_config: S3Config,
    /// The name of the target snapshot file. We will add the file extension automatically.
    #[clap(long, default_value = "metadata_snapshot")]
    file_name: String,
    /// The directory to store the snapshot file.
    /// If target output is s3 bucket, this is the root directory in the bucket.
    /// If target output is local file, this is the local directory.
    #[clap(long, default_value = "")]
    output_dir: String,
}
131
132fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
133    let root = if root.is_empty() { "." } else { root };
134    let object_store = ObjectStore::new(Fs::default().root(root))
135        .context(OpenDalSnafu)
136        .map_err(BoxedError::new)?
137        .finish();
138    Ok(object_store)
139}
140
141impl SaveCommand {
142    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
143        let kvbackend = self.store.build().await?;
144        let output_dir = &self.output_dir;
145        let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
146        if let Some(store) = object_store {
147            let tool = MetaSnapshotTool {
148                inner: MetadataSnapshotManager::new(kvbackend, store),
149                target_file: self.file_name.clone(),
150            };
151            Ok(Box::new(tool))
152        } else {
153            let object_store = create_local_file_object_store(output_dir)?;
154            let tool = MetaSnapshotTool {
155                inner: MetadataSnapshotManager::new(kvbackend, object_store),
156                target_file: self.file_name.clone(),
157            };
158            Ok(Box::new(tool))
159        }
160    }
161}
162
163struct MetaSnapshotTool {
164    inner: MetadataSnapshotManager,
165    target_file: String,
166}
167
168#[async_trait]
169impl Tool for MetaSnapshotTool {
170    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
171        self.inner
172            .dump("", &self.target_file)
173            .await
174            .map_err(BoxedError::new)?;
175        Ok(())
176    }
177}
178
/// Restore metadata from a snapshot file.
///
/// This command restores the metadata state from a previously saved snapshot.
/// The snapshot can be loaded from either a local file system or an S3 bucket,
/// depending on the provided configuration.
#[derive(Debug, Default, Parser)]
pub struct RestoreCommand {
    /// The store configuration.
    #[clap(flatten)]
    store: StoreConfig,
    /// The s3 config.
    #[clap(flatten)]
    s3_config: S3Config,
    /// The name of the snapshot file to restore from.
    #[clap(long, default_value = "metadata_snapshot.metadata.fb")]
    file_name: String,
    /// The directory to load the snapshot file from.
    /// If the snapshot source is s3 bucket, this is the root directory in the bucket.
    /// If the snapshot source is local file, this is the local directory.
    #[clap(long, default_value = ".")]
    input_dir: String,
    /// Restore the snapshot even if the target source is not clean. Default is false.
    #[clap(long, default_value = "false")]
    force: bool,
}
201
202impl RestoreCommand {
203    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
204        let kvbackend = self.store.build().await?;
205        let input_dir = &self.input_dir;
206        let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
207        if let Some(store) = object_store {
208            let tool = MetaRestoreTool::new(
209                MetadataSnapshotManager::new(kvbackend, store),
210                self.file_name.clone(),
211                self.force,
212            );
213            Ok(Box::new(tool))
214        } else {
215            let object_store = create_local_file_object_store(input_dir)?;
216            let tool = MetaRestoreTool::new(
217                MetadataSnapshotManager::new(kvbackend, object_store),
218                self.file_name.clone(),
219                self.force,
220            );
221            Ok(Box::new(tool))
222        }
223    }
224}
225
226struct MetaRestoreTool {
227    inner: MetadataSnapshotManager,
228    source_file: String,
229    force: bool,
230}
231
232impl MetaRestoreTool {
233    pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
234        Self {
235            inner,
236            source_file,
237            force,
238        }
239    }
240}
241
242#[async_trait]
243impl Tool for MetaRestoreTool {
244    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
245        let clean = self
246            .inner
247            .check_target_source_clean()
248            .await
249            .map_err(BoxedError::new)?;
250        if clean {
251            common_telemetry::info!(
252                "The target source is clean, we will restore the metadata snapshot."
253            );
254            self.inner
255                .restore(&self.source_file)
256                .await
257                .map_err(BoxedError::new)?;
258            Ok(())
259        } else if !self.force {
260            common_telemetry::warn!(
261                 "The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
262             );
263            Ok(())
264        } else {
265            common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force.");
266            self.inner
267                .restore(&self.source_file)
268                .await
269                .map_err(BoxedError::new)?;
270            Ok(())
271        }
272    }
273}
274
/// Explore metadata from a snapshot file.
///
/// This command allows filtering the metadata by a specific key and limiting the number of results.
/// It prints the filtered metadata to the console.
#[derive(Debug, Default, Parser)]
pub struct InfoCommand {
    /// The s3 config.
    #[clap(flatten)]
    s3_config: S3Config,
    /// The name of the target snapshot file. We will add the file extension automatically.
    // When the local file system is used, `build` treats this value as a path
    // and splits it into a parent directory and a file name.
    #[clap(long, default_value = "metadata_snapshot")]
    file_name: String,
    /// The query string to filter the metadata.
    #[clap(long, default_value = "*")]
    inspect_key: String,
    /// The limit of the metadata to query.
    #[clap(long)]
    limit: Option<usize>,
}
294
295struct MetaInfoTool {
296    inner: ObjectStore,
297    source_file: String,
298    inspect_key: String,
299    limit: Option<usize>,
300}
301
302#[async_trait]
303impl Tool for MetaInfoTool {
304    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
305        let result = MetadataSnapshotManager::info(
306            &self.inner,
307            &self.source_file,
308            &self.inspect_key,
309            self.limit,
310        )
311        .await
312        .map_err(BoxedError::new)?;
313        for item in result {
314            println!("{}", item);
315        }
316        Ok(())
317    }
318}
319
320impl InfoCommand {
321    fn decide_object_store_root_for_local_store(
322        file_path: &str,
323    ) -> Result<(&str, &str), BoxedError> {
324        let path = Path::new(file_path);
325        let parent = path
326            .parent()
327            .and_then(|p| p.to_str())
328            .context(InvalidFilePathSnafu { msg: file_path })
329            .map_err(BoxedError::new)?;
330        let file_name = path
331            .file_name()
332            .and_then(|f| f.to_str())
333            .context(InvalidFilePathSnafu { msg: file_path })
334            .map_err(BoxedError::new)?;
335        let root = if parent.is_empty() { "." } else { parent };
336        Ok((root, file_name))
337    }
338
339    pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
340        let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
341        if let Some(store) = object_store {
342            let tool = MetaInfoTool {
343                inner: store,
344                source_file: self.file_name.clone(),
345                inspect_key: self.inspect_key.clone(),
346                limit: self.limit,
347            };
348            Ok(Box::new(tool))
349        } else {
350            let (root, file_name) =
351                Self::decide_object_store_root_for_local_store(&self.file_name)?;
352            let object_store = create_local_file_object_store(root)?;
353            let tool = MetaInfoTool {
354                inner: object_store,
355                source_file: file_name.to_string(),
356                inspect_key: self.inspect_key.clone(),
357                limit: self.limit,
358            };
359            Ok(Box::new(tool))
360        }
361    }
362}