use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format, file_to_stream};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::DfSendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_telemetry::{debug, tracing};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ResultExt, ensure};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

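// Defaults for file scans: the record batch size used by the CSV/JSON sources, and the
// chunk size for buffered object-store reads (Parquet/ORC).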
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;

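/// Per-file metadata gathered before copying: the inferred Arrow schema plus the
/// format-specific details needed to build a reader for the file.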
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
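    /// Returns the inferred Arrow schema of the source file.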
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
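    /// Builds the object store backend for `req.location` and lists the entries to
    /// copy from: either a single file or the files under a directory, optionally
    /// filtered by `req.pattern`.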
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.clone(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

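    /// Infers the schema of a single source file and collects the format-specific
    /// metadata needed to build a reader for it.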
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

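    /// Builds a record batch stream over one source file: rows are projected to the
    /// matched columns, cast to the table-compatible schema, and filtered by the
    /// optional timestamp-range predicates.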
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    schema.clone(),
                    csv_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                path,
                format,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let json_source = JsonSource::new()
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    schema.clone(),
                    json_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

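    /// Handles `COPY <table> FROM <location>`: lists the source files, checks that each
    /// file schema can be cast to the table schema, then streams the file contents into
    /// insert requests, flushing them in batches and honoring the optional row limit.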
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format.clone(), path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<Vec<_>>();

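            // Buffer insert futures and flush them whenever the accumulated vector
            // memory exceeds the threshold below (32 MiB).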
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.clone(),
                        schema_name: req.schema_name.clone(),
                        table_name: req.table_name.clone(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows
                    && rows_inserted >= max_insert_rows
                {
                    return Ok(gen_insert_output(rows_inserted, insert_cost));
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

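/// Wraps the number of inserted rows and the accumulated insert cost into an [`Output`].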
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

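/// Awaits all pending insert futures, sums their affected rows and costs, and resets
/// the pending byte counter.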
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

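/// Like [`can_cast_types`], but additionally allows casting an Arrow `Map` into
/// `Binary`, so map columns can be copied into binary (JSON) columns.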
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    if let ArrowDataType::Map(_, _) = from
        && let ArrowDataType::Binary = to
    {
        return true;
    }

    can_cast_types(from, to)
}

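/// Ensures every field of `from` can be cast to the corresponding field of `to`,
/// returning an `InvalidSchema` error with the index of the first offending column.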
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

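/// Matches file columns to table columns by name and returns:
/// - the projection indices into the file schema,
/// - the projection indices into the table schema,
/// - a "compatible" file schema in which each matched field takes the table's field
///   definition, so batches read from the file can be cast before insertion.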
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

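    /// Asserts that checking compatibility between a single-column file schema (`from`)
    /// and a single-column table schema (`to`) yields `matches`.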
    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true),
            (&file_schema1, &table_schema, true),
            (&file_schema2, &table_schema, true),
            (&file_schema3, &table_schema, true),
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}