common_datasource/file_format/orc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow_schema::Schema;
16use async_trait::async_trait;
17use bytes::Bytes;
18use futures::future::BoxFuture;
19use futures::FutureExt;
20use object_store::ObjectStore;
21use orc_rust::arrow_reader::ArrowReaderBuilder;
22use orc_rust::async_arrow_reader::ArrowStreamReader;
23use orc_rust::reader::AsyncChunkReader;
24use snafu::ResultExt;
25
26use crate::error::{self, Result};
27use crate::file_format::FileFormat;
28
/// File-format handler for Apache ORC files.
///
/// This is a stateless unit struct. Its `FileFormat` implementation infers an Arrow
/// schema from an ORC object stored in an `ObjectStore`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct OrcFormat;
31
32#[derive(Clone)]
33pub struct ReaderAdapter {
34    reader: object_store::Reader,
35    len: u64,
36}
37
38impl ReaderAdapter {
39    pub fn new(reader: object_store::Reader, len: u64) -> Self {
40        Self { reader, len }
41    }
42}
43
44impl AsyncChunkReader for ReaderAdapter {
45    fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
46        async move { Ok(self.len) }.boxed()
47    }
48
49    fn get_bytes(
50        &mut self,
51        offset_from_start: u64,
52        length: u64,
53    ) -> BoxFuture<'_, std::io::Result<Bytes>> {
54        async move {
55            let bytes = self
56                .reader
57                .read(offset_from_start..offset_from_start + length)
58                .await?;
59            Ok(bytes.to_bytes())
60        }
61        .boxed()
62    }
63}
64
65pub async fn new_orc_stream_reader(
66    reader: ReaderAdapter,
67) -> Result<ArrowStreamReader<ReaderAdapter>> {
68    let reader_build = ArrowReaderBuilder::try_new_async(reader)
69        .await
70        .context(error::OrcReaderSnafu)?;
71    Ok(reader_build.build_async())
72}
73
74pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
75    let reader = new_orc_stream_reader(reader).await?;
76    Ok(reader.schema().as_ref().clone())
77}
78
79#[async_trait]
80impl FileFormat for OrcFormat {
81    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
82        let meta = store
83            .stat(path)
84            .await
85            .context(error::ReadObjectSnafu { path })?;
86        let reader = store
87            .reader(path)
88            .await
89            .context(error::ReadObjectSnafu { path })?;
90        let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
91        Ok(schema)
92    }
93}
94
#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::file_format::FileFormat;
    use crate::test_util::{format_schema, test_store};

    /// Returns the workspace directory holding the ORC fixture files, as a string.
    fn test_data_root() -> String {
        let root = find_workspace_path("/src/common/datasource/tests/orc");
        root.display().to_string()
    }

    /// Checks that schema inference on the `test.orc` fixture yields the expected columns.
    #[tokio::test]
    async fn test_orc_infer_schema() {
        let expected = vec![
            "double_a: Float64: NULL",
            "a: Float32: NULL",
            "b: Boolean: NULL",
            "str_direct: Utf8: NULL",
            "d: Utf8: NULL",
            "e: Utf8: NULL",
            "f: Utf8: NULL",
            "int_short_repeated: Int32: NULL",
            "int_neg_short_repeated: Int32: NULL",
            "int_delta: Int32: NULL",
            "int_neg_delta: Int32: NULL",
            "int_direct: Int32: NULL",
            "int_neg_direct: Int32: NULL",
            "bigint_direct: Int64: NULL",
            "bigint_neg_direct: Int64: NULL",
            "bigint_other: Int64: NULL",
            "utf8_increase: Utf8: NULL",
            "utf8_decrease: Utf8: NULL",
            "timestamp_simple: Timestamp(Nanosecond, None): NULL",
            "date_simple: Date32: NULL",
        ];

        let store = test_store(&test_data_root());
        let inferred = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();
        let actual: Vec<_> = format_schema(inferred);

        assert_eq!(expected, actual);
    }
}