use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url, FS_SCHEMA};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource,
};
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
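
/// Schema and format-specific metadata inferred for a single source file of a
/// `COPY <table> FROM` statement.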
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
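    /// Lists the source entries for `COPY FROM`: validates that local
    /// filesystem paths exist, builds the object store backend for the request
    /// location, and enumerates matching entries (optionally filtered by the
    /// `pattern` regex).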
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }
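
    /// Infers the [FileMetadata] of a single source file, deriving its schema
    /// from the file contents according to the requested `format`.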
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }
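
    /// Builds a DataFusion [FileStream] that scans a single file through the
    /// given [FileSource], wrapping the OpenDAL [ObjectStore] so DataFusion
    /// can read from it.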
    async fn build_file_stream(
        &self,
        store: &ObjectStore,
        filename: &str,
        file_schema: SchemaRef,
        file_source: Arc<dyn FileSource>,
        projection: Option<Vec<usize>>,
    ) -> Result<DfSendableRecordBatchStream> {
        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            file_schema,
            file_source.clone(),
        )
        .with_file_group(FileGroup::new(vec![PartitionedFile::new(filename, 0)]))
        .with_projection(projection)
        .build();

        let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
        let file_opener = file_source
            .with_projection(&config)
            .create_file_opener(store, &config, 0);
        let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
            .context(error::BuildFileStreamSnafu)?;

        Ok(Box::pin(stream))
    }
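
    /// Builds a [DfSendableRecordBatchStream] for one source file, projecting
    /// and casting the file columns to `compat_schema` and applying the
    /// optional timestamp-range `filters`.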
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);

                let stream = self
                    .build_file_stream(
                        object_store,
                        path,
                        schema.clone(),
                        csv_source,
                        Some(projection),
                    )
                    .await?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json { path, schema } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let json_source = JsonSource::new()
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);

                let stream = self
                    .build_file_stream(
                        object_store,
                        path,
                        schema.clone(),
                        json_source,
                        Some(projection),
                    )
                    .await?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }
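
    /// Executes `COPY <table> FROM <location>`: lists the source files, checks
    /// that each file schema can be cast to the table schema, then streams the
    /// record batches into insert requests, flushing roughly every 32 MiB of
    /// pending data and stopping early once the optional `LIMIT` is reached.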
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format, path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().to_string())
                .collect::<Vec<_>>();

            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows {
                    if rows_inserted >= max_insert_rows {
                        return Ok(gen_insert_output(rows_inserted, insert_cost));
                    }
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}
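
/// Builds the [Output] of a `COPY FROM` statement from the number of inserted
/// rows and the accumulated insert cost.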
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}
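
/// Awaits all pending insert futures, sums their affected rows and costs, and
/// resets the pending byte counter.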
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}
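
/// Returns true if `from` can be cast to `to`. On top of Arrow's own cast
/// rules, casting a Map type to Binary is allowed so that map-typed file
/// columns can be copied into binary (JSON) table columns.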
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    if let ArrowDataType::Map(_, _) = from {
        if let ArrowDataType::Binary = to {
            return true;
        }
    }

    can_cast_types(from, to)
}
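
/// Ensures that every field in `from` can be cast to the corresponding field
/// (by position) in `to`, returning an `InvalidSchema` error with the index of
/// the first incompatible pair otherwise.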
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}
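
/// Matches file columns to table columns by name and returns:
/// - the projection of the file schema onto the matched columns,
/// - the corresponding projection of the table schema, and
/// - a "compatible" file schema in which each matched field takes the table's
///   field definition, so file batches can be cast to the table's types.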
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true),
            (&file_schema1, &table_schema, true),
            (&file_schema2, &table_schema, true),
            (&file_schema3, &table_schema, true),
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}