use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url, FS_SCHEMA};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener,
};
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::{Constraints, Statistics};
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

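/// Default number of rows per record batch when decoding CSV/JSON files.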
const DEFAULT_BATCH_SIZE: usize = 8192;
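/// Default chunk size used when reading Parquet/ORC objects from the object store.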
const DEFAULT_READ_BUFFER: usize = 256 * 1024;

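/// Per-file metadata gathered before scanning: the inferred Arrow schema plus the
/// format-specific details needed to build a reader.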
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
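    /// Returns the Arrow schema inferred from the file.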
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
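    /// Builds the object store backend for the `COPY FROM` location and lists the
    /// entries to copy from, optionally filtered by the request's pattern.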
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

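    /// Infers the schema of a single source file and collects the format-specific
    /// metadata needed to read it later.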
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

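    /// Builds a DataFusion [`FileStream`] over a single file using the given opener
    /// and file schema.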
    async fn build_file_stream<F: FileOpener + Send + 'static>(
        &self,
        opener: F,
        filename: &str,
        file_schema: SchemaRef,
    ) -> Result<DfSendableRecordBatchStream> {
        let statistics = Statistics::new_unknown(file_schema.as_ref());
        let stream = FileStream::new(
            &FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("empty://").unwrap(),
                file_schema,
                file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
                statistics,
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: vec![],
                constraints: Constraints::empty(),
            },
            0,
            opener,
            &ExecutionPlanMetricsSet::new(),
        )
        .context(error::BuildFileStreamSnafu)?;

        Ok(Box::pin(stream))
    }

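    /// Builds a stream of record batches for a single source file, projected and
    /// adapted to `compat_schema`, applying the given projection and filter
    /// expressions.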
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let csv_config = Arc::new(CsvConfig::new(
                    DEFAULT_BATCH_SIZE,
                    schema.clone(),
                    Some(projection.clone()),
                    format.has_header,
                    format.delimiter,
                    b'"',
                    None,
                    Arc::new(object_store_opendal::OpendalStore::new(
                        object_store.clone(),
                    )),
                    None,
                ));
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let stream = self
                    .build_file_stream(
                        CsvOpener::new(csv_config, format.compression_type.into()),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                format,
                path,
                schema,
            } => {
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let store = object_store_opendal::OpendalStore::new(object_store.clone());
                let stream = self
                    .build_file_stream(
                        JsonOpener::new(
                            DEFAULT_BATCH_SIZE,
                            projected_file_schema.clone(),
                            format.compression_type.into(),
                            Arc::new(store),
                        ),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

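    /// Executes `COPY <table> FROM ...`: lists the source files, checks that each
    /// file schema can be cast to the table schema, then streams the data into the
    /// table in batches. Returns the number of rows inserted.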
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format, path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().to_string())
                .collect::<Vec<_>>();

            // Flush pending inserts once the buffered vectors exceed this memory threshold.
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows {
                    if rows_inserted >= max_insert_rows {
                        return Ok(gen_insert_output(rows_inserted, insert_cost));
                    }
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

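/// Builds an [`Output`] carrying the number of inserted rows and the insert cost.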
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

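/// Executes all pending insert futures concurrently, sums up the affected rows and
/// costs, and resets the pending byte counter.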
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

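/// Like Arrow's [`can_cast_types`], but additionally accepts casting `Map` to `Binary`.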
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    if let ArrowDataType::Map(_, _) = from {
        if let ArrowDataType::Binary = to {
            return true;
        }
    }

    can_cast_types(from, to)
}

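/// Ensures that each field of `from` can be cast to the corresponding field of `to`
/// (matched by position); otherwise returns an `InvalidSchema` error with the index
/// of the first incompatible column.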
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

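/// Matches file columns to table columns by name and returns, in order: the
/// projection into the file schema, the projection into the table schema, and a
/// "compatible" file schema in which matched columns take the table's field
/// definition (so the file data can be cast to the table types).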
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true),
            (&file_schema1, &table_schema, true),
            (&file_schema2, &table_schema, true),
            (&file_schema3, &table_schema, true),
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}