// operator/statement/copy_table_to.rs

use std::collections::HashMap;
use std::sync::Arc;

use client::OutputData;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::Format;
use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::parquet::stream_to_parquet;
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::{
    RecordBatchStream, SendableRecordBatchMapper, SendableRecordBatchStream,
    map_json_type_to_string, map_json_type_to_string_schema,
};
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::table_reference::TableReference;

use crate::error::{self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result};
use crate::statement::StatementExecutor;

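/// Flush threshold for the in-memory write buffer used while streaming
/// results to a file.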
const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);

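/// Number of concurrent write tasks used by the streaming format writers.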
const WRITE_CONCURRENCY: usize = 8;

impl StatementExecutor {
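    /// Streams `stream` to `path` on `object_store`, encoding record batches
    /// in the given `format` (CSV, JSON, or Parquet). Returns the number of
    /// rows written.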
    async fn stream_to_file(
        &self,
        stream: SendableRecordBatchStream,
        format: &Format,
        object_store: ObjectStore,
        path: &str,
    ) -> Result<usize> {
        let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;

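        // Encode JSON-typed columns as strings (and patch the schema to
        // match) so every output format can serialize them.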
        let stream = Box::pin(SendableRecordBatchMapper::new(
            stream,
            map_json_type_to_string,
            map_json_type_to_string_schema,
        ));
        match format {
            Format::Csv(format) => stream_to_csv(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
                format,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Json(format) => stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
                format,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Parquet(_) => {
                let schema = stream.schema();
                stream_to_parquet(
                    Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                    schema,
                    object_store,
                    path,
                    WRITE_CONCURRENCY,
                )
                .await
                .context(error::WriteStreamToFileSnafu { path })
            }
            _ => error::UnsupportedFormatSnafu {
                format: format.clone(),
            }
            .fail(),
        }
    }

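    /// Executes `COPY <table> TO <location>`: scans the table, optionally
    /// restricted to the request's timestamp range, and exports the result.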
    #[tracing::instrument(skip_all)]
    pub(crate) async fn copy_table_to(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
        let table = self.get_table(&table_ref).await?;
        let table_id = table.table_info().table_id();
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

        let df_table_ref = DfTableReference::from(table_ref);

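        // Derive a filter on the timestamp column when the request carries a
        // timestamp range, so only rows inside the range are exported.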
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_filter_from_timestamp(
                    &c.name,
                    req.timestamp_range.as_ref(),
                )
            })
            .into_iter()
            .collect::<Vec<_>>();

        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

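        // Hand the filters to the scan so they can be pushed down, and also
        // add them as explicit `Filter` nodes so they are applied even when
        // the scan cannot honor them.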
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            df_table_ref,
            table_source,
            None,
            filters.clone(),
        )
        .context(BuildDfLogicalPlanSnafu)?;
        for f in filters {
            builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?;
        }
        let plan = builder.build().context(BuildDfLogicalPlanSnafu)?;

        let output = self
            .query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)?;

        let CopyTableRequest {
            location,
            connection,
            ..
        } = &req;

        debug!("Copy table: {table_id} to location: {location}");
        self.copy_to_file(&format, output, location, connection)
            .await
    }

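    /// Writes query `output` to `location` in the given `format`, building
    /// the object store backend from the URL and connection options. Returns
    /// the number of rows written.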
    pub(crate) async fn copy_to_file(
        &self,
        format: &Format,
        output: Output,
        location: &str,
        connection: &HashMap<String, String>,
    ) -> Result<usize> {
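        // A query plan always yields a stream or materialized record batches;
        // other output kinds (e.g. affected-row counts) cannot occur here.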
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

        let (_schema, _host, path) = parse_url(location).context(error::ParseUrlSnafu)?;
        let (_, filename) = find_dir_and_filename(&path);
        let filename = filename.context(error::UnexpectedSnafu {
            violated: format!("Expected filename, path: {path}"),
        })?;
        let object_store = build_backend(location, connection).context(error::BuildBackendSnafu)?;
        self.stream_to_file(stream, format, object_store, &filename)
            .await
    }
}