use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use opendal::layers::LoggingLayer;
use opendal::{services, Operator};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;

use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{
    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
    SchemaNotFoundSnafu,
};
use crate::{database, Tool};

/// A fully qualified table reference: `(catalog, schema, table)`.
type TableReference = (String, String, String);

#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export the schemas only (`CREATE DATABASE` and `CREATE TABLE`/`CREATE VIEW` statements).
    Schema,
    /// Export the table data only (via `COPY DATABASE ... TO`).
    Data,
    /// Export both schemas and data.
    #[default]
    All,
}

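/// Command-line options for the `export` tool.
///
/// A hypothetical invocation (the binary name and subcommand path are
/// assumptions; the flags come from the fields below):
///
/// ```text
/// greptime cli export --addr 127.0.0.1:4000 --output-dir /tmp/export --target all
/// ```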
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Server address to connect to.
    #[clap(long)]
    addr: String,

    /// Directory to put the exported data. Required unless `--s3` is set.
    #[clap(long)]
    output_dir: Option<String>,

    /// The name of the database to export, in `catalog-schema` form.
    #[clap(long, default_value = "greptime-*")]
    database: String,

    /// Parallelism of the export job.
    #[clap(long, short = 'j', default_value = "1")]
    export_jobs: usize,

    /// Max retry times for each job.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// Things to export: schemas, data, or all.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// The start of the time range (time-index column) for data export.
    #[clap(long)]
    start_time: Option<String>,

    /// The end of the time range (time-index column) for data export.
    #[clap(long)]
    end_time: Option<String>,

    /// Basic authentication for connecting to the server.
    #[clap(long)]
    auth_basic: Option<String>,

    /// Timeout for each request to the database, e.g. `30s`, `5m`.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// Proxy server address; overrides the system proxy if set.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable the use of any proxy server.
    #[clap(long)]
    no_proxy: bool,

    /// Export to S3 instead of the local filesystem.
    #[clap(long)]
    s3: bool,

    /// Local directory to hold the exported DDL files when exporting to S3.
    #[clap(long)]
    s3_ddl_local_dir: Option<String>,

    /// The S3 bucket name. Required when `--s3` is set.
    #[clap(long)]
    s3_bucket: Option<String>,

    /// The root path inside the S3 bucket.
    #[clap(long)]
    s3_root: Option<String>,

    /// The S3 endpoint. Required when `--s3` is set.
    #[clap(long)]
    s3_endpoint: Option<String>,

    /// The S3 access key ID. Required when `--s3` is set.
    #[clap(long)]
    s3_access_key: Option<String>,

    /// The S3 secret access key. Required when `--s3` is set.
    #[clap(long)]
    s3_secret_key: Option<String>,

    /// The S3 region. Required when `--s3` is set.
    #[clap(long)]
    s3_region: Option<String>,
}

impl ExportCommand {
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        // Exporting to S3 requires the full S3 connection configuration.
        if self.s3
            && (self.s3_bucket.is_none()
                || self.s3_endpoint.is_none()
                || self.s3_access_key.is_none()
                || self.s3_secret_key.is_none()
                || self.s3_region.is_none())
        {
            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
        }
        // Exporting to the local filesystem requires an output directory.
        if !self.s3 && self.output_dir.is_none() {
            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
        }
        let (catalog, schema) =
            database::split_database(&self.database).map_err(BoxedError::new)?;
        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            catalog.clone(),
            self.auth_basic.clone(),
            self.timeout.unwrap_or_default(),
            proxy,
        );

        Ok(Box::new(Export {
            catalog,
            schema,
            database_client,
            output_dir: self.output_dir.clone(),
            parallelism: self.export_jobs,
            target: self.target.clone(),
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            s3: self.s3,
            s3_ddl_local_dir: self.s3_ddl_local_dir.clone(),
            s3_bucket: self.s3_bucket.clone(),
            s3_root: self.s3_root.clone(),
            s3_endpoint: self.s3_endpoint.clone(),
            s3_access_key: self.s3_access_key.clone(),
            s3_secret_key: self.s3_secret_key.clone(),
            s3_region: self.s3_region.clone(),
        }))
    }
}

/// Exports database schemas and data from a GreptimeDB instance to the local
/// filesystem or S3.
#[derive(Clone)]
pub struct Export {
    catalog: String,
    schema: Option<String>,
    database_client: DatabaseClient,
    output_dir: Option<String>,
    parallelism: usize,
    target: ExportTarget,
    start_time: Option<String>,
    end_time: Option<String>,
    s3: bool,
    s3_ddl_local_dir: Option<String>,
    s3_bucket: Option<String>,
    s3_root: Option<String>,
    s3_endpoint: Option<String>,
    s3_access_key: Option<String>,
    s3_secret_key: Option<String>,
    s3_region: Option<String>,
}

impl Export {
    /// Root path of the exported catalog: relative for S3, under
    /// `output_dir` for the local filesystem.
    fn catalog_path(&self) -> PathBuf {
        if self.s3 {
            PathBuf::from(&self.catalog)
        } else if let Some(dir) = &self.output_dir {
            PathBuf::from(dir).join(&self.catalog)
        } else {
            unreachable!("catalog_path: output_dir must be set when not using s3")
        }
    }

    /// Returns the databases to export: either the single schema given on the
    /// command line (if it exists) or all databases in the catalog.
    async fn get_db_names(&self) -> Result<Vec<String>> {
        let db_names = self.all_db_names().await?;
        let Some(schema) = &self.schema else {
            return Ok(db_names);
        };

        db_names
            .into_iter()
            .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
            .map(|name| vec![name])
            .context(SchemaNotFoundSnafu {
                catalog: &self.catalog,
                schema,
            })
    }

    /// Lists all database names in the catalog, skipping the system schemas
    /// `information_schema` and `pg_catalog`.
    async fn all_db_names(&self) -> Result<Vec<String>> {
        let records = self
            .database_client
            .sql_in_public("SHOW DATABASES")
            .await?
            .context(EmptyResultSnafu)?;
        let mut result = Vec::with_capacity(records.len());
        for value in records {
            let Value::String(schema) = &value[0] else {
                unreachable!()
            };
            if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                continue;
            }
            if schema == common_catalog::consts::PG_CATALOG_NAME {
                continue;
            }
            result.push(schema.clone());
        }
        Ok(result)
    }

    /// Returns the metric physical tables, the remaining tables, and the views
    /// in the given schema.
    async fn get_table_list(
        &self,
        catalog: &str,
        schema: &str,
    ) -> Result<(
        Vec<TableReference>,
        Vec<TableReference>,
        Vec<TableReference>,
    )> {
        // Physical tables of the metric engine expose an internal `__tsid`
        // column; use it to tell them apart from ordinary tables.
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name \
             FROM information_schema.columns \
             WHERE column_name = '__tsid' \
             AND table_catalog = '{catalog}' \
             AND table_schema = '{schema}'"
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let mut metric_physical_tables = HashSet::with_capacity(records.len());
        for value in records {
            let mut t = Vec::with_capacity(3);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
        }

        let sql = format!(
            "SELECT table_catalog, table_schema, table_name, table_type \
             FROM information_schema.tables \
             WHERE (table_type = 'BASE TABLE' OR table_type = 'VIEW') \
             AND table_catalog = '{catalog}' \
             AND table_schema = '{schema}'",
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;

        debug!("Fetched table/view list: {:?}", records);

        if records.is_empty() {
            return Ok((vec![], vec![], vec![]));
        }

        let mut remaining_tables = Vec::with_capacity(records.len());
        let mut views = Vec::new();
        for value in records {
            let mut t = Vec::with_capacity(4);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            let table = (t[0].clone(), t[1].clone(), t[2].clone());
            let table_type = t[3].as_str();
            // Metric physical tables were already collected above.
            if !metric_physical_tables.contains(&table) {
                if table_type == "VIEW" {
                    views.push(table);
                } else {
                    remaining_tables.push(table);
                }
            }
        }

        Ok((
            metric_physical_tables.into_iter().collect(),
            remaining_tables,
            views,
        ))
    }

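    /// Runs a `SHOW CREATE ...` statement and returns the statement text with
    /// a trailing `;`. For example (hypothetical identifiers), for a table it
    /// issues:
    ///
    /// ```sql
    /// SHOW CREATE TABLE "greptime"."public"."my_table"
    /// ```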
    async fn show_create(
        &self,
        show_type: &str,
        catalog: &str,
        schema: &str,
        table: Option<&str>,
    ) -> Result<String> {
        let sql = match table {
            Some(table) => format!(
                r#"SHOW CREATE {} "{}"."{}"."{}""#,
                show_type, catalog, schema, table
            ),
            None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
        };
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let Value::String(create) = &records[0][1] else {
            unreachable!()
        };

        Ok(format!("{};\n", create))
    }

    /// Exports a `CREATE DATABASE` statement per database into
    /// `create_database.sql` files.
    async fn export_create_database(&self) -> Result<()> {
        let timer = Instant::now();
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = self.build_prefer_fs_operator().await?;

        for schema in db_names {
            let create_database = self
                .show_create("DATABASE", &self.catalog, &schema, None)
                .await?;

            let file_path = self.get_file_path(&schema, "create_database.sql");
            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
                .await?;

            info!(
                "Exported {}.{} database creation SQL to {}",
                self.catalog,
                schema,
                self.format_output_path(&file_path)
            );
        }

        let elapsed = timer.elapsed();
        info!("Finished {db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    /// Exports the `CREATE TABLE`/`CREATE VIEW` statements of each database
    /// into a `create_tables.sql` file, one database per concurrent task.
    async fn export_create_table(&self) -> Result<()> {
        let timer = Instant::now();
        // Bound the number of concurrent per-database exports.
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = Arc::new(self.build_prefer_fs_operator().await?);
        let mut tasks = Vec::with_capacity(db_names.len());

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let operator = operator.clone();
            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();
                let (metric_physical_tables, remaining_tables, views) = export_self
                    .get_table_list(&export_self.catalog, &schema)
                    .await?;

                // The filesystem backend needs the directory to exist before writing.
                if !export_self.s3 {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
                let mut content = Vec::new();

                // Emit metric physical tables first so that, on import, they
                // exist before any tables that depend on them.
                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
                    content.extend_from_slice(create_table.as_bytes());
                }

                for (c, s, v) in &views {
                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
                    content.extend_from_slice(create_view.as_bytes());
                }

                export_self
                    .write_to_storage(&operator, &file_path, content)
                    .await?;

                info!(
                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
                    export_self.catalog,
                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
                    export_self.format_output_path(&file_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    /// Picks the storage backend for data files: S3 when `--s3` is set,
    /// otherwise the local filesystem.
    async fn build_operator(&self) -> Result<Operator> {
        if self.s3 {
            self.build_s3_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

    /// Like `build_operator`, but DDL files prefer the local filesystem:
    /// when `--s3` is set together with `--s3-ddl-local-dir`, SQL files are
    /// written to that local directory instead of the bucket.
    async fn build_prefer_fs_operator(&self) -> Result<Operator> {
        match (self.s3, &self.s3_ddl_local_dir) {
            (true, Some(root)) => {
                let op = Operator::new(services::Fs::default().root(root))
                    .context(OpenDalSnafu)?
                    .layer(LoggingLayer::default())
                    .finish();
                Ok(op)
            }
            (true, None) => self.build_s3_operator().await,
            (false, _) => self.build_fs_operator().await,
        }
    }

    async fn build_s3_operator(&self) -> Result<Operator> {
        // `build()` has already validated the S3 options, so the bucket is present.
        let mut builder = services::S3::default().bucket(
            self.s3_bucket
                .as_ref()
                .expect("s3_bucket must be provided when s3 is enabled"),
        );

        if let Some(root) = self.s3_root.as_ref() {
            builder = builder.root(root);
        }

        if let Some(endpoint) = self.s3_endpoint.as_ref() {
            builder = builder.endpoint(endpoint);
        }

        if let Some(region) = self.s3_region.as_ref() {
            builder = builder.region(region);
        }

        if let Some(key_id) = self.s3_access_key.as_ref() {
            builder = builder.access_key_id(key_id);
        }

        if let Some(secret_key) = self.s3_secret_key.as_ref() {
            builder = builder.secret_access_key(secret_key);
        }

        let op = Operator::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

    async fn build_fs_operator(&self) -> Result<Operator> {
        let root = self
            .output_dir
            .as_ref()
            .context(OutputDirNotSetSnafu)?
            .clone();
        let op = Operator::new(services::Fs::default().root(&root))
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

    /// Exports the data of each database with `COPY DATABASE ... TO`, and
    /// writes a matching `copy_from.sql` so the data can be re-imported later.
    async fn export_database_data(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let mut tasks = Vec::with_capacity(db_count);
        let operator = Arc::new(self.build_operator().await?);
        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
        let with_options = build_with_options(&self.start_time, &self.end_time);

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let with_options_clone = with_options.clone();
            let operator = operator.clone();
            let fs_first_operator = fs_first_operator.clone();

            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();

                // The filesystem backend needs the directory to exist before writing.
                if !export_self.s3 {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let (path, connection_part) = export_self.get_storage_params(&schema);

                // Run the export on the server side.
                let sql = format!(
                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );
                info!("Executing sql: {sql}");
                export_self.database_client.sql_in_public(&sql).await?;
                info!(
                    "Finished exporting {}.{} data to {}",
                    export_self.catalog, schema, path
                );

                // Persist the inverse statement for re-importing the exported data.
                let copy_database_from_sql = format!(
                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
                export_self
                    .write_to_storage(
                        &fs_first_operator,
                        &copy_from_path,
                        copy_database_from_sql.into_bytes(),
                    )
                    .await?;

                info!(
                    "Finished exporting {}.{} copy_from.sql to {}",
                    export_self.catalog,
                    schema,
                    export_self.format_output_path(&copy_from_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    /// Relative path of an exported SQL file: `{catalog}/{schema}/{file_name}`.
    fn get_file_path(&self, schema: &str, file_name: &str) -> String {
        format!("{}/{}/{}", self.catalog, schema, file_name)
    }

    /// Renders a full, human-readable output location for logging.
    fn format_output_path(&self, file_path: &str) -> String {
        if self.s3 {
            format!(
                "s3://{}{}/{}",
                self.s3_bucket.as_ref().unwrap_or(&String::new()),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                file_path
            )
        } else {
            format!(
                "{}/{}",
                self.output_dir.as_ref().unwrap_or(&String::new()),
                file_path
            )
        }
    }

    async fn write_to_storage(
        &self,
        op: &Operator,
        file_path: &str,
        content: Vec<u8>,
    ) -> Result<()> {
        op.write(file_path, content).await.context(OpenDalSnafu)
    }

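    /// Returns the `COPY DATABASE` target path and, for S3, a `CONNECTION (...)`
    /// clause. With hypothetical values (bucket `my-bucket`, root `backups`,
    /// catalog `greptime`, schema `public`) the S3 branch would produce:
    ///
    /// ```text
    /// s3://my-bucket/backups/greptime/public/
    ///  CONNECTION (ACCESS_KEY_ID='...', SECRET_ACCESS_KEY='...', REGION='...')
    /// ```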
    fn get_storage_params(&self, schema: &str) -> (String, String) {
        if self.s3 {
            let s3_path = format!(
                "s3://{}{}/{}/{}/",
                self.s3_bucket.as_ref().unwrap(),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                self.catalog,
                schema
            );

            // The endpoint is optional in the CONNECTION clause.
            let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            // `build()` guarantees these options are present when s3 is enabled.
            let connection_options = format!(
                "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
                self.s3_access_key.as_ref().unwrap(),
                self.s3_secret_key.as_ref().unwrap(),
                self.s3_region.as_ref().unwrap(),
                endpoint_option
            );

            (s3_path, format!(" CONNECTION ({})", connection_options))
        } else {
            (
                self.catalog_path()
                    .join(format!("{schema}/"))
                    .to_string_lossy()
                    .to_string(),
                String::new(),
            )
        }
    }

    /// Awaits all export tasks, logs failures, and returns how many succeeded.
    async fn execute_tasks(
        &self,
        tasks: Vec<impl std::future::Future<Output = Result<()>>>,
    ) -> usize {
        futures::future::join_all(tasks)
            .await
            .into_iter()
            .filter(|r| match r {
                Ok(_) => true,
                Err(e) => {
                    error!(e; "export job failed");
                    false
                }
            })
            .count()
    }
}

#[async_trait]
impl Tool for Export {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        match self.target {
            ExportTarget::Schema => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)
            }
            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
            ExportTarget::All => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)?;
                self.export_database_data().await.map_err(BoxedError::new)
            }
        }
    }
}

/// Builds the `WITH (...)` options for `COPY DATABASE`: always Parquet format,
/// plus the optional time range bounds.
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    if let Some(start) = start_time {
        options.push(format!("start_time = '{}'", start));
    }
    if let Some(end) = end_time {
        options.push(format!("end_time = '{}'", end));
    }
    options.join(", ")
}
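
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of the expected `WITH (...)` contents; the sample
    // timestamps are arbitrary.
    #[test]
    fn build_with_options_includes_format_and_time_range() {
        assert_eq!(build_with_options(&None, &None), "format = 'parquet'");

        let options = build_with_options(
            &Some("2024-01-01 00:00:00".to_string()),
            &Some("2024-01-02 00:00:00".to_string()),
        );
        assert_eq!(
            options,
            "format = 'parquet', start_time = '2024-01-01 00:00:00', end_time = '2024-01-02 00:00:00'"
        );
    }
}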