operator/statement/copy_table_from.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url, FS_SCHEMA};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource,
};
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

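// Defaults used when scanning source files: the record batch size for CSV/JSON
// sources and the chunk size for object store reads (Parquet/ORC).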
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
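
/// Schema and format-specific metadata collected from a single source file.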
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
    /// Returns the [SchemaRef]
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
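    /// Builds the object store backend for the request location and lists the
    /// entries to copy from, optionally filtered by the request's `pattern`.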
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

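    /// Infers the schema (and, for Parquet, the Arrow reader metadata) of a single
    /// source file according to the requested format.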
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

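    /// Builds a DataFusion [`FileStream`] that scans a single file through the
    /// given [`FileSource`], applying the optional column projection.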
    async fn build_file_stream(
        &self,
        store: &ObjectStore,
        filename: &str,
        file_schema: SchemaRef,
        file_source: Arc<dyn FileSource>,
        projection: Option<Vec<usize>>,
    ) -> Result<DfSendableRecordBatchStream> {
        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            file_schema,
            file_source.clone(),
        )
        .with_file_group(FileGroup::new(vec![PartitionedFile::new(filename, 0)]))
        .with_projection(projection)
        .build();

        let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
        let file_opener = file_source
            .with_projection(&config)
            .create_file_opener(store, &config, 0);
        let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
            .context(error::BuildFileStreamSnafu)?;

        Ok(Box::pin(stream))
    }

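    /// Builds a record batch stream over one source file, adapting it to the
    /// projected compatible schema and applying the timestamp range filters.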
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);

                let stream = self
                    .build_file_stream(
                        object_store,
                        path,
                        schema.clone(),
                        csv_source,
                        Some(projection),
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the CSV reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json { path, schema } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                let json_source = JsonSource::new()
                    .with_schema(schema.clone())
                    .with_batch_size(DEFAULT_BATCH_SIZE);

                let stream = self
                    .build_file_stream(
                        object_store,
                        path,
                        schema.clone(),
                        json_source,
                        Some(projection),
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the JSON reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

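    /// Executes `COPY <table> FROM`: lists the source files, checks that each
    /// file's schema can be cast to the table schema, then streams the file
    /// contents into insert requests against the target table.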
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

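        // Collect metadata for each file and verify that its projected schema can
        // be cast to the corresponding columns of the table schema.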
        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format, path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

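        // Stream each file and buffer insert requests, flushing whenever the pending
        // batches exceed the memory threshold; stop early once `limit` rows are inserted.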
        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().to_string())
                .collect::<Vec<_>>();

            // TODO(hl): make this configurable through options.
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows {
                    if rows_inserted >= max_insert_rows {
                        return Ok(gen_insert_output(rows_inserted, insert_cost));
                    }
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

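/// Wraps the number of inserted rows and the accumulated insert cost into an [Output].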
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

/// Executes all pending inserts at once, draining the pending requests and resetting the pending byte counter.
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

/// Custom type compatibility check for GreptimeDB that handles Map -> Binary (JSON) conversion
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    // Handle Map -> Binary conversion for JSON types
    if let ArrowDataType::Map(_, _) = from {
        if let ArrowDataType::Binary = to {
            return true;
        }
    }

    // For all other cases, use Arrow's built-in can_cast_types
    can_cast_types(from, to)
}

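/// Ensures every field in `from` can be cast to the corresponding field (by position) in `to`.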
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

/// Generates a possibly compatible schema from the file schema.
///
/// For each file field that also exists in the table schema, the table field's
/// definition is copied into the compatible schema (`compatible_fields`).
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            // Safety: `compatible_fields` has the same length as the file schema.
            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

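    /// Asserts whether a single-column schema of type `from` is considered
    /// compatible with (castable to) a single-column schema of type `to`.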
    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        // Test Map -> Binary conversion for JSON types
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true), // intersection
            (&file_schema1, &table_schema, true), // subset
            (&file_schema2, &table_schema, true), // full-eq
            (&file_schema3, &table_schema, true), // non-intersection
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}