use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format, file_to_stream};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::DfSendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_telemetry::{debug, tracing};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion_common::config::CsvOptions;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ResultExt, ensure};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

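// Defaults for scanning source files: rows per record batch and the object store read chunk size.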
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;

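/// Per-file metadata collected before reading: the inferred Arrow schema plus the
/// format-specific details needed to build a record batch stream over the file.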
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
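    /// Returns the inferred [`SchemaRef`] of the source file.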
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
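    /// Lists the object store entries under the request location, optionally filtered by the
    /// request's `pattern` regex. For local `fs` locations the path is checked for existence first.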
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.clone(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

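    /// Infers the schema of a single source file in the given `format` and collects the
    /// format-specific metadata required to read it.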
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

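    /// Builds a record batch stream over a single source file: the file is read with the
    /// format-specific reader, projected to `projection`, adapted to `compat_schema` and
    /// filtered by the pushed-down timestamp `filters`.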
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let options = CsvOptions::default()
                    .with_has_header(format.has_header)
                    .with_delimiter(format.delimiter);
                let csv_source = CsvSource::new(schema.clone())
                    .with_csv_options(options)
                    .with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    csv_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                path,
                format,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let json_source =
                    JsonSource::new(schema.clone()).with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    json_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

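    /// Executes `COPY FROM` for a table: lists the matching source files, verifies that each
    /// file schema can be cast to the table schema, then streams the files and inserts the
    /// rows in batches, flushing whenever pending inserts exceed a memory threshold and
    /// stopping early once the optional row limit is reached.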
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format.clone(), path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<Vec<_>>();

            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.clone(),
                        schema_name: req.schema_name.clone(),
                        table_name: req.table_name.clone(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows
                    && rows_inserted >= max_insert_rows
                {
                    return Ok(gen_insert_output(rows_inserted, insert_cost));
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

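/// Wraps the number of inserted rows and the accumulated insert cost into an [`Output`].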
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

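/// Awaits all pending insert futures, sums their affected rows and costs, and resets the
/// pending byte counter.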
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

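/// Returns whether `from` can be cast to `to`. In addition to the standard Arrow cast rules,
/// a `Map` source type is accepted for a `Binary` target.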
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    if let ArrowDataType::Map(_, _) = from
        && let ArrowDataType::Binary = to
    {
        return true;
    }

    can_cast_types(from, to)
}

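/// Ensures that every field of `from` can be cast to the field at the same position in `to`,
/// returning an `InvalidSchema` error for the first incompatible pair.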
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

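/// For the columns that appear in both schemas (matched by name), returns the projection
/// indices into the file schema and the table schema, together with a "compatible" file
/// schema in which those columns take the table's field definitions.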
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true),
            (&file_schema1, &table_schema, true),
            (&file_schema2, &table_schema, true),
            (&file_schema3, &table_schema, true),
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}