1use async_trait::async_trait;
16use clap::{Parser, Subcommand};
17use common_error::ext::BoxedError;
18use common_meta::snapshot::MetadataSnapshotManager;
19use object_store::{ObjectStore, Scheme};
20
21use crate::Tool;
22use crate::common::{ObjectStoreConfig, StoreConfig, new_fs_object_store};
23use crate::utils::resolve_relative_path_with_current_dir;
24
// Subcommands that operate on metadata snapshots. Each variant wraps the
// argument struct whose `build` method produces the matching `Tool`.
#[derive(Subcommand)]
pub enum SnapshotCommand {
    // Dumps metadata into a snapshot file (see `SaveCommand`).
    Save(SaveCommand),
    // Restores metadata from a snapshot file (see `RestoreCommand`).
    Restore(RestoreCommand),
    // Prints the entries of a snapshot file (see `InfoCommand`).
    Info(InfoCommand),
}
35
36impl SnapshotCommand {
37 pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
38 match self {
39 SnapshotCommand::Save(cmd) => Ok(Box::new(cmd.build().await?)),
40 SnapshotCommand::Restore(cmd) => Ok(Box::new(cmd.build().await?)),
41 SnapshotCommand::Info(cmd) => Ok(Box::new(cmd.build().await?)),
42 }
43 }
44}
45
// Command-line arguments for saving a metadata snapshot.
#[derive(Debug, Default, Parser)]
pub struct SaveCommand {
    // Metadata store configuration; `SaveCommand::build` turns it into the kv
    // backend whose content is dumped.
    #[clap(flatten)]
    store: StoreConfig,
    // Object store configuration. When it builds to `None`, a filesystem
    // object store rooted at `dir` is used instead.
    #[clap(flatten)]
    object_store: ObjectStoreConfig,
    // Path of the snapshot file to write; also accepted under the alias
    // `file_name`. For a filesystem-scheme store the path is resolved against
    // the current directory, otherwise it is used verbatim.
    #[clap(
        long,
        default_value = "metadata_snapshot.metadata.fb",
        alias = "file_name"
    )]
    file_path: String,
    // Root of the fallback filesystem object store; also accepted under the
    // alias `output_dir`.
    #[clap(long, default_value = "/", alias = "output_dir")]
    dir: String,
}
69
70impl SaveCommand {
71 async fn build(&self) -> Result<MetaSnapshotTool, BoxedError> {
72 let kvbackend = self.store.build().await?;
73 let (object_store, file_path) = build_object_store_and_resolve_file_path(
74 self.object_store.clone(),
75 &self.dir,
76 &self.file_path,
77 )?;
78 let tool = MetaSnapshotTool {
79 inner: MetadataSnapshotManager::new(kvbackend, object_store),
80 file_path,
81 };
82 Ok(tool)
83 }
84}
85
/// Tool that dumps metadata into a snapshot file.
struct MetaSnapshotTool {
    /// Snapshot manager built from the kv backend and the object store.
    inner: MetadataSnapshotManager,
    /// Path of the snapshot file passed to `dump`.
    file_path: String,
}
90
91#[async_trait]
92impl Tool for MetaSnapshotTool {
93 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
94 self.inner
95 .dump(&self.file_path)
96 .await
97 .map_err(BoxedError::new)?;
98 Ok(())
99 }
100}
101
// Command-line arguments for restoring metadata from a snapshot file.
#[derive(Debug, Default, Parser)]
pub struct RestoreCommand {
    // Metadata store configuration; `RestoreCommand::build` turns it into the
    // kv backend that receives the restored metadata.
    #[clap(flatten)]
    store: StoreConfig,
    // Object store configuration. When it builds to `None`, a filesystem
    // object store rooted at `dir` is used instead.
    #[clap(flatten)]
    object_store: ObjectStoreConfig,
    // Path of the snapshot file to read; also accepted under the alias
    // `file_name`. For a filesystem-scheme store the path is resolved against
    // the current directory, otherwise it is used verbatim.
    #[clap(
        long,
        default_value = "metadata_snapshot.metadata.fb",
        alias = "file_name"
    )]
    file_path: String,
    // Root of the fallback filesystem object store; also accepted under the
    // alias `input_dir`.
    #[clap(long, default_value = "/", alias = "input_dir")]
    dir: String,
    // When set, the restore proceeds even if the target is reported as not
    // clean (see `MetaRestoreTool::do_work`).
    #[clap(long, default_value = "false")]
    force: bool,
}
128
129impl RestoreCommand {
130 async fn build(&self) -> Result<MetaRestoreTool, BoxedError> {
131 let kvbackend = self.store.build().await?;
132 let (object_store, file_path) = build_object_store_and_resolve_file_path(
133 self.object_store.clone(),
134 &self.dir,
135 &self.file_path,
136 )
137 .map_err(BoxedError::new)?;
138 let tool = MetaRestoreTool::new(
139 MetadataSnapshotManager::new(kvbackend, object_store),
140 file_path,
141 self.force,
142 );
143 Ok(tool)
144 }
145}
146
/// Tool that restores metadata from a snapshot file.
struct MetaRestoreTool {
    /// Snapshot manager built from the kv backend and the object store.
    inner: MetadataSnapshotManager,
    /// Path of the snapshot file passed to `restore`.
    file_path: String,
    /// Whether to restore even when the target is reported as not clean.
    force: bool,
}
152
153impl MetaRestoreTool {
154 pub fn new(inner: MetadataSnapshotManager, file_path: String, force: bool) -> Self {
155 Self {
156 inner,
157 file_path,
158 force,
159 }
160 }
161}
162
163#[async_trait]
164impl Tool for MetaRestoreTool {
165 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
166 let clean = self
167 .inner
168 .check_target_source_clean()
169 .await
170 .map_err(BoxedError::new)?;
171 if clean {
172 common_telemetry::info!(
173 "The target source is clean, we will restore the metadata snapshot."
174 );
175 self.inner
176 .restore(&self.file_path)
177 .await
178 .map_err(BoxedError::new)?;
179 Ok(())
180 } else if !self.force {
181 common_telemetry::warn!(
182 "The target source is not clean, if you want to restore the metadata snapshot forcefully, please use --force option."
183 );
184 Ok(())
185 } else {
186 common_telemetry::info!(
187 "The target source is not clean, We will restore the metadata snapshot with --force."
188 );
189 self.inner
190 .restore(&self.file_path)
191 .await
192 .map_err(BoxedError::new)?;
193 Ok(())
194 }
195 }
196}
197
// Command-line arguments for inspecting the content of a snapshot file.
#[derive(Debug, Default, Parser)]
pub struct InfoCommand {
    // Object store configuration. When it builds to `None`, a filesystem
    // object store rooted at `dir` is used instead.
    #[clap(flatten)]
    object_store: ObjectStoreConfig,
    // Path of the snapshot file to read; also accepted under the alias
    // `file_name`. For a filesystem-scheme store the path is resolved against
    // the current directory, otherwise it is used verbatim.
    #[clap(
        long,
        default_value = "metadata_snapshot.metadata.fb",
        alias = "file_name"
    )]
    file_path: String,
    // Root of the fallback filesystem object store; also accepted under the
    // alias `input_dir`.
    #[clap(long, default_value = "/", alias = "input_dir")]
    dir: String,
    // Key selector forwarded to `MetadataSnapshotManager::info`.
    // NOTE(review): the default "*" presumably selects every key - confirm
    // against `MetadataSnapshotManager::info`.
    #[clap(long, default_value = "*")]
    inspect_key: String,
    // Optional limit forwarded to `MetadataSnapshotManager::info`.
    // NOTE(review): presumably caps the number of returned entries - confirm.
    #[clap(long)]
    limit: Option<usize>,
}
224
/// Tool that prints information read from a snapshot file.
struct MetaInfoTool {
    /// Object store the snapshot file is read from.
    inner: ObjectStore,
    /// Path of the snapshot file inside the object store.
    file_path: String,
    /// Key selector forwarded to `MetadataSnapshotManager::info`.
    inspect_key: String,
    /// Optional limit forwarded to `MetadataSnapshotManager::info`.
    limit: Option<usize>,
}
231
232#[async_trait]
233impl Tool for MetaInfoTool {
234 async fn do_work(&self) -> std::result::Result<(), BoxedError> {
235 let result = MetadataSnapshotManager::info(
236 &self.inner,
237 &self.file_path,
238 &self.inspect_key,
239 self.limit,
240 )
241 .await
242 .map_err(BoxedError::new)?;
243 for item in result {
244 println!("{}", item);
245 }
246 Ok(())
247 }
248}
249
250impl InfoCommand {
251 async fn build(&self) -> Result<MetaInfoTool, BoxedError> {
252 let (object_store, file_path) = build_object_store_and_resolve_file_path(
253 self.object_store.clone(),
254 &self.dir,
255 &self.file_path,
256 )?;
257 let tool = MetaInfoTool {
258 inner: object_store,
259 file_path,
260 inspect_key: self.inspect_key.clone(),
261 limit: self.limit,
262 };
263 Ok(tool)
264 }
265}
266
267fn build_object_store_and_resolve_file_path(
269 object_store: ObjectStoreConfig,
270 fs_root: &str,
271 file_path: &str,
272) -> Result<(ObjectStore, String), BoxedError> {
273 let object_store = object_store.build().map_err(BoxedError::new)?;
274 let object_store = match object_store {
275 Some(object_store) => object_store,
276 None => new_fs_object_store(fs_root)?,
277 };
278
279 let file_path = if matches!(object_store.info().scheme(), Scheme::Fs) {
280 resolve_relative_path_with_current_dir(file_path).map_err(BoxedError::new)?
281 } else {
282 file_path.to_string()
283 };
284
285 Ok((object_store, file_path))
286}
287
#[cfg(test)]
mod tests {
    use std::env;
    use std::path::Path;
    use std::sync::Arc;
    use std::time::Duration;

