operator/statement/copy_table_from.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{infer_orc_schema, new_orc_stream_reader, ReaderAdapter};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url, FS_SCHEMA};
use common_datasource::util::find_dir_and_filename;
use common_query::{OutputCost, OutputRows};
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
    CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener,
};
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::{Constraints, Statistics};
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use table::requests::{CopyTableRequest, InsertRequest};
use table::table_reference::TableReference;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::error::{self, IntoVectorsSnafu, PathNotFoundSnafu, Result};
use crate::statement::StatementExecutor;

const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
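
/// Schema and format-specific metadata collected from a source file before it is read.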
enum FileMetadata {
    Parquet {
        schema: SchemaRef,
        metadata: ArrowReaderMetadata,
        path: String,
    },
    Orc {
        schema: SchemaRef,
        path: String,
    },
    Json {
        schema: SchemaRef,
        format: JsonFormat,
        path: String,
    },
    Csv {
        schema: SchemaRef,
        format: CsvFormat,
        path: String,
    },
}

impl FileMetadata {
    /// Returns the file's [SchemaRef].
    pub fn schema(&self) -> &SchemaRef {
        match self {
            FileMetadata::Parquet { schema, .. } => schema,
            FileMetadata::Orc { schema, .. } => schema,
            FileMetadata::Json { schema, .. } => schema,
            FileMetadata::Csv { schema, .. } => schema,
        }
    }
}

impl StatementExecutor {
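    /// Builds the object store backend for the request's location and lists the
    /// entries under it, filtered by the optional filename pattern.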
    async fn list_copy_from_entries(
        &self,
        req: &CopyTableRequest,
    ) -> Result<(ObjectStore, Vec<Entry>)> {
        let (schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

        if schema.to_uppercase() == FS_SCHEMA {
            ensure!(Path::new(&path).exists(), PathNotFoundSnafu { path });
        }

        let object_store =
            build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

        let (dir, filename) = find_dir_and_filename(&path);
        let regex = req
            .pattern
            .as_ref()
            .map(|x| Regex::new(x))
            .transpose()
            .context(error::BuildRegexSnafu)?;

        let source = if let Some(filename) = filename {
            Source::Filename(filename)
        } else {
            Source::Dir
        };

        let lister = Lister::new(object_store.clone(), source.clone(), dir.to_string(), regex);

        let entries = lister.list().await.context(error::ListObjectsSnafu)?;
        debug!("Copy from dir: {dir:?}, {source:?}, entries: {entries:?}");
        Ok((object_store, entries))
    }

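    /// Collects the schema (and any format-specific reader metadata) of the file at
    /// `path` according to `format`.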
    async fn collect_metadata(
        &self,
        object_store: &ObjectStore,
        format: Format,
        path: String,
    ) -> Result<FileMetadata> {
        match format {
            Format::Csv(format) => Ok(FileMetadata::Csv {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Json(format) => Ok(FileMetadata::Json {
                schema: Arc::new(
                    format
                        .infer_schema(object_store, &path)
                        .await
                        .context(error::InferSchemaSnafu { path: &path })?,
                ),
                format,
                path,
            }),
            Format::Parquet(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;
                let mut reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?
                    .compat();
                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                    .await
                    .context(error::ReadParquetMetadataSnafu)?;

                Ok(FileMetadata::Parquet {
                    schema: metadata.schema().clone(),
                    metadata,
                    path,
                })
            }
            Format::Orc(_) => {
                let meta = object_store
                    .stat(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let reader = object_store
                    .reader(&path)
                    .await
                    .context(error::ReadObjectSnafu { path: &path })?;

                let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length()))
                    .await
                    .context(error::ReadOrcSnafu)?;

                Ok(FileMetadata::Orc {
                    schema: Arc::new(schema),
                    path,
                })
            }
        }
    }

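    /// Builds a record batch stream over a single file using the given [FileOpener]
    /// and file schema.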
    async fn build_file_stream<F: FileOpener + Send + 'static>(
        &self,
        opener: F,
        filename: &str,
        file_schema: SchemaRef,
    ) -> Result<DfSendableRecordBatchStream> {
        let statistics = Statistics::new_unknown(file_schema.as_ref());
        let stream = FileStream::new(
            &FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
                file_schema,
                file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
                statistics,
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: vec![],
                constraints: Constraints::empty(),
            },
            0,
            opener,
            &ExecutionPlanMetricsSet::new(),
        )
        .context(error::BuildFileStreamSnafu)?;

        Ok(Box::pin(stream))
    }

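    /// Builds a record batch stream over the file described by `file_metadata`,
    /// projecting it to `projection`, casting the output to `compat_schema`, and
    /// applying the given `filters`.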
    async fn build_read_stream(
        &self,
        compat_schema: SchemaRef,
        object_store: &ObjectStore,
        file_metadata: &FileMetadata,
        projection: Vec<usize>,
        filters: Vec<Expr>,
    ) -> Result<DfSendableRecordBatchStream> {
        match file_metadata {
            FileMetadata::Csv {
                format,
                path,
                schema,
            } => {
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let csv_config = Arc::new(CsvConfig::new(
                    DEFAULT_BATCH_SIZE,
                    schema.clone(),
                    Some(projection.clone()),
                    format.has_header,
                    format.delimiter,
                    b'"',
                    None,
                    Arc::new(object_store_opendal::OpendalStore::new(
                        object_store.clone(),
                    )),
                    None,
                ));
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let stream = self
                    .build_file_stream(
                        CsvOpener::new(csv_config, format.compression_type.into()),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the CSV reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Json {
                format,
                path,
                schema,
            } => {
                let projected_file_schema = Arc::new(
                    schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                let store = object_store_opendal::OpendalStore::new(object_store.clone());
                let stream = self
                    .build_file_stream(
                        JsonOpener::new(
                            DEFAULT_BATCH_SIZE,
                            projected_file_schema.clone(),
                            format.compression_type.into(),
                            Arc::new(store),
                        ),
                        path,
                        projected_file_schema,
                    )
                    .await?;

                Ok(Box::pin(
                    // The projection is already applied in the JSON reader when we created the stream,
                    // so we pass None here to avoid double projection which would cause schema mismatch errors.
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Parquet { metadata, path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .into_futures_async_read(0..meta.content_length())
                    .await
                    .context(error::ReadObjectSnafu { path })?
                    .compat();
                let builder =
                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                let stream = builder
                    .build()
                    .context(error::BuildParquetRecordBatchStreamSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );
                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
            FileMetadata::Orc { path, .. } => {
                let meta = object_store
                    .stat(path)
                    .await
                    .context(error::ReadObjectSnafu { path })?;

                let reader = object_store
                    .reader_with(path)
                    .chunk(DEFAULT_READ_BUFFER)
                    .await
                    .context(error::ReadObjectSnafu { path })?;
                let stream =
                    new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
                        .await
                        .context(error::ReadOrcSnafu)?;

                let output_schema = Arc::new(
                    compat_schema
                        .project(&projection)
                        .context(error::ProjectSchemaSnafu)?,
                );

                Ok(Box::pin(
                    RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
                        .with_filter(filters)
                        .context(error::PhysicalExprSnafu)?,
                ))
            }
        }
    }

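    /// Copies data from external files into the target table, returning the number
    /// of inserted rows as the output.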
    #[tracing::instrument(skip_all)]
    pub async fn copy_table_from(
        &self,
        req: CopyTableRequest,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let table_ref = TableReference {
            catalog: &req.catalog_name,
            schema: &req.schema_name,
            table: &req.table_name,
        };
        let table = self.get_table(&table_ref).await?;
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
        let mut files = Vec::with_capacity(entries.len());
        let table_schema = table.schema().arrow_schema().clone();
        let filters = table
            .schema()
            .timestamp_column()
            .and_then(|c| {
                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
            })
            .into_iter()
            .collect::<Vec<_>>();

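        // Collect metadata for each file and ensure its schema can be cast to the
        // table schema before any data is read.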
        for entry in entries.iter() {
            if entry.metadata().mode() != EntryMode::FILE {
                continue;
            }
            let path = entry.path();
            let file_metadata = self
                .collect_metadata(&object_store, format, path.to_string())
                .await?;

            let file_schema = file_metadata.schema();
            let (file_schema_projection, table_schema_projection, compat_schema) =
                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
            let projected_file_schema = Arc::new(
                file_schema
                    .project(&file_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            let projected_table_schema = Arc::new(
                table_schema
                    .project(&table_schema_projection)
                    .context(error::ProjectSchemaSnafu)?,
            );
            ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

            files.push((
                Arc::new(compat_schema),
                file_schema_projection,
                projected_table_schema,
                file_metadata,
            ))
        }

        let mut rows_inserted = 0;
        let mut insert_cost = 0;
        let max_insert_rows = req.limit.map(|n| n as usize);
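        // Read each file and insert its rows in batches, flushing pending inserts
        // whenever the buffered data exceeds the memory threshold.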
        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
        {
            let mut stream = self
                .build_read_stream(
                    compat_schema,
                    &object_store,
                    &file_metadata,
                    file_schema_projection,
                    filters.clone(),
                )
                .await?;

            let fields = projected_table_schema
                .fields()
                .iter()
                .map(|f| f.name().to_string())
                .collect::<Vec<_>>();

            // TODO(hl): make this configurable through options.
            let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
            let mut pending_mem_size = 0;
            let mut pending = vec![];

            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                    rows_inserted += rows;
                    insert_cost += cost;
                }

                if let Some(max_insert_rows) = max_insert_rows {
                    if rows_inserted >= max_insert_rows {
                        return Ok(gen_insert_output(rows_inserted, insert_cost));
                    }
                }
            }

            if !pending.is_empty() {
                let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
                rows_inserted += rows;
                insert_cost += cost;
            }
        }

        Ok(gen_insert_output(rows_inserted, insert_cost))
    }
}

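/// Builds an [Output] carrying the number of inserted rows and the insert cost.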
fn gen_insert_output(rows_inserted: usize, insert_cost: usize) -> Output {
    Output::new(
        OutputData::AffectedRows(rows_inserted),
        OutputMeta::new_with_cost(insert_cost),
    )
}

/// Executes all pending inserts at once, draining the pending requests and resetting the pending byte count.
async fn batch_insert(
    pending: &mut Vec<impl Future<Output = Result<Output>>>,
    pending_bytes: &mut usize,
) -> Result<(OutputRows, OutputCost)> {
    let batch = pending.drain(..);
    let result = futures::future::try_join_all(batch)
        .await?
        .iter()
        .map(|o| o.extract_rows_and_cost())
        .reduce(|(a, b), (c, d)| (a + c, b + d))
        .unwrap_or((0, 0));
    *pending_bytes = 0;
    Ok(result)
}

/// Custom type compatibility check for GreptimeDB that handles Map -> Binary (JSON) conversion
fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
    // Handle Map -> Binary conversion for JSON types
    if let ArrowDataType::Map(_, _) = from {
        if let ArrowDataType::Binary = to {
            return true;
        }
    }

    // For all other cases, use Arrow's built-in can_cast_types
    can_cast_types(from, to)
}

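/// Ensures every field in `from` can be cast to the corresponding field in `to`,
/// returning an error that reports the offending column index otherwise.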
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
    let not_match = from
        .fields
        .iter()
        .zip(to.fields.iter())
        .map(|(l, r)| (l.data_type(), r.data_type()))
        .enumerate()
        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));

    if let Some((index, _)) = not_match {
        error::InvalidSchemaSnafu {
            index,
            table_schema: to.to_string(),
            file_schema: from.to_string(),
        }
        .fail()
    } else {
        Ok(())
    }
}

/// Generates a possibly compatible schema from the file schema.
///
/// If a file field is also found in the table schema, the table field's data type
/// is copied into the compatible schema (`compatible_fields`).
fn generated_schema_projection_and_compatible_file_schema(
    file: &SchemaRef,
    table: &SchemaRef,
) -> (Vec<usize>, Vec<usize>, Schema) {
    let mut file_projection = Vec::with_capacity(file.fields.len());
    let mut table_projection = Vec::with_capacity(file.fields.len());
    let mut compatible_fields = file.fields.iter().cloned().collect::<Vec<_>>();
    for (file_idx, file_field) in file.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) = table.fields.find(file_field.name()) {
            file_projection.push(file_idx);
            table_projection.push(table_idx);

            // Safety: `compatible_fields` has the same length as the file schema.
            compatible_fields[file_idx] = table_field.clone();
        }
    }

    (
        file_projection,
        table_projection,
        Schema::new(compatible_fields),
    )
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{DataType, Field, Schema};

    use super::*;

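    /// Asserts that a single-column schema with data type `from` is (or is not,
    /// per `matches`) castable to a single-column schema with data type `to`.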
    fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
        let res = ensure_schema_compatible(&s1, &s2);
        assert_eq!(
            matches,
            res.is_ok(),
            "from data type: {}, to data type: {}, expected: {}, but got: {}",
            from.0,
            to.0,
            matches,
            res.is_ok()
        )
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
    }

    #[test]
    fn test_data_type_equals_ignore_timezone_with_options() {
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Microsecond,
                    Some("UTC".into()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (DataType::Utf8, true),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            true,
        );

        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("PDT".into()),
                ),
                true,
            ),
            (DataType::Utf8, true),
            true,
        );
    }

    #[test]
    fn test_map_to_binary_json_compatibility() {
        // Test Map -> Binary conversion for JSON types
        let map_type = DataType::Map(
            Arc::new(Field::new(
                "key_value",
                DataType::Struct(
                    vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new("value", DataType::Utf8, false),
                    ]
                    .into(),
                ),
                false,
            )),
            false,
        );

        test_schema_matches((map_type, false), (DataType::Binary, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
    }

    fn make_test_schema(v: &[Field]) -> Arc<Schema> {
        Arc::new(Schema::new(v.to_vec()))
    }

    #[test]
    fn test_compatible_file_schema() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
            Field::new("c3", DataType::Int16, true),
        ]);

        let compat_schema = make_test_schema(&[
            Field::new("c1", DataType::Int16, true),
            Field::new("c2", DataType::Int16, true),
        ]);

        let (_, tp, _) =
            generated_schema_projection_and_compatible_file_schema(&file_schema0, &table_schema);

        assert_eq!(table_schema.project(&tp).unwrap(), *compat_schema);
    }

    #[test]
    fn test_schema_projection() {
        let file_schema0 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
            Field::new("c3", DataType::UInt8, true),
        ]);

        let file_schema1 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
        ]);

        let file_schema2 = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let file_schema3 = make_test_schema(&[
            Field::new("c1", DataType::UInt8, true),
            Field::new("c2", DataType::UInt8, true),
        ]);

        let table_schema = make_test_schema(&[
            Field::new("c3", DataType::UInt8, true),
            Field::new("c4", DataType::UInt8, true),
            Field::new("c5", DataType::UInt8, true),
        ]);

        let tests = [
            (&file_schema0, &table_schema, true), // intersection
            (&file_schema1, &table_schema, true), // subset
            (&file_schema2, &table_schema, true), // full-eq
            (&file_schema3, &table_schema, true), // non-intersection
        ];

        for test in tests {
            let (fp, tp, _) =
                generated_schema_projection_and_compatible_file_schema(test.0, test.1);
            assert_eq!(test.0.project(&fp).unwrap(), test.1.project(&tp).unwrap());
        }
    }
}