1use async_trait::async_trait;
16use clap::{Parser, Subcommand};
17use common_error::ext::BoxedError;
18use common_meta::snapshot::MetadataSnapshotManager;
19use object_store::{ObjectStore, Scheme};
20
21use crate::Tool;
22use crate::common::{ObjectStoreConfig, StoreConfig, new_fs_object_store};
23use crate::utils::resolve_relative_path_with_current_dir;
24
/// Subcommands for working with metadata snapshot files.
#[derive(Subcommand)]
pub enum SnapshotCommand {
    /// Write a metadata snapshot to a file.
    Save(SaveCommand),
    /// Restore metadata from a snapshot file.
    Restore(RestoreCommand),
    /// Print the entries read from a snapshot file.
    Info(InfoCommand),
}
35
36impl SnapshotCommand {
37 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
38 match self {
39 SnapshotCommand::Save(cmd) => Ok(Box::new(cmd.build().await?)),
40 SnapshotCommand::Restore(cmd) => Ok(Box::new(cmd.build().await?)),
41 SnapshotCommand::Info(cmd) => Ok(Box::new(cmd.build().await?)),
42 }
43 }
44}
45
/// Arguments for saving a metadata snapshot to a file.
#[derive(Debug, Default, Parser)]
pub struct SaveCommand {
    /// Configuration used to build the key-value backend given to the snapshot manager.
    #[clap(flatten)]
    store: StoreConfig,
    /// Object store configuration; when it yields no store, a local file-system
    /// store rooted at `dir` is used instead.
    #[clap(flatten)]
    object_store: ObjectStoreConfig,
    /// Path of the snapshot file (also accepted under the `file_name` alias).
    ///
    /// With a file-system object store the path is resolved against the
    /// current working directory.
    #[clap(
        long,
        default_value = "metadata_snapshot.metadata.fb",
        alias = "file_name"
    )]
    file_path: String,
    /// Root directory of the fallback local file-system object store
    /// (also accepted under the `output_dir` alias).
    #[clap(long, default_value = "/", alias = "output_dir")]
    dir: String,
}
69
70impl SaveCommand {
71 async fn build(&self) -> Result<MetaSnapshotTool, BoxedError> {
72 let kvbackend = self.store.build().await?;
73 let (object_store, file_path) = build_object_store_and_resolve_file_path(
74 self.object_store.clone(),
75 &self.dir,
76 &self.file_path,
77 )?;
78 let tool = MetaSnapshotTool {
79 inner: MetadataSnapshotManager::new(kvbackend, object_store),
80 file_path,
81 };
82 Ok(tool)
83 }
84}
85
/// Tool that dumps a metadata snapshot to `file_path`.
struct MetaSnapshotTool {
    /// Snapshot manager that performs the dump.
    inner: MetadataSnapshotManager,
    /// Path of the snapshot file, as resolved by `SaveCommand::build`.
    file_path: String,
}
90
91#[async_trait]
92impl Tool for MetaSnapshotTool {
93 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
94 self.inner
95 .dump(&self.file_path)
96 .await
97 .map_err(BoxedError::new)?;
98 Ok(())
99 }
100}
101
/// Arguments for restoring metadata from a snapshot file.
#[derive(Debug, Default, Parser)]
pub struct RestoreCommand {
    /// Configuration used to build the key-value backend given to the snapshot manager.
    #[clap(flatten)]
    store: StoreConfig,
    /// Object store configuration; when it yields no store, a local file-system
    /// store rooted at `dir` is used instead.
    #[clap(flatten)]
    object_store: ObjectStoreConfig,
    /// Path of the snapshot file (also accepted under the `file_name` alias).
    ///
    /// With a file-system object store the path is resolved against the
    /// current working directory.
    #[clap(
        long,
        default_value = "metadata_snapshot.metadata.fb",
        alias = "file_name"
    )]
    file_path: String,
    /// Root directory of the fallback local file-system object store
    /// (also accepted under the `input_dir` alias).
    #[clap(long, default_value = "/", alias = "input_dir")]
    dir: String,
    /// Restore even when the target is reported as not clean.
    #[clap(long, default_value = "false")]
    force: bool,
}
128
129impl RestoreCommand {
130 async fn build(&self) -> Result<MetaRestoreTool, BoxedError> {
131 let kvbackend = self.store.build().await?;
132 let (object_store, file_path) = build_object_store_and_resolve_file_path(
133 self.object_store.clone(),
134 &self.dir,
135 &self.file_path,
136 )
137 .map_err(BoxedError::new)?;
138 let tool = MetaRestoreTool::new(
139 MetadataSnapshotManager::new(kvbackend, object_store),
140 file_path,
141 self.force,
142 );
143 Ok(tool)
144 }
145}
146
/// Tool that restores metadata from the snapshot at `file_path`.
struct MetaRestoreTool {
    /// Snapshot manager that checks the target and performs the restore.
    inner: MetadataSnapshotManager,
    /// Path of the snapshot file, as resolved by `RestoreCommand::build`.
    file_path: String,
    /// When `true`, the restore proceeds even if the target is not clean.
    force: bool,
}
152
153impl MetaRestoreTool {
154 pub fn new(inner: MetadataSnapshotManager, file_path: String, force: bool) -> Self {
155 Self {
156 inner,
157 file_path,
158 force,
159 }
160 }
161}
162
163#[async_trait]
164impl Tool for MetaRestoreTool {
165 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
166 let clean = self
167 .inner
168 .check_target_source_clean()
169 .await
170 .map_err(BoxedError::new)?;
171 if clean {
172 common_telemetry::info!(
173 "The target source is clean, we will restore the metadata snapshot."
174 );
175 self.inner
176 .restore(&self.file_path)
177 .await
178 .map_err(BoxedError::new)?;
179 Ok(())
180 } else if !self.force {
181 common_telemetry::warn!(
182 "The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
183 );
184 Ok(())
185 } else {
186 common_telemetry::info!(
187 "The target source is not clean, We will restore the metadata snapshot with --force."
188 );
189 self.inner
190 .restore(&self.file_path)
191 .await
192 .map_err(BoxedError::new)?;
193 Ok(())
194 }
195 }
196}
197
/// Arguments for printing the contents of a metadata snapshot file.
#[derive(Debug, Default, Parser)]
pub struct InfoCommand {
    /// Object store configuration; when it yields no store, a local file-system
    /// store rooted at `dir` is used instead.
    #[clap(flatten)]
    object_store: ObjectStoreConfig,
    /// Path of the snapshot file (also accepted under the `file_name` alias).
    ///
    /// With a file-system object store the path is resolved against the
    /// current working directory.
    #[clap(
        long,
        default_value = "metadata_snapshot.metadata.fb",
        alias = "file_name"
    )]
    file_path: String,
    /// Root directory of the fallback local file-system object store
    /// (also accepted under the `input_dir` alias).
    #[clap(long, default_value = "/", alias = "input_dir")]
    dir: String,
    /// Key to inspect in the snapshot; defaults to `*`.
    // NOTE(review): the matching semantics are defined by
    // `MetadataSnapshotManager::info`, which is not visible here — confirm.
    #[clap(long, default_value = "*")]
    inspect_key: String,
    /// Optional limit passed along with the inspect key.
    // NOTE(review): presumably caps the number of printed entries — confirm
    // against `MetadataSnapshotManager::info`.
    #[clap(long)]
    limit: Option<usize>,
}
224
/// Tool that prints what `MetadataSnapshotManager::info` returns for a snapshot file.
struct MetaInfoTool {
    /// Object store the snapshot file is read from.
    inner: ObjectStore,
    /// Path of the snapshot file, as resolved by `InfoCommand::build`.
    file_path: String,
    /// Inspect key forwarded to `MetadataSnapshotManager::info`.
    inspect_key: String,
    /// Optional limit forwarded to `MetadataSnapshotManager::info`.
    limit: Option<usize>,
}
231
232#[async_trait]
233impl Tool for MetaInfoTool {
234 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
235 let result = MetadataSnapshotManager::info(
236 &self.inner,
237 &self.file_path,
238 &self.inspect_key,
239 self.limit,
240 )
241 .await
242 .map_err(BoxedError::new)?;
243 for item in result {
244 println!("{}", item);
245 }
246 Ok(())
247 }
248}
249
250impl InfoCommand {
251 async fn build(&self) -> Result<MetaInfoTool, BoxedError> {
252 let (object_store, file_path) = build_object_store_and_resolve_file_path(
253 self.object_store.clone(),
254 &self.dir,
255 &self.file_path,
256 )?;
257 let tool = MetaInfoTool {
258 inner: object_store,
259 file_path,
260 inspect_key: self.inspect_key.clone(),
261 limit: self.limit,
262 };
263 Ok(tool)
264 }
265}
266
267fn build_object_store_and_resolve_file_path(
269 object_store: ObjectStoreConfig,
270 fs_root: &str,
271 file_path: &str,
272) -> Result<(ObjectStore, String), BoxedError> {
273 let object_store = object_store.build().map_err(BoxedError::new)?;
274 let object_store = match object_store {
275 Some(object_store) => object_store,
276 None => new_fs_object_store(fs_root)?,
277 };
278
279 let file_path = if matches!(object_store.info().scheme(), Scheme::Fs) {
280 resolve_relative_path_with_current_dir(file_path).map_err(BoxedError::new)?
281 } else {
282 file_path.to_string()
283 };
284
285 Ok((object_store, file_path))
286}
287
#[cfg(test)]
mod tests {
    use std::env;

    use clap::Parser;

    use crate::metadata::snapshot::RestoreCommand;

    /// The snapshot path must resolve against the current working directory
    /// whether it is given through the `file_name` alias, through the primary
    /// `--file-path` flag, or left at its default value.
    #[tokio::test]
    async fn test_cmd_resolve_file_path() {
        common_telemetry::init_default_ut_logging();

        const FILE_NAME: &str = "metadata_snapshot.metadata.fb";
        let expected = env::current_dir()
            .unwrap()
            .join(FILE_NAME)
            .to_string_lossy()
            .to_string();

        let store_args = ["--backend", "memory-store", "--store-addrs", "memory://"];
        let file_args: [&[&str]; 3] = [
            // Legacy alias.
            &["--file_name", FILE_NAME],
            // Primary long flag derived from the field name.
            &["--file-path", FILE_NAME],
            // Flag omitted: the default value applies.
            &[],
        ];

        for file_arg in file_args {
            let args = [""]
                .into_iter()
                .chain(file_arg.iter().copied())
                .chain(store_args);
            let cmd = RestoreCommand::parse_from(args);
            let tool = cmd.build().await.unwrap();
            assert_eq!(tool.file_path, expected, "file args: {file_arg:?}");
        }
    }
}