    use clap::Parser;
    use common_meta::kv_backend::KvBackend;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::rpc::store::PutRequest;
    use object_store::ObjectStore;

    use super::*;
    use crate::metadata::snapshot::RestoreCommand;

    /// Snapshot file name used by the tests; equal to the `default_value` of
    /// the `file_path` argument.
    const SNAPSHOT_FILE: &str = "metadata_snapshot.metadata.fb";

    /// Path of the snapshot file relative to the object store root used by
    /// the raft-engine round-trip tests.
    const BACKUP_KEY: &str = "/backup/metadata_snapshot.metadata.fb";

    /// Formats a filesystem path as a `raftengine://` store address.
    ///
    /// Backslashes are turned into forward slashes and a leading `/` is added
    /// when missing, so Windows paths such as `C:\dir` become
    /// `raftengine:///C:/dir`.
    fn create_raftengine_url(path: &std::path::Path) -> String {
        let mut path = path.to_string_lossy().replace('\\', "/");
        if !path.starts_with('/') {
            path = format!("/{}", path);
        }
        format!("raftengine://{}", path)
    }

    /// A relative snapshot path must resolve against the current directory,
    /// whether it is given through `--file-path`, through the `--file_name`
    /// alias, or left to the default value.
    #[tokio::test]
    async fn test_cmd_resolve_file_path() {
        common_telemetry::init_default_ut_logging();

        let expected = env::current_dir()
            .unwrap()
            .join(SNAPSHOT_FILE)
            .to_string_lossy()
            .to_string();

        let store_args = ["--backend", "memory-store", "--store-addrs", "memory://"];
        let file_args: [&[&str]; 3] = [
            // Canonical flag derived from the field name.
            &["--file-path", SNAPSHOT_FILE],
            // Alias declared on the argument.
            &["--file_name", SNAPSHOT_FILE],
            // No flag at all: the default value applies.
            &[],
        ];

        for file_arg in file_args {
            let args = [""]
                .into_iter()
                .chain(file_arg.iter().copied())
                .chain(store_args);
            let cmd = RestoreCommand::parse_from(args);
            let tool = cmd.build().await.unwrap();
            assert_eq!(tool.file_path, expected, "file args: {:?}", file_arg);
        }
    }

