operator/statement/copy_table_from.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url, FS_SCHEMA};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener,
};
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::{Constraints, Statistics};
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
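
/// Metadata collected for a single source file: the file's Arrow schema plus
/// any format-specific information needed to build a reader for it.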
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
    /// Returns the file's [SchemaRef].
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
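    /// Parses the `COPY FROM` location, verifies that local (`fs`) paths exist,
    /// builds the object store backend and lists the entries matching the
    /// requested file name or pattern.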
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

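    /// Infers the schema of the file at `path` and gathers the format-specific
    /// metadata ([`FileMetadata`]) needed to build a read stream over it.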
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

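    /// Builds a record batch stream over a single file using the given DataFusion
    /// [FileOpener]; the object store URL in the scan config is a placeholder,
    /// since the opener performs the actual reads.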
    async fn build_file_stream<F: FileOpener + Send + 'static>(
        &self,
        opener: F,
        filename: &str,
        file_schema: SchemaRef,
    ) -> Result<DfSendableRecordBatchStream> {
        let statistics = Statistics::new_unknown(file_schema.as_ref());
        let stream = FileStream::new(
            &FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
                file_schema,
                file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
                statistics,
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: vec![],
                constraints: Constraints::empty(),
            },
            0,
            opener,
            &ExecutionPlanMetricsSet::new(),
        )
        .context(error::BuildFileStreamSnafu)?;

        Ok(Box::pin(stream))
    }

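    /// Builds a record batch stream that reads the file described by `file_metadata`,
    /// applies `projection`, casts the columns to the corresponding fields of
    /// `compat_schema` and evaluates the optional timestamp `filters`.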
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let csv_config = Arc::new(CsvConfig::new(
                    DEFAULT_BATCH_SIZE,
                    schema.clone(),
                    Some(projection.clone()),
                    format.has_header,
                    format.delimiter,
                    b'"',
                    None,
                    Arc::new(object_store_opendal::OpendalStore::new(
                        object_store.clone(),
                    )),
                    None,
                ));
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let stream = self
                    .build_file_stream(
                        CsvOpener::new(csv_config, format.compression_type.into()),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the CSV reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                format,
                path,
                schema,
            } => {
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let store = object_store_opendal::OpendalStore::new(object_store.clone());
                let stream = self
                    .build_file_stream(
                        JsonOpener::new(
                            DEFAULT_BATCH_SIZE,
                            projected_file_schema.clone(),
                            format.compression_type.into(),
                            Arc::new(store),
                        ),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the JSON reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

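    /// Executes `COPY <table> FROM <location>`: lists the source files, verifies
    /// that each file schema can be cast to the table schema, then streams the
    /// files and inserts their rows in batches, flushing whenever the pending
    /// inserts exceed an in-memory threshold and returning early once the
    /// optional row limit is reached.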
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format, path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().to_string())
                .collect::<Vec<_>>();

            // TODO(hl): make this configurable through options.
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows {
                    if rows_inserted >= max_insert_rows {
                        return Ok(gen_insert_output(rows_inserted, insert_cost));
                    }
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

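/// Wraps the number of inserted rows and the accumulated insert cost into an [Output].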
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

/// Executes all pending inserts at once, draining the pending requests and resetting the pending byte counter.
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

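/// Ensures that every field of the `from` schema can be cast to the field at the
/// same position in the `to` schema (as determined by arrow's `can_cast_types`),
/// e.g. `Int8` to `Int16` or `Utf8` to `Timestamp`. Returns an `InvalidSchema`
/// error pointing at the first incompatible column.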
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

/// Generates the file and table schema projections together with a compatible
/// schema for the file.
///
/// For every file field that also exists in the table schema, the table field's
/// data type is copied into the compatible schema (`compatible_fields`), so the
/// file's columns can later be cast to the table's types.
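///
/// For example (mirroring `test_compatible_file_schema` below), with file fields
/// `[c1: UInt8, c2: UInt8]` and table fields `[c1: Int16, c2: Int16, c3: Int16]`,
/// the projections are `[0, 1]` and `[0, 1]`, and the compatible schema becomes
/// `[c1: Int16, c2: Int16]`.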
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            // Safety: `compatible_fields` has the same length as the file schema.
            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true), // intersection
            (&file_schema1, &table_schema, true), // subset
            (&file_schema2, &table_schema, true), // full-eq
            (&file_schema3, &table_schema, true), // non-intersection
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}