operator/statement/copy_table_from.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener,
};
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::{Constraints, Statistics};
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, Result};
use crate::statement::StatementExecutor;

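// Defaults used when building the read streams below: the number of rows per
// record batch and the chunk size used when reading from the object store.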
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;

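/// Per-file metadata gathered before building a read stream: the file schema plus
/// format-specific details (e.g. the Parquet reader metadata) and the file path.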
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
    /// Returns the file's [SchemaRef].
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
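    /// Lists the entries to copy from `req.location`, optionally filtered by the
    /// request's `pattern` regex, and returns them together with the object store
    /// backend used to read them.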
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

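    /// Collects the [FileMetadata] of a single file: infers the schema for
    /// CSV/JSON/ORC files and loads the Arrow reader metadata for Parquet files.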
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

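    /// Builds a DataFusion [FileStream] over a single file using the given
    /// [FileOpener]; used by the CSV and JSON paths below.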
    async fn build_file_stream<F: FileOpener + Send + 'static>(
        &self,
        opener: F,
        filename: &str,
        file_schema: SchemaRef,
    ) -> Result<DfSendableRecordBatchStream> {
        let statistics = Statistics::new_unknown(file_schema.as_ref());
        let stream = FileStream::new(
            &FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
                file_schema,
                file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
                statistics,
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: vec![],
                constraints: Constraints::empty(),
            },
            0,
            opener,
            &ExecutionPlanMetricsSet::new(),
        )
        .context(error::BuildFileStreamSnafu)?;

        Ok(Box::pin(stream))
    }

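    /// Builds a record batch stream over `file_metadata`, projected to the columns
    /// shared with the table, adapted to `compat_schema`, and filtered by `filters`.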
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let csv_config = Arc::new(CsvConfig::new(
                    DEFAULT_BATCH_SIZE,
                    schema.clone(),
                    Some(projection.clone()),
                    format.has_header,
                    format.delimiter,
                    b'"',
                    None,
                    Arc::new(object_store_opendal::OpendalStore::new(
                        object_store.clone(),
                    )),
                    None,
                ));
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let stream = self
                    .build_file_stream(
                        CsvOpener::new(csv_config, format.compression_type.into()),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the CSV reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                format,
                path,
                schema,
            } => {
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let store = object_store_opendal::OpendalStore::new(object_store.clone());
                let stream = self
                    .build_file_stream(
                        JsonOpener::new(
                            DEFAULT_BATCH_SIZE,
                            projected_file_schema.clone(),
                            format.compression_type.into(),
                            Arc::new(store),
                        ),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the JSON reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

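    /// Copies data from the files at `req.location` into the target table:
    /// lists the matching entries, checks that each file schema can be cast to the
    /// table schema, then streams record batches and inserts them in
    /// memory-bounded batches (optionally stopping at `req.limit` rows).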
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format, path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

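        // Read each file and insert its batches, flushing the pending insert
        // requests whenever they exceed the memory threshold below.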
        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().to_string())
                .collect::<Vec<_>>();

            // TODO(hl): make this configurable through options.
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows {
                    if rows_inserted >= max_insert_rows {
                        return Ok(gen_insert_output(rows_inserted, insert_cost));
                    }
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

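/// Wraps the number of inserted rows and the accumulated insert cost into an [Output].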
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

/// Executes all pending inserts at once, draining the pending requests and resetting the pending byte counter.
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

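/// Ensures every field of `from` can be cast to the corresponding field of `to`
/// (compared position by position), returning an error that names the first
/// incompatible column index otherwise.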
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

/// Generates projections and a compatible schema from the file schema and the table schema.
///
/// For every file field that also exists in the table schema, the table field's data
/// type is copied into the compatible schema (`compatible_fields`). Returns the
/// projection into the file schema, the projection into the table schema, and the
/// compatible schema.
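///
/// Illustrative sketch (not compiled as a doctest), mirroring the
/// `test_compatible_file_schema` case below:
///
/// ```ignore
/// // file:  [c1: UInt8, c2: UInt8]
/// // table: [c1: Int16, c2: Int16, c3: Int16]
/// let (file_proj, table_proj, compat) =
///     generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);
/// assert_eq!(file_proj, vec![0, 1]);
/// assert_eq!(table_proj, vec![0, 1]);
/// // compat: [c1: Int16, c2: Int16] -- the file fields with the table's data types
/// ```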
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            // Safety: `compatible_fields` has the same length as the file schema,
            // so indexing with `file_idx` is in bounds.
            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true), // intersection
            (&file_schema1, &table_schema, true), // subset
            (&file_schema2, &table_schema, true), // full-eq
            (&file_schema3, &table_schema, true), // non-intersection
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}