    /// Writes a snapshot holding the single pair `test -> test` to
    /// `file_path` in `object_store`.
    async fn setup_backup_file(object_store: ObjectStore, file_path: &str) {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        kv_backend
            .put(
                PutRequest::new()
                    .with_key(b"test".to_vec())
                    .with_value(b"test".to_vec()),
            )
            .await
            .unwrap();
        manager.dump(file_path).await.unwrap();
    }

    /// Restores a snapshot into a raft-engine store through `RestoreCommand`
    /// and verifies the restored key by reopening the store.
    #[tokio::test]
    async fn test_restore_raft_engine_store() {
        common_telemetry::init_default_ut_logging();
        let temp_dir = tempfile::tempdir().unwrap();
        let root = temp_dir.path().display().to_string();
        let object_store = new_fs_object_store(&root).unwrap();
        setup_backup_file(object_store, BACKUP_KEY).await;

        let metadata_path = temp_dir.path().join("metadata");
        let backup_file = temp_dir.path().join("backup").join(SNAPSHOT_FILE);
        let store_addr = create_raftengine_url(&metadata_path);
        {
            // Scoped so the tool (and the store it holds) is dropped before
            // the metadata directory is reopened below.
            let cmd = RestoreCommand::parse_from([
                "",
                "--file_name",
                backup_file.to_str().unwrap(),
                "--backend",
                "raft-engine-store",
                "--store-addrs",
                store_addr.as_str(),
            ]);
            let tool = cmd.build().await.unwrap();
            tool.do_work().await.unwrap();
        }
        // NOTE(review): presumably gives the raft-engine store time to release
        // its directory before it is reopened - confirm.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let kv = standalone::build_metadata_kvbackend(
            metadata_path.display().to_string(),
            Default::default(),
        )
        .unwrap();

