cli/export.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use common_error::ext::BoxedError;
use common_telemetry::{debug, error, info};
use opendal::layers::LoggingLayer;
use opendal::{services, Operator};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Semaphore;
use tokio::time::Instant;

use crate::database::{parse_proxy_opts, DatabaseClient};
use crate::error::{
    EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, S3ConfigNotSetSnafu,
    SchemaNotFoundSnafu,
};
use crate::{database, Tool};

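/// A `(catalog, schema, table)` triple identifying a table to export.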
type TableReference = (String, String, String);

#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    /// Export all table schemas, corresponding to `SHOW CREATE TABLE`.
    Schema,
    /// Export all table data, corresponding to `COPY DATABASE TO`.
    Data,
    /// Export all table schemas and data at once.
    #[default]
    All,
}

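/// Command-line options for the export tool. A typical local export might be
/// invoked as (illustrative, using the flags defined below):
/// `export --addr 127.0.0.1:4000 --output-dir /tmp/greptimedb-export --target all`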
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    /// Server address to connect to.
    #[clap(long)]
    addr: String,

    /// Directory to put the exported data. E.g.: /tmp/greptimedb-export
    /// for local export.
    #[clap(long)]
    output_dir: Option<String>,

    /// The name of the catalog to export.
    #[clap(long, default_value = "greptime-*")]
    database: String,

    /// Parallelism of the export.
    #[clap(long, short = 'j', default_value = "1")]
    export_jobs: usize,

    /// Max retry times for each job.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    /// Things to export.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    /// A half-open time range: [start_time, end_time).
    /// The start of the time range (time-index column) for data export.
    #[clap(long)]
    start_time: Option<String>,

    /// A half-open time range: [start_time, end_time).
    /// The end of the time range (time-index column) for data export.
    #[clap(long)]
    end_time: Option<String>,

    /// The basic authentication for connecting to the server.
    #[clap(long)]
    auth_basic: Option<String>,

    /// The timeout for invoking the database.
    ///
    /// It is used to override the server-side timeout setting.
    /// The default behavior disables the server-side default timeout (i.e. `0s`).
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    /// The proxy server address to connect to; if set, it overrides the system proxy.
    ///
    /// By default the system proxy is used if neither `proxy` nor `no_proxy` is set.
    #[clap(long)]
    proxy: Option<String>,

    /// Disable the proxy server; if set, no proxy is used.
    #[clap(long)]
    no_proxy: bool,

    /// Whether to export data to S3.
    #[clap(long)]
    s3: bool,

    /// If both `s3_ddl_local_dir` and `s3` are set, `s3_ddl_local_dir` is used only for
    /// the exported SQL files, while the data is exported to S3.
    ///
    /// Note that `s3_ddl_local_dir` writes the SQL files to the **LOCAL** file system;
    /// this is useful when the export client has no direct access to S3.
    ///
    /// If `s3` is set but `s3_ddl_local_dir` is not, both the SQL files and the data are exported to S3.
    #[clap(long)]
    s3_ddl_local_dir: Option<String>,

    /// The s3 bucket name.
    /// If `s3` is set, this is required.
    #[clap(long)]
    s3_bucket: Option<String>,

    /// The s3 root path.
    /// Optional; if unset, data is placed under the bucket root.
    #[clap(long)]
    s3_root: Option<String>,

    /// The s3 endpoint.
    /// If `s3` is set, this is required.
    #[clap(long)]
    s3_endpoint: Option<String>,

    /// The s3 access key.
    /// If `s3` is set, this is required.
    #[clap(long)]
    s3_access_key: Option<String>,

    /// The s3 secret key.
    /// If `s3` is set, this is required.
    #[clap(long)]
    s3_secret_key: Option<String>,

    /// The s3 region.
    /// If `s3` is set, this is required.
    #[clap(long)]
    s3_region: Option<String>,
}

impl ExportCommand {
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
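        // Exporting to S3 requires the full set of S3 connection options.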
        if self.s3
            && (self.s3_bucket.is_none()
                || self.s3_endpoint.is_none()
                || self.s3_access_key.is_none()
                || self.s3_secret_key.is_none()
                || self.s3_region.is_none())
        {
            return Err(BoxedError::new(S3ConfigNotSetSnafu {}.build()));
        }
        if !self.s3 && self.output_dir.is_none() {
            return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
        }
        let (catalog, schema) =
            database::split_database(&self.database).map_err(BoxedError::new)?;
        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            catalog.clone(),
            self.auth_basic.clone(),
            // Treats `None` as `0s` to disable server-side default timeout.
            self.timeout.unwrap_or_default(),
            proxy,
        );

        Ok(Box::new(Export {
            catalog,
            schema,
            database_client,
            output_dir: self.output_dir.clone(),
            parallelism: self.export_jobs,
            target: self.target.clone(),
            start_time: self.start_time.clone(),
            end_time: self.end_time.clone(),
            s3: self.s3,
            s3_ddl_local_dir: self.s3_ddl_local_dir.clone(),
            s3_bucket: self.s3_bucket.clone(),
            s3_root: self.s3_root.clone(),
            s3_endpoint: self.s3_endpoint.clone(),
            s3_access_key: self.s3_access_key.clone(),
            s3_secret_key: self.s3_secret_key.clone(),
            s3_region: self.s3_region.clone(),
        }))
    }
}

#[derive(Clone)]
pub struct Export {
    catalog: String,
    schema: Option<String>,
    database_client: DatabaseClient,
    output_dir: Option<String>,
    parallelism: usize,
    target: ExportTarget,
    start_time: Option<String>,
    end_time: Option<String>,
    s3: bool,
    s3_ddl_local_dir: Option<String>,
    s3_bucket: Option<String>,
    s3_root: Option<String>,
    s3_endpoint: Option<String>,
    s3_access_key: Option<String>,
    s3_secret_key: Option<String>,
    s3_region: Option<String>,
}

impl Export {
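    /// Root path for this catalog's exported files: just the catalog name on S3
    /// (relative to the bucket/root), or `<output_dir>/<catalog>` locally.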
    fn catalog_path(&self) -> PathBuf {
        if self.s3 {
            PathBuf::from(&self.catalog)
        } else if let Some(dir) = &self.output_dir {
            PathBuf::from(dir).join(&self.catalog)
        } else {
            unreachable!("catalog_path: output_dir must be set when not using s3")
        }
    }

    async fn get_db_names(&self) -> Result<Vec<String>> {
        let db_names = self.all_db_names().await?;
        let Some(schema) = &self.schema else {
            return Ok(db_names);
        };

        // Check if the schema exists
        db_names
            .into_iter()
            .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
            .map(|name| vec![name])
            .context(SchemaNotFoundSnafu {
                catalog: &self.catalog,
                schema,
            })
    }

    /// Returns all database names in the catalog, excluding system schemas.
    async fn all_db_names(&self) -> Result<Vec<String>> {
        let records = self
            .database_client
            .sql_in_public("SHOW DATABASES")
            .await?
            .context(EmptyResultSnafu)?;
        let mut result = Vec::with_capacity(records.len());
        for value in records {
            let Value::String(schema) = &value[0] else {
                unreachable!()
            };
            if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
                continue;
            }
            if schema == common_catalog::consts::PG_CATALOG_NAME {
                continue;
            }
            result.push(schema.clone());
        }
        Ok(result)
    }

    /// Returns the [`TableReference`]s to export under the given `catalog` and
    /// `schema`, split into metric-engine physical tables, remaining base tables,
    /// and views.
    async fn get_table_list(
        &self,
        catalog: &str,
        schema: &str,
    ) -> Result<(
        Vec<TableReference>,
        Vec<TableReference>,
        Vec<TableReference>,
    )> {
        // Fetch the metric engine's physical tables first; they carry the
        // internal `__tsid` column.
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name \
            FROM information_schema.columns \
            WHERE column_name = '__tsid' \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'"
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let mut metric_physical_tables = HashSet::with_capacity(records.len());
        for value in records {
            let mut t = Vec::with_capacity(3);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
        }

        let sql = format!(
            "SELECT table_catalog, table_schema, table_name, table_type \
            FROM information_schema.tables \
            WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
                and table_catalog = \'{catalog}\' \
                and table_schema = \'{schema}\'",
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;

        debug!("Fetched table/view list: {:?}", records);

        if records.is_empty() {
            return Ok((vec![], vec![], vec![]));
        }

        let mut remaining_tables = Vec::with_capacity(records.len());
        let mut views = Vec::new();
        for value in records {
            let mut t = Vec::with_capacity(4);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            let table = (t[0].clone(), t[1].clone(), t[2].clone());
            let table_type = t[3].as_str();
            // Physical tables were collected above; classify the rest as views or base tables.
            if !metric_physical_tables.contains(&table) {
                if table_type == "VIEW" {
                    views.push(table);
                } else {
                    remaining_tables.push(table);
                }
            }
        }

        Ok((
            metric_physical_tables.into_iter().collect(),
            remaining_tables,
            views,
        ))
    }

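    /// Runs `SHOW CREATE <DATABASE|TABLE|VIEW> ...` and returns the resulting DDL
    /// statement terminated with `;\n`.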
    async fn show_create(
        &self,
        show_type: &str,
        catalog: &str,
        schema: &str,
        table: Option<&str>,
    ) -> Result<String> {
        let sql = match table {
            Some(table) => format!(
                r#"SHOW CREATE {} "{}"."{}"."{}""#,
                show_type, catalog, schema, table
            ),
            None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
        };
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let Value::String(create) = &records[0][1] else {
            unreachable!()
        };

        Ok(format!("{};\n", create))
    }

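    /// Exports a `create_database.sql` file per database, containing its
    /// `SHOW CREATE DATABASE` statement.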
    async fn export_create_database(&self) -> Result<()> {
        let timer = Instant::now();
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = self.build_prefer_fs_operator().await?;

        for schema in db_names {
            let create_database = self
                .show_create("DATABASE", &self.catalog, &schema, None)
                .await?;

            let file_path = self.get_file_path(&schema, "create_database.sql");
            self.write_to_storage(&operator, &file_path, create_database.into_bytes())
                .await?;

            info!(
                "Exported {}.{} database creation SQL to {}",
                self.catalog,
                schema,
                self.format_output_path(&file_path)
            );
        }

        let elapsed = timer.elapsed();
        info!("Success {db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

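    /// Exports a `create_tables.sql` file per database, containing the
    /// `SHOW CREATE TABLE`/`SHOW CREATE VIEW` statements for all of its tables and views.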
    async fn export_create_table(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = Arc::new(self.build_prefer_fs_operator().await?);
        let mut tasks = Vec::with_capacity(db_names.len());

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let operator = operator.clone();
            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();
                let (metric_physical_tables, remaining_tables, views) = export_self
                    .get_table_list(&export_self.catalog, &schema)
                    .await?;

                // Create directory if needed for file system storage
                if !export_self.s3 {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
                let mut content = Vec::new();

                // Add table creation SQL
                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
                    content.extend_from_slice(create_table.as_bytes());
                }

                // Add view creation SQL
                for (c, s, v) in &views {
                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
                    content.extend_from_slice(create_view.as_bytes());
                }

                // Write to storage
                export_self
                    .write_to_storage(&operator, &file_path, content)
                    .await?;

                info!(
                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
                    export_self.catalog,
                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
                    export_self.format_output_path(&file_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

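    /// Builds the operator for data files: S3 when `--s3` is set, the local file
    /// system otherwise.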
    async fn build_operator(&self) -> Result<Operator> {
        if self.s3 {
            self.build_s3_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

    /// Builds an operator that prefers the local file system: when exporting to S3
    /// with `s3_ddl_local_dir` set, files are written to that local directory instead.
    async fn build_prefer_fs_operator(&self) -> Result<Operator> {
        // Under S3 mode with `s3_ddl_local_dir` set, use that local directory as the root.
        if self.s3 && self.s3_ddl_local_dir.is_some() {
            let root = self.s3_ddl_local_dir.as_ref().unwrap().clone();
            let op = Operator::new(services::Fs::default().root(&root))
                .context(OpenDalSnafu)?
                .layer(LoggingLayer::default())
                .finish();
            Ok(op)
        } else if self.s3 {
            self.build_s3_operator().await
        } else {
            self.build_fs_operator().await
        }
    }

    async fn build_s3_operator(&self) -> Result<Operator> {
        let mut builder = services::S3::default().bucket(
            self.s3_bucket
                .as_ref()
                .expect("s3_bucket must be provided when s3 is enabled"),
        );

        if let Some(root) = self.s3_root.as_ref() {
            builder = builder.root(root);
        }

        if let Some(endpoint) = self.s3_endpoint.as_ref() {
            builder = builder.endpoint(endpoint);
        }

        if let Some(region) = self.s3_region.as_ref() {
            builder = builder.region(region);
        }

        if let Some(key_id) = self.s3_access_key.as_ref() {
            builder = builder.access_key_id(key_id);
        }

        if let Some(secret_key) = self.s3_secret_key.as_ref() {
            builder = builder.secret_access_key(secret_key);
        }

        let op = Operator::new(builder)
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

    async fn build_fs_operator(&self) -> Result<Operator> {
        let root = self
            .output_dir
            .as_ref()
            .context(OutputDirNotSetSnafu)?
            .clone();
        let op = Operator::new(services::Fs::default().root(&root))
            .context(OpenDalSnafu)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }

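    /// Exports each database's data via `COPY DATABASE TO` and writes a matching
    /// `copy_from.sql` that can later re-import it with `COPY DATABASE FROM`.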
    async fn export_database_data(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.parallelism));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let mut tasks = Vec::with_capacity(db_count);
        let operator = Arc::new(self.build_operator().await?);
        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
        let with_options = build_with_options(&self.start_time, &self.end_time);

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let with_options_clone = with_options.clone();
            let operator = operator.clone();
            let fs_first_operator = fs_first_operator.clone();

            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();

                // Create directory if not using S3
                if !export_self.s3 {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let (path, connection_part) = export_self.get_storage_params(&schema);

                // Execute COPY DATABASE TO command
                let sql = format!(
                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );
                info!("Executing sql: {sql}");
                export_self.database_client.sql_in_public(&sql).await?;
                info!(
                    "Finished exporting {}.{} data to {}",
                    export_self.catalog, schema, path
                );

                // Create copy_from.sql file
                let copy_database_from_sql = format!(
                    r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
                export_self
                    .write_to_storage(
                        &fs_first_operator,
                        &copy_from_path,
                        copy_database_from_sql.into_bytes(),
                    )
                    .await?;

                info!(
                    "Finished exporting {}.{} copy_from.sql to {}",
                    export_self.catalog,
                    schema,
                    export_self.format_output_path(&copy_from_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }

    fn get_file_path(&self, schema: &str, file_name: &str) -> String {
        format!("{}/{}/{}", self.catalog, schema, file_name)
    }

    fn format_output_path(&self, file_path: &str) -> String {
        if self.s3 {
            format!(
                "s3://{}{}/{}",
                self.s3_bucket.as_ref().unwrap_or(&String::new()),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                file_path
            )
        } else {
            format!(
                "{}/{}",
                self.output_dir.as_ref().unwrap_or(&String::new()),
                file_path
            )
        }
    }

    async fn write_to_storage(
        &self,
        op: &Operator,
        file_path: &str,
        content: Vec<u8>,
    ) -> Result<()> {
        op.write(file_path, content).await.context(OpenDalSnafu)
    }

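    /// Returns the `COPY DATABASE` target path and, for S3 exports, the trailing
    /// ` CONNECTION (...)` clause carrying the credentials.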
    fn get_storage_params(&self, schema: &str) -> (String, String) {
        if self.s3 {
            let s3_path = format!(
                "s3://{}{}/{}/{}/",
                // Safety: s3_bucket is required when s3 is enabled
                self.s3_bucket.as_ref().unwrap(),
                if let Some(root) = &self.s3_root {
                    format!("/{}", root)
                } else {
                    String::new()
                },
                self.catalog,
                schema
            );

            // endpoint is optional
            let endpoint_option = if let Some(endpoint) = self.s3_endpoint.as_ref() {
                format!(", ENDPOINT='{}'", endpoint)
            } else {
                String::new()
            };

            // Safety: All s3 options are required
            let connection_options = format!(
                "ACCESS_KEY_ID='{}', SECRET_ACCESS_KEY='{}', REGION='{}'{}",
                self.s3_access_key.as_ref().unwrap(),
                self.s3_secret_key.as_ref().unwrap(),
                self.s3_region.as_ref().unwrap(),
                endpoint_option
            );

            (s3_path, format!(" CONNECTION ({})", connection_options))
        } else {
            (
                self.catalog_path()
                    .join(format!("{schema}/"))
                    .to_string_lossy()
                    .to_string(),
                String::new(),
            )
        }
    }

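    /// Awaits all export tasks and returns the number that succeeded; failed jobs
    /// are logged and counted, not propagated.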
    async fn execute_tasks(
        &self,
        tasks: Vec<impl std::future::Future<Output = Result<()>>>,
    ) -> usize {
        futures::future::join_all(tasks)
            .await
            .into_iter()
            .filter(|r| match r {
                Ok(_) => true,
                Err(e) => {
                    error!(e; "export job failed");
                    false
                }
            })
            .count()
    }
}

#[async_trait]
impl Tool for Export {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        match self.target {
            ExportTarget::Schema => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)
            }
            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
            ExportTarget::All => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)?;
                self.export_database_data().await.map_err(BoxedError::new)
            }
        }
    }
}

/// Builds the `WITH (...)` options string for the `COPY DATABASE` statements;
/// the same option syntax applies to both S3 and local exports.
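/// For example, with both bounds set the result looks like
/// `format = 'parquet', start_time = '<start>', end_time = '<end>'`.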
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    if let Some(start) = start_time {
        options.push(format!("start_time = '{}'", start));
    }
    if let Some(end) = end_time {
        options.push(format!("end_time = '{}'", end));
    }
    options.join(", ")
}