use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener,
};
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::{Constraints, Statistics};
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, Result};
use crate::statement::StatementExecutor;

const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;

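/// Metadata of a file to copy from: the inferred Arrow schema plus the
/// format-specific details needed to build a reader for it.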
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
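    /// Returns the inferred schema of the file.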
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
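    /// Lists the entries at the request's source location, optionally filtered by the
    /// `pattern` regex, and returns them together with the object store backend.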
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

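    /// Collects [FileMetadata] for the object at `path`, inferring its schema according
    /// to the requested `format`.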
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

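    /// Builds a DataFusion [FileStream] over the single file `filename`, reading it
    /// through the given [FileOpener] with a minimal scan configuration.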
    async fn build_file_stream<F: FileOpener + Send + 'static>(
        &self,
        opener: F,
        filename: &str,
        file_schema: SchemaRef,
    ) -> Result<DfSendableRecordBatchStream> {
        let statistics = Statistics::new_unknown(file_schema.as_ref());
        let stream = FileStream::new(
            &FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("empty://").unwrap(),
                file_schema,
                file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
                statistics,
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: vec![],
                constraints: Constraints::empty(),
            },
            0,
            opener,
            &ExecutionPlanMetricsSet::new(),
        )
        .context(error::BuildFileStreamSnafu)?;

        Ok(Box::pin(stream))
    }

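    /// Builds a record batch stream over `file_metadata`, reading only the projected
    /// columns, adapting batches to the projected `compat_schema`, and applying `filters`.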
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let csv_config = Arc::new(CsvConfig::new(
                    DEFAULT_BATCH_SIZE,
                    schema.clone(),
                    Some(projection.clone()),
                    format.has_header,
                    format.delimiter,
                    b'"',
                    None,
                    Arc::new(object_store_opendal::OpendalStore::new(
                        object_store.clone(),
                    )),
                    None,
                ));
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let stream = self
                    .build_file_stream(
                        CsvOpener::new(csv_config, format.compression_type.into()),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                format,
                path,
                schema,
            } => {
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let store = object_store_opendal::OpendalStore::new(object_store.clone());
                let stream = self
                    .build_file_stream(
                        JsonOpener::new(
                            DEFAULT_BATCH_SIZE,
                            projected_file_schema.clone(),
                            format.compression_type.into(),
                            Arc::new(store),
                        ),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

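    /// Copies data from files at the requested location into the target table, casting
    /// each file's columns to the table schema and inserting rows in batches, honoring
    /// the optional row limit.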
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format, path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().to_string())
                .collect::<Vec<_>>();

            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows {
                    if rows_inserted >= max_insert_rows {
                        return Ok(gen_insert_output(rows_inserted, insert_cost));
                    }
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

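/// Builds the insert [Output] from the number of affected rows and the accumulated
/// insert cost.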
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

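/// Awaits all pending insert futures, sums their affected rows and costs, and resets
/// the pending byte counter.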
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

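/// Ensures each field of `from` can be cast to the corresponding field of `to`,
/// comparing the schemas position by position and reporting the first incompatible column.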
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

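/// Generates the projections of the file schema and the table schema onto the columns
/// they share, together with a "compatible" file schema whose shared fields take the
/// table's field definitions so that batches read from the file can be cast to the
/// table's types.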
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true),
            (&file_schema1, &table_schema, true),
            (&file_schema2, &table_schema, true),
            (&file_schema3, &table_schema, true),
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}