// operator/statement/copy_table_to.rs

use std::collections::HashMap;
use std::sync::Arc;

use client::OutputData;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::parquet::stream_to_parquet;
use common_datasource::file_format::Format;
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::{
    map_json_type_to_string, map_json_type_to_string_schema, RecordBatchStream,
    SendableRecordBatchMapper, SendableRecordBatchStream,
};
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::table_reference::TableReference;

use crate::error::{self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result};
use crate::statement::StatementExecutor;

/// Buffered bytes to accumulate before each flush when writing a file to the
/// object store.
const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);

/// Maximum number of concurrent write requests issued against the object store.
const WRITE_CONCURRENCY: usize = 8;

impl StatementExecutor {
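    /// Encodes `stream` in the given `format` and writes it to `path` on
    /// `object_store`, returning the number of rows written.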
    async fn stream_to_file(
        &self,
        stream: SendableRecordBatchStream,
        format: &Format,
        object_store: ObjectStore,
        path: &str,
    ) -> Result<usize> {
        let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;

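        // JSON-typed columns are stored in an internal binary encoding; map
        // them to strings up front so the exported file carries readable JSON
        // text rather than raw bytes.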
        let stream = Box::pin(SendableRecordBatchMapper::new(
            stream,
            map_json_type_to_string,
            map_json_type_to_string_schema,
        ));
        match format {
            Format::Csv(_) => stream_to_csv(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Json(_) => stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Parquet(_) => {
                // Capture the schema before the stream is moved into the adapter.
                let schema = stream.schema();
                stream_to_parquet(
                    Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                    schema,
                    object_store,
                    path,
                    WRITE_CONCURRENCY,
                )
                .await
                .context(error::WriteStreamToFileSnafu { path })
            }
            _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
        }
    }

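    /// Exports a table to an external location (e.g. a local file or an
    /// object store), returning the number of rows copied.
    ///
    /// Illustrative SQL that reaches this path (assumed syntax):
    /// `COPY my_table TO 's3://bucket/out.parquet' WITH (FORMAT = 'parquet');`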
    #[tracing::instrument(skip_all)]
    pub(crate) async fn copy_table_to(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
        let table = self.get_table(&table_ref).await?;
        let table_id = table.table_info().table_id();
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

        let df_table_ref = DfTableReference::from(table_ref);

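        // If the request carries a timestamp range, turn it into a predicate
        // on the table's time index column; otherwise scan everything.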
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_filter_from_timestamp(
                    &c.name,
                    req.timestamp_range.as_ref(),
                )
            })
            .into_iter()
            .collect::<Vec<_>>();

        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

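        // Filters handed to `scan_with_filters` act as pushdown hints that a
        // table provider may apply only partially, so the same predicates are
        // re-applied below as explicit `Filter` nodes to guarantee exact
        // semantics.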
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            df_table_ref,
            table_source,
            None,
            filters.clone(),
        )
        .context(BuildDfLogicalPlanSnafu)?;
        for f in filters {
            builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?;
        }
        let plan = builder.build().context(BuildDfLogicalPlanSnafu)?;

        let output = self
            .query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)?;

        let CopyTableRequest {
            location,
            connection,
            ..
        } = &req;

        debug!("Copy table: {table_id} to location: {location}");
        self.copy_to_file(&format, output, location, connection)
            .await
    }

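    /// Writes a query result to `location`, resolving the destination object
    /// store backend from the `connection` options; returns the number of
    /// rows written.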
    pub(crate) async fn copy_to_file(
        &self,
        format: &Format,
        output: Output,
        location: &str,
        connection: &HashMap<String, String>,
    ) -> Result<usize> {
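        // A COPY TO query yields either a stream or materialized record
        // batches; any other output kind would be a logic error upstream.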
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

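        // Split the destination URL: the backend is built from the full
        // location, while the write targets the trailing file name, which
        // must be present since the whole result goes into a single file.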
        let (_schema, _host, path) = parse_url(location).context(error::ParseUrlSnafu)?;
        let (_, filename) = find_dir_and_filename(&path);
        let filename = filename.context(error::UnexpectedSnafu {
            violated: format!("Expected filename, path: {path}"),
        })?;
        let object_store = build_backend(location, connection).context(error::BuildBackendSnafu)?;
        self.stream_to_file(stream, format, object_store, &filename)
            .await
    }
}