1use std::path::Path;
16
17use async_trait::async_trait;
18use clap::{Parser, Subcommand};
19use common_base::secrets::{ExposeSecret, SecretString};
20use common_error::ext::BoxedError;
21use common_meta::snapshot::MetadataSnapshotManager;
22use object_store::services::{Fs, S3};
23use object_store::ObjectStore;
24use snafu::{OptionExt, ResultExt};
25
26use crate::error::{InvalidFilePathSnafu, OpenDalSnafu, S3ConfigNotSetSnafu};
27use crate::metadata::common::StoreConfig;
28use crate::Tool;
29
30#[derive(Subcommand)]
32pub enum SnapshotCommand {
33 Save(SaveCommand),
35 Restore(RestoreCommand),
37 Info(InfoCommand),
39}
40
41impl SnapshotCommand {
42 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
43 match self {
44 SnapshotCommand::Save(cmd) => cmd.build().await,
45 SnapshotCommand::Restore(cmd) => cmd.build().await,
46 SnapshotCommand::Info(cmd) => cmd.build().await,
47 }
48 }
49}
50
51#[derive(Debug, Default, Parser)]
53struct S3Config {
54 #[clap(long, default_value = "false")]
56 s3: bool,
57 #[clap(long)]
59 s3_bucket: Option<String>,
60 #[clap(long)]
62 s3_region: Option<String>,
63 #[clap(long)]
65 s3_access_key: Option<SecretString>,
66 #[clap(long)]
68 s3_secret_key: Option<SecretString>,
69 #[clap(long)]
71 s3_endpoint: Option<String>,
72}
73
74impl S3Config {
75 pub fn build(&self, root: &str) -> Result<Option<ObjectStore>, BoxedError> {
76 if !self.s3 {
77 Ok(None)
78 } else {
79 if self.s3_region.is_none()
80 || self.s3_access_key.is_none()
81 || self.s3_secret_key.is_none()
82 || self.s3_bucket.is_none()
83 {
84 return S3ConfigNotSetSnafu.fail().map_err(BoxedError::new);
85 }
86 let mut config = S3::default()
88 .bucket(self.s3_bucket.as_ref().unwrap())
89 .region(self.s3_region.as_ref().unwrap())
90 .access_key_id(self.s3_access_key.as_ref().unwrap().expose_secret())
91 .secret_access_key(self.s3_secret_key.as_ref().unwrap().expose_secret());
92
93 if !root.is_empty() && root != "." {
94 config = config.root(root);
95 }
96
97 if let Some(endpoint) = &self.s3_endpoint {
98 config = config.endpoint(endpoint);
99 }
100 Ok(Some(
101 ObjectStore::new(config)
102 .context(OpenDalSnafu)
103 .map_err(BoxedError::new)?
104 .finish(),
105 ))
106 }
107 }
108}
109
110#[derive(Debug, Default, Parser)]
115pub struct SaveCommand {
116 #[clap(flatten)]
118 store: StoreConfig,
119 #[clap(flatten)]
121 s3_config: S3Config,
122 #[clap(long, default_value = "metadata_snapshot")]
124 file_name: String,
125 #[clap(long, default_value = "")]
129 output_dir: String,
130}
131
132fn create_local_file_object_store(root: &str) -> Result<ObjectStore, BoxedError> {
133 let root = if root.is_empty() { "." } else { root };
134 let object_store = ObjectStore::new(Fs::default().root(root))
135 .context(OpenDalSnafu)
136 .map_err(BoxedError::new)?
137 .finish();
138 Ok(object_store)
139}
140
141impl SaveCommand {
142 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
143 let kvbackend = self.store.build().await?;
144 let output_dir = &self.output_dir;
145 let object_store = self.s3_config.build(output_dir).map_err(BoxedError::new)?;
146 if let Some(store) = object_store {
147 let tool = MetaSnapshotTool {
148 inner: MetadataSnapshotManager::new(kvbackend, store),
149 target_file: self.file_name.clone(),
150 };
151 Ok(Box::new(tool))
152 } else {
153 let object_store = create_local_file_object_store(output_dir)?;
154 let tool = MetaSnapshotTool {
155 inner: MetadataSnapshotManager::new(kvbackend, object_store),
156 target_file: self.file_name.clone(),
157 };
158 Ok(Box::new(tool))
159 }
160 }
161}
162
163struct MetaSnapshotTool {
164 inner: MetadataSnapshotManager,
165 target_file: String,
166}
167
168#[async_trait]
169impl Tool for MetaSnapshotTool {
170 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
171 self.inner
172 .dump("", &self.target_file)
173 .await
174 .map_err(BoxedError::new)?;
175 Ok(())
176 }
177}
178
179#[derive(Debug, Default, Parser)]
185pub struct RestoreCommand {
186 #[clap(flatten)]
188 store: StoreConfig,
189 #[clap(flatten)]
191 s3_config: S3Config,
192 #[clap(long, default_value = "metadata_snapshot.metadata.fb")]
194 file_name: String,
195 #[clap(long, default_value = ".")]
197 input_dir: String,
198 #[clap(long, default_value = "false")]
199 force: bool,
200}
201
202impl RestoreCommand {
203 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
204 let kvbackend = self.store.build().await?;
205 let input_dir = &self.input_dir;
206 let object_store = self.s3_config.build(input_dir).map_err(BoxedError::new)?;
207 if let Some(store) = object_store {
208 let tool = MetaRestoreTool::new(
209 MetadataSnapshotManager::new(kvbackend, store),
210 self.file_name.clone(),
211 self.force,
212 );
213 Ok(Box::new(tool))
214 } else {
215 let object_store = create_local_file_object_store(input_dir)?;
216 let tool = MetaRestoreTool::new(
217 MetadataSnapshotManager::new(kvbackend, object_store),
218 self.file_name.clone(),
219 self.force,
220 );
221 Ok(Box::new(tool))
222 }
223 }
224}
225
226struct MetaRestoreTool {
227 inner: MetadataSnapshotManager,
228 source_file: String,
229 force: bool,
230}
231
232impl MetaRestoreTool {
233 pub fn new(inner: MetadataSnapshotManager, source_file: String, force: bool) -> Self {
234 Self {
235 inner,
236 source_file,
237 force,
238 }
239 }
240}
241
242#[async_trait]
243impl Tool for MetaRestoreTool {
244 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
245 let clean = self
246 .inner
247 .check_target_source_clean()
248 .await
249 .map_err(BoxedError::new)?;
250 if clean {
251 common_telemetry::info!(
252 "The target source is clean, we will restore the metadata snapshot."
253 );
254 self.inner
255 .restore(&self.source_file)
256 .await
257 .map_err(BoxedError::new)?;
258 Ok(())
259 } else if !self.force {
260 common_telemetry::warn!(
261 "The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
262 );
263 Ok(())
264 } else {
265 common_telemetry::info!("The target source is not clean, We will restore the metadata snapshot with --force.");
266 self.inner
267 .restore(&self.source_file)
268 .await
269 .map_err(BoxedError::new)?;
270 Ok(())
271 }
272 }
273}
274
275#[derive(Debug, Default, Parser)]
280pub struct InfoCommand {
281 #[clap(flatten)]
283 s3_config: S3Config,
284 #[clap(long, default_value = "metadata_snapshot")]
286 file_name: String,
287 #[clap(long, default_value = "*")]
289 inspect_key: String,
290 #[clap(long)]
292 limit: Option<usize>,
293}
294
295struct MetaInfoTool {
296 inner: ObjectStore,
297 source_file: String,
298 inspect_key: String,
299 limit: Option<usize>,
300}
301
302#[async_trait]
303impl Tool for MetaInfoTool {
304 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
305 let result = MetadataSnapshotManager::info(
306 &self.inner,
307 &self.source_file,
308 &self.inspect_key,
309 self.limit,
310 )
311 .await
312 .map_err(BoxedError::new)?;
313 for item in result {
314 println!("{}", item);
315 }
316 Ok(())
317 }
318}
319
320impl InfoCommand {
321 fn decide_object_store_root_for_local_store(
322 file_path: &str,
323 ) -> Result<(&str, &str), BoxedError> {
324 let path = Path::new(file_path);
325 let parent = path
326 .parent()
327 .and_then(|p| p.to_str())
328 .context(InvalidFilePathSnafu { msg: file_path })
329 .map_err(BoxedError::new)?;
330 let file_name = path
331 .file_name()
332 .and_then(|f| f.to_str())
333 .context(InvalidFilePathSnafu { msg: file_path })
334 .map_err(BoxedError::new)?;
335 let root = if parent.is_empty() { "." } else { parent };
336 Ok((root, file_name))
337 }
338
339 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
340 let object_store = self.s3_config.build("").map_err(BoxedError::new)?;
341 if let Some(store) = object_store {
342 let tool = MetaInfoTool {
343 inner: store,
344 source_file: self.file_name.clone(),
345 inspect_key: self.inspect_key.clone(),
346 limit: self.limit,
347 };
348 Ok(Box::new(tool))
349 } else {
350 let (root, file_name) =
351 Self::decide_object_store_root_for_local_store(&self.file_name)?;
352 let object_store = create_local_file_object_store(root)?;
353 let tool = MetaInfoTool {
354 inner: object_store,
355 source_file: file_name.to_string(),
356 inspect_key: self.inspect_key.clone(),
357 limit: self.limit,
358 };
359 Ok(Box::new(tool))
360 }
361 }
362}