operator/statement/copy_table_from.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format, file_to_stream};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::DfSendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_telemetry::{debug, tracing};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion_common::config::CsvOptions;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ResultExt, ensure};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

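/// Default number of rows per record batch when scanning CSV and JSON files.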
const DEFAULT_BATCH_SIZE: usize = 8192;
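/// Chunk size for buffered object store reads (used for Parquet and ORC files).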
const DEFAULT_READ_BUFFER: usize = 256 * 1024;

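/// Schema and format-specific metadata inferred from a single input file.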
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
    /// Returns the file's [SchemaRef].
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
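    /// Lists the object store entries under the `COPY FROM` location,
    /// optionally filtered by the request's pattern regex.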
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.clone(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

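    /// Infers the schema of the file at `path` and collects the format-specific
    /// metadata needed to read it.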
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

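    /// Builds a record batch stream over the file that is projected and cast to the
    /// compatible schema, with the timestamp-range filters applied.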
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let options = CsvOptions::default()
                    .with_has_header(format.has_header)
                    .with_delimiter(format.delimiter);
                let csv_source = CsvSource::new(schema.clone())
                    .with_csv_options(options)
                    .with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    csv_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    // The projection is already applied in the CSV reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                path,
                format,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let json_source =
                    JsonSource::new(schema.clone()).with_batch_size(DEFAULT_BATCH_SIZE);
                let stream = file_to_stream(
                    object_store,
                    path,
                    json_source,
                    Some(projection),
                    format.compression_type,
                )
                .await
                .context(error::BuildFileStreamSnafu)?;

                Ok(Box::pin(
                    // The projection is already applied in the JSON reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

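    /// Executes `COPY <table> FROM`: lists the matching files, validates that each file
    /// schema can be cast to the table schema, then streams the rows into the target
    /// table in memory-bounded insert batches.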
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format.clone(), path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<Vec<_>>();

            // TODO(hl): make this configurable through options.
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.clone(),
                        schema_name: req.schema_name.clone(),
                        table_name: req.table_name.clone(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows
                    && rows_inserted >= max_insert_rows
                {
                    return Ok(gen_insert_output(rows_inserted, insert_cost));
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

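/// Builds the [Output] carrying the number of inserted rows and the accumulated insert cost.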
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

/// Executes all pending inserts at once, draining the pending requests and resetting the pending byte count.
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

/// Custom type compatibility check for GreptimeDB that handles Map -> Binary (JSON) conversion
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    // Handle Map -> Binary conversion for JSON types
    if let ArrowDataType::Map(_, _) = from
        && let ArrowDataType::Binary = to
    {
        return true;
    }

    // For all other cases, use Arrow's built-in can_cast_types
    can_cast_types(from, to)
}

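/// Ensures that every field in `from` can be cast to the corresponding field in `to`.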
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

/// Generates the file and table schema projections together with a best-effort
/// compatible schema derived from the file schema.
///
/// For every file field that is also present in the table schema, the corresponding
/// table field is copied into the compatible schema (`compatible_fields`).
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            // Safety: `compatible_fields` has the same length as the file schema.
            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        // Test Map -> Binary conversion for JSON types
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true), // intersection
            (&file_schema1, &table_schema, true), // subset
            (&file_schema2, &table_schema, true), // full-eq
            (&file_schema3, &table_schema, true), // non-intersection
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}