operator/statement/copy_table_to.rs

use std::collections::HashMap;
use std::sync::Arc;

use client::OutputData;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::parquet::stream_to_parquet;
use common_datasource::file_format::Format;
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
use table::table::adapter::DfTableProviderAdapter;
use table::table_reference::TableReference;

use crate::error::{self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result};
use crate::statement::StatementExecutor;

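/// Write buffer threshold (converted to bytes) passed to the CSV and JSON stream writers.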
const WRITE_BUFFER_THRESHOLD: ReadableSize = ReadableSize::mb(8);

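/// Write concurrency passed to the CSV, JSON, and Parquet stream writers.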
const WRITE_CONCURRENCY: usize = 8;

impl StatementExecutor {
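    /// Streams `stream` into a single file at `path` on `object_store`, encoded in the
    /// given `format`; unsupported formats return an error.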
    async fn stream_to_file(
        &self,
        stream: SendableRecordBatchStream,
        format: &Format,
        object_store: ObjectStore,
        path: &str,
    ) -> Result<usize> {
        let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;

        match format {
            Format::Csv(_) => stream_to_csv(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Json(_) => stream_to_json(
                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                object_store,
                path,
                threshold,
                WRITE_CONCURRENCY,
            )
            .await
            .context(error::WriteStreamToFileSnafu { path }),
            Format::Parquet(_) => {
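                // Capture the schema before the stream is moved into the adapter.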
                let schema = stream.schema();
                stream_to_parquet(
                    Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                    schema,
                    object_store,
                    path,
                    WRITE_CONCURRENCY,
                )
                .await
                .context(error::WriteStreamToFileSnafu { path })
            }
            _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
        }
    }

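    /// Handles `COPY <table> TO`: scans the table, optionally restricted to the
    /// request's timestamp range, and exports the result to the target location.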
    #[tracing::instrument(skip_all)]
    pub(crate) async fn copy_table_to(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<usize> {
        let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
        let table = self.get_table(&table_ref).await?;
        let table_id = table.table_info().table_id();
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;

        let df_table_ref = DfTableReference::from(table_ref);

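        // Build an optional filter on the timestamp column from the request's timestamp range.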
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_filter_from_timestamp(
                    &c.name,
                    req.timestamp_range.as_ref(),
                )
            })
            .into_iter()
            .collect::<Vec<_>>();

        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

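        // Scan the table with the filters, then apply each filter on top of the scan.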
        let mut builder = LogicalPlanBuilder::scan_with_filters(
            df_table_ref,
            table_source,
            None,
            filters.clone(),
        )
        .context(BuildDfLogicalPlanSnafu)?;
        for f in filters {
            builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?;
        }
        let plan = builder.build().context(BuildDfLogicalPlanSnafu)?;

        let output = self
            .query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)?;

        let CopyTableRequest {
            location,
            connection,
            ..
        } = &req;

        debug!("Copy table: {table_id} to location: {location}");
        self.copy_to_file(&format, output, location, connection)
            .await
    }

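    /// Writes query `output` to `location`, using an object store backend built from
    /// the `connection` options.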
    pub(crate) async fn copy_to_file(
        &self,
        format: &Format,
        output: Output,
        location: &str,
        connection: &HashMap<String, String>,
    ) -> Result<usize> {
        let stream = match output.data {
            OutputData::Stream(stream) => stream,
            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
            _ => unreachable!(),
        };

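        // Derive the target file name from the location URL and build the object store backend.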
        let (_schema, _host, path) = parse_url(location).context(error::ParseUrlSnafu)?;
        let (_, filename) = find_dir_and_filename(&path);
        let filename = filename.context(error::UnexpectedSnafu {
            violated: format!("Expected filename, path: {path}"),
        })?;
        let object_store = build_backend(location, connection).context(error::BuildBackendSnafu)?;
        self.stream_to_file(stream, format, object_store, &filename)
            .await
    }
}