operator/statement/copy_table_to.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use client::OutputData;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::Format;
use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::parquet::stream_to_parquet;
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::{
    RecordBatchStream, SendableRecordBatchMapper, SendableRecordBatchStream,
    map_json_type_to_string, map_json_type_to_string_schema,
};
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::table_reference::TableReference;

use crate::error::{self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result};
use crate::statement::StatementExecutor;

/// Buffer size used to flush data to object stores.
///
/// It should be greater than 5MB, the minimum multipart upload size.
const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
const WRITE_CONCURRENCY: usize = 8;

impl StatementExecutor {
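    /// Writes a record batch stream to a single file at `path` in the given
    /// format, returning the number of rows written.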
    async fn stream_to_file(
        &self,
        stream: SendableRecordBatchStream,
        format: &Format,
        object_store: ObjectStore,
        path: &str,
    ) -> Result<usize> {
        let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;

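        // Map JSON-typed columns to their string representation so every
        // output format can serialize them.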
        let stream = Box::pin(SendableRecordBatchMapper::new(
            stream,
            map_json_type_to_string,
            map_json_type_to_string_schema,
        ));
        match format {
            Format::Csv(format) => stream_to_csv(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
                format,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Json(format) => stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
                format,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Parquet(_) => {
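                // Capture the schema before the stream is moved into the writer.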
                let schema = stream.schema();
                stream_to_parquet(
                    Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                    schema,
                    object_store,
                    path,
                    WRITE_CONCURRENCY,
                )
                .await
                .context(error::WriteStreamToFileSnafu { path })
            }
            _ => error::UnsupportedFormatSnafu {
                format: format.clone(),
            }
            .fail(),
        }
    }

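    /// Executes `COPY <table> TO <location>`: scans the table, optionally
    /// restricted to the requested timestamp range, and writes the result to
    /// `req.location`, returning the number of rows copied.
    ///
    /// For illustration (exact syntax is defined by the SQL layer), a statement
    /// like `COPY my_table TO 's3://bucket/out.parquet' WITH (FORMAT = 'parquet')`
    /// is served by this method.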
    #[tracing::instrument(skip_all)]
    pub(crate) async fn copy_table_to(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
        let table = self.get_table(&table_ref).await?;
        let table_id = table.table_info().table_id();
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

        let df_table_ref = DfTableReference::from(table_ref);

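        // Build an optional filter on the table's timestamp column from the
        // requested time range; tables without a timestamp column get none.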
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_filter_from_timestamp(
                    &c.name,
                    req.timestamp_range.as_ref(),
                )
            })
            .into_iter()
            .collect::<Vec<_>>();

        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

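        // Hand the filters to the scan so they can be pushed down, and also add
        // explicit `Filter` nodes so rows are pruned even when the provider
        // cannot apply the pushdown itself.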
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            df_table_ref,
            table_source,
            None,
            filters.clone(),
        )
        .context(BuildDfLogicalPlanSnafu)?;
        for f in filters {
            builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?;
        }
        let plan = builder.build().context(BuildDfLogicalPlanSnafu)?;

        let output = self
            .query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)?;

        let CopyTableRequest {
            location,
            connection,
            ..
        } = &req;

        debug!("Copy table: {table_id} to location: {location}");
        self.copy_to_file(&format, output, location, connection)
            .await
    }

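    /// Writes query `output` to the file at `location`, using `connection`
    /// options to build the object store backend.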
    pub(crate) async fn copy_to_file(
        &self,
        format: &Format,
        output: Output,
        location: &str,
        connection: &HashMap<String, String>,
    ) -> Result<usize> {
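        // `COPY TO` executes a query, so the output is always a stream or a set
        // of record batches, never an affected-rows count.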
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

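        // The last path segment of the parsed URL must name the output file.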
        let (_schema, _host, path) = parse_url(location).context(error::ParseUrlSnafu)?;
        let (_, filename) = find_dir_and_filename(&path);
        let filename = filename.context(error::UnexpectedSnafu {
            violated: format!("Expected filename, path: {path}"),
        })?;
        let object_store = build_backend(location, connection).context(error::BuildBackendSnafu)?;
        self.stream_to_file(stream, format, object_store, &filename)
            .await
    }
}
187}