operator/statement/copy_table_from.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format, file_to_stream};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::DfSendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_telemetry::{debug, tracing};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ResultExt, ensure};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

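// Defaults for reading source files: the number of rows per record batch and the
// chunk size (in bytes) for buffered object store reads.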
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;

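/// Metadata of a single source file to copy from, including its schema and the
/// information needed to build a record batch stream over it.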
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
    /// Returns the [SchemaRef] of the file.
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
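    /// Lists the object store entries under the `COPY FROM` location, optionally
    /// filtered by the request's filename pattern, and returns the backend
    /// together with the matched entries.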
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.clone(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

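    /// Collects [FileMetadata] for a single file by inferring or loading its
    /// schema according to the requested format.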
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

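    /// Builds a [DfSendableRecordBatchStream] that reads the file described by
    /// `file_metadata`, projects it onto the requested columns of `compat_schema`,
    /// and applies the given filter expressions.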
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    schema.clone(),
                    csv_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    // The projection is already applied in the CSV reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                path,
                format,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let json_source = JsonSource::new()
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    schema.clone(),
                    json_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    // The projection is already applied in the JSON reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

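    /// Handles a `COPY FROM` request: lists the matching source files, checks that
    /// their schemas can be cast to the table schema, and streams their contents
    /// into the target table as batched inserts.
    ///
    /// Returns the number of rows inserted, capped by the request's row limit if
    /// one is given.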
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

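        // Collect metadata for each file entry and build the projections that map
        // its schema onto the table schema, rejecting files whose column types
        // cannot be cast to the table's.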
        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format.clone(), path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

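        // Stream each file and insert its rows in memory-bounded batches, stopping
        // early once the optional row limit is reached.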
        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<Vec<_>>();

            // TODO(hl): make this configurable through options.
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.clone(),
                        schema_name: req.schema_name.clone(),
                        table_name: req.table_name.clone(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows
                    && rows_inserted >= max_insert_rows
                {
                    return Ok(gen_insert_output(rows_inserted, insert_cost));
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

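/// Builds an [Output] carrying the number of affected rows and the accumulated insert cost.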
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

/// Executes all pending inserts at once, draining the pending requests and
/// resetting the pending byte counter.
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}


/// Custom type compatibility check for GreptimeDB that handles Map -> Binary (JSON) conversion
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    // Handle Map -> Binary conversion for JSON types
    if let ArrowDataType::Map(_, _) = from
        && let ArrowDataType::Binary = to
    {
        return true;
    }

    // For all other cases, use Arrow's built-in can_cast_types
    can_cast_types(from, to)
}

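/// Ensures each field of the projected file schema can be cast to the corresponding
/// field of the projected table schema, returning an error that names the offending
/// column index otherwise.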
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}


/// Generates projections for the file and table schemas along with a
/// maybe-compatible schema derived from the file schema.
///
/// If a file field is also present in the table schema, its data type in the
/// maybe-compatible schema (`compatible_fields`) is replaced with the table
/// field's data type.
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            // Safety: `compatible_fields` has the same length as the file schema.
            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}


#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

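    // Asserts whether a single-column schema of type `from` is accepted as
    // compatible with a single-column schema of type `to`.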
    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        // Test Map -> Binary conversion for JSON types
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true), // intersection
            (&file_schema1, &table_schema, true), // subset
            (&file_schema2, &table_schema, true), // full-eq
            (&file_schema3, &table_schema, true), // non-intersection
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}