// operator/statement/copy_table_to.rs

use std::collections::HashMap;
use std::sync::Arc;

use client::OutputData;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::Format;
use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::parquet::stream_to_parquet;
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::{
    RecordBatchStream, SendableRecordBatchMapper, SendableRecordBatchStream,
    map_json_type_to_string, map_json_type_to_string_schema,
};
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::table_reference::TableReference;

use crate::error::{self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result};
use crate::statement::StatementExecutor;

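/// Write buffer size (8 MiB) handed to the streaming CSV/JSON writers as
/// their flush threshold.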
const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);

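/// Level of write concurrency passed to the streaming format writers.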
const WRITE_CONCURRENCY: usize = 8;

impl StatementExecutor {
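    /// Streams record batches to a single file at `path` on `object_store`,
    /// encoded in the given `format`; returns the number of rows written.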
    async fn stream_to_file(
        &self,
        stream: SendableRecordBatchStream,
        format: &Format,
        object_store: ObjectStore,
        path: &str,
    ) -> Result<usize> {
        let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;

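        // Map JSON-typed columns (and the schema) to strings before encoding,
        // since the output file formats have no native JSON column type.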
        let stream = Box::pin(SendableRecordBatchMapper::new(
            stream,
            map_json_type_to_string,
            map_json_type_to_string_schema,
        ));
        match format {
            Format::Csv(format) => stream_to_csv(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
                format,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Json(_) => stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Parquet(_) => {
                let schema = stream.schema();
                stream_to_parquet(
                    Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                    schema,
                    object_store,
                    path,
                    WRITE_CONCURRENCY,
                )
                .await
                .context(error::WriteStreamToFileSnafu { path })
            }
            _ => error::UnsupportedFormatSnafu {
                format: format.clone(),
            }
            .fail(),
        }
    }

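    /// Executes the `COPY <table> TO '<location>'` statement, e.g.
    /// (illustrative) `COPY my_table TO 'out.parquet' WITH (FORMAT = 'parquet')`:
    /// scans the table, optionally restricted to the requested timestamp
    /// range, and streams the result to the target location.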
    #[tracing::instrument(skip_all)]
    pub(crate) async fn copy_table_to(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
        let table = self.get_table(&table_ref).await?;
        let table_id = table.table_info().table_id();
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

        let df_table_ref = DfTableReference::from(table_ref);

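        // If a timestamp range was requested, turn it into a filter on the
        // table's timestamp column.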
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_filter_from_timestamp(
                    &c.name,
                    req.timestamp_range.as_ref(),
                )
            })
            .into_iter()
            .collect::<Vec<_>>();

        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

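        // Scan with the filters pushed down, then re-apply each filter as an
        // explicit Filter node so the range still holds if the scan cannot
        // honor the pushdown.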
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            df_table_ref,
            table_source,
            None,
            filters.clone(),
        )
        .context(BuildDfLogicalPlanSnafu)?;
        for f in filters {
            builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?;
        }
        let plan = builder.build().context(BuildDfLogicalPlanSnafu)?;

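        // Execute the plan; the output is consumed as a stream below.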
        let output = self
            .query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)?;

        let CopyTableRequest {
            location,
            connection,
            ..
        } = &req;

        debug!("Copy table: {table_id} to location: {location}");
        self.copy_to_file(&format, output, location, connection)
            .await
    }

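    /// Writes the query `output` to `location`, building an object-store
    /// backend from the `connection` options.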
    pub(crate) async fn copy_to_file(
        &self,
        format: &Format,
        output: Output,
        location: &str,
        connection: &HashMap<String, String>,
    ) -> Result<usize> {
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

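        // The location must name a file: extract the filename and build the
        // object-store backend for the target.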
        let (_schema, _host, path) = parse_url(location).context(error::ParseUrlSnafu)?;
        let (_, filename) = find_dir_and_filename(&path);
        let filename = filename.context(error::UnexpectedSnafu {
            violated: format!("Expected filename, path: {path}"),
        })?;
        let object_store = build_backend(location, connection).context(error::BuildBackendSnafu)?;
        self.stream_to_file(stream, format, object_store, &filename)
            .await
    }
}