        let value = kv.get(b"test").await.unwrap().unwrap().value;
        assert_eq!(value, b"test");
    }

    /// Saves a raft-engine store through `SaveCommand` and verifies the
    /// snapshot by restoring it into an in-memory backend.
    #[tokio::test]
    async fn test_save_raft_engine_store() {
        common_telemetry::init_default_ut_logging();
        let temp_dir = tempfile::tempdir().unwrap();
        let root = temp_dir.path().display().to_string();
        let metadata_path = temp_dir.path().join("metadata");
        {
            // Scoped so the backend is dropped before the command reopens the
            // same directory.
            let kv = standalone::build_metadata_kvbackend(
                metadata_path.to_string_lossy().to_string(),
                Default::default(),
            )
            .unwrap();
            kv.put(
                PutRequest::new()
                    .with_key(b"test".to_vec())
                    .with_value(b"test".to_vec()),
            )
            .await
            .unwrap();
        }
        // NOTE(review): presumably gives the raft-engine store time to release
        // its directory before it is reopened - confirm.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let backup_file = temp_dir.path().join("backup").join(SNAPSHOT_FILE);
        let store_addr = create_raftengine_url(&metadata_path);
        {
            let cmd = SaveCommand::parse_from([
                "",
                "--file_name",
                backup_file.to_str().unwrap(),
                "--backend",
                "raft-engine-store",
                "--store-addrs",
                store_addr.as_str(),
            ]);
            let tool = cmd.build().await.unwrap();
            tool.do_work().await.unwrap();
        }

        let object_store = new_fs_object_store(&root).unwrap();
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let manager = MetadataSnapshotManager::new(kv_backend.clone(), object_store);
        manager.restore(BACKUP_KEY).await.unwrap();
        let value = kv_backend.get(b"test").await.unwrap().unwrap().value;
        assert_eq!(value, b"test");
    }

    /// A Windows-style path must produce a URL whose path component keeps the
    /// drive letter behind a leading slash.
    #[test]
    fn test_path() {
        let path = "C:\\Users\\user\\AppData\\Local\\Temp\\.tmpuPiVuB\\metadata";
        let path = Path::new(path);
        let url = create_raftengine_url(path);
        assert_eq!(
            url,
            "raftengine:///C:/Users/user/AppData/Local/Temp/.tmpuPiVuB/metadata"
        );
        let url = url::Url::parse(&url).unwrap();
        assert_eq!(
            url.path(),
            "/C:/Users/user/AppData/Local/Temp/.tmpuPiVuB/metadata"
        );
    }
}