operator/statement/copy_table_to.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use client::OutputData;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::parquet::stream_to_parquet;
use common_datasource::file_format::Format;
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::table_reference::TableReference;

use crate::error::{self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result};
use crate::statement::StatementExecutor;

/// Buffer size to flush data to object stores.
/// It should be greater than 5MB (the minimum multipart upload size).
const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
const WRITE_CONCURRENCY: usize = 8;

impl StatementExecutor {
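    /// Writes `stream` to `path` on the given `object_store` in the requested `format`,
    /// returning the number of rows written.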
    async fn stream_to_file(
        &self,
        stream: SendableRecordBatchStream,
        format: &Format,
        object_store: ObjectStore,
        path: &str,
    ) -> Result<usize> {
        let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;

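        // Dispatch on the output format; the CSV and JSON writers flush their buffers at
        // `threshold` bytes, while the Parquet writer needs the stream's schema up front.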
        match format {
            Format::Csv(_) => stream_to_csv(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Json(_) => stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Parquet(_) => {
                let schema = stream.schema();
                stream_to_parquet(
                    Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                    schema,
                    object_store,
                    path,
                    WRITE_CONCURRENCY,
                )
                .await
                .context(error::WriteStreamToFileSnafu { path })
            }
            _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
        }
    }

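    /// Handles `COPY <table> TO ...`: scans the table, optionally restricted to the
    /// requested timestamp range, and exports the result to the target location.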
    #[tracing::instrument(skip_all)]
    pub(crate) async fn copy_table_to(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
        let table = self.get_table(&table_ref).await?;
        let table_id = table.table_info().table_id();
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

        let df_table_ref = DfTableReference::from(table_ref);

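        // Build a filter expression over the table's timestamp column from the requested
        // time range, if both are present.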
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_filter_from_timestamp(
                    &c.name,
                    req.timestamp_range.as_ref(),
                )
            })
            .into_iter()
            .collect::<Vec<_>>();

        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

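        // Scan the table with the filters attached to the scan, and also apply them as
        // explicit `Filter` nodes on top of it.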
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            df_table_ref,
            table_source,
            None,
            filters.clone(),
        )
        .context(BuildDfLogicalPlanSnafu)?;
        for f in filters {
            builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?;
        }
        let plan = builder.build().context(BuildDfLogicalPlanSnafu)?;

        let output = self
            .query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)?;

        let CopyTableRequest {
            location,
            connection,
            ..
        } = &req;

        debug!("Copy table: {table_id} to location: {location}");
        self.copy_to_file(&format, output, location, connection)
            .await
    }

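    /// Writes the query `output` to `location` in the given `format`, building the object
    /// store backend from the `connection` options.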
    pub(crate) async fn copy_to_file(
        &self,
        format: &Format,
        output: Output,
        location: &str,
        connection: &HashMap<String, String>,
    ) -> Result<usize> {
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
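            // Other output variants (e.g. a bare affected-rows count) are not expected from
            // the queries executed for `COPY TO`.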
            _ => unreachable!(),
        };

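        // Derive the target file name from the location URL and build the corresponding
        // object store backend.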
        let (_schema, _host, path) = parse_url(location).context(error::ParseUrlSnafu)?;
        let (_, filename) = find_dir_and_filename(&path);
        let filename = filename.context(error::UnexpectedSnafu {
            violated: format!("Expected filename, path: {path}"),
        })?;
        let object_store = build_backend(location, connection).context(error::BuildBackendSnafu)?;
        self.stream_to_file(stream, format, object_store, &filename)
            .await
    }
}