common_datasource/file_format/
orc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow_schema::{ArrowError, Schema, SchemaRef};
18use async_trait::async_trait;
19use bytes::Bytes;
20use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
21use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
22use datafusion::error::{DataFusionError, Result as DfResult};
23use futures::future::BoxFuture;
24use futures::{FutureExt, StreamExt, TryStreamExt};
25use object_store::ObjectStore;
26use orc_rust::arrow_reader::ArrowReaderBuilder;
27use orc_rust::async_arrow_reader::ArrowStreamReader;
28use orc_rust::reader::AsyncChunkReader;
29use snafu::ResultExt;
30
31use crate::error::{self, Result};
32use crate::file_format::FileFormat;
33
/// Zero-sized marker type that implements [`FileFormat`] for ORC files.
///
/// Carries no configuration, so it is freely copyable and comparable.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct OrcFormat;
36
37#[derive(Clone)]
38pub struct ReaderAdapter {
39    reader: object_store::Reader,
40    len: u64,
41}
42
43impl ReaderAdapter {
44    pub fn new(reader: object_store::Reader, len: u64) -> Self {
45        Self { reader, len }
46    }
47}
48
49impl AsyncChunkReader for ReaderAdapter {
50    fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
51        async move { Ok(self.len) }.boxed()
52    }
53
54    fn get_bytes(
55        &mut self,
56        offset_from_start: u64,
57        length: u64,
58    ) -> BoxFuture<'_, std::io::Result<Bytes>> {
59        async move {
60            let bytes = self
61                .reader
62                .read(offset_from_start..offset_from_start + length)
63                .await?;
64            Ok(bytes.to_bytes())
65        }
66        .boxed()
67    }
68}
69
70pub async fn new_orc_stream_reader(
71    reader: ReaderAdapter,
72) -> Result<ArrowStreamReader<ReaderAdapter>> {
73    let reader_build = ArrowReaderBuilder::try_new_async(reader)
74        .await
75        .context(error::OrcReaderSnafu)?;
76    Ok(reader_build.build_async())
77}
78
79pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
80    let reader = new_orc_stream_reader(reader).await?;
81    Ok(reader.schema().as_ref().clone())
82}
83
84#[async_trait]
85impl FileFormat for OrcFormat {
86    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
87        let meta = store
88            .stat(path)
89            .await
90            .context(error::ReadObjectSnafu { path })?;
91        let reader = store
92            .reader(path)
93            .await
94            .context(error::ReadObjectSnafu { path })?;
95        let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
96        Ok(schema)
97    }
98}
99
100#[derive(Debug, Clone)]
101pub struct OrcOpener {
102    object_store: Arc<ObjectStore>,
103    output_schema: SchemaRef,
104    projection: Option<Vec<usize>>,
105}
106
107impl OrcOpener {
108    pub fn new(
109        object_store: ObjectStore,
110        output_schema: SchemaRef,
111        projection: Option<Vec<usize>>,
112    ) -> Self {
113        Self {
114            object_store: Arc::from(object_store),
115            output_schema,
116            projection,
117        }
118    }
119}
120
121impl FileOpener for OrcOpener {
122    fn open(&self, meta: FileMeta) -> DfResult<FileOpenFuture> {
123        let object_store = self.object_store.clone();
124        let projected_schema = if let Some(projection) = &self.projection {
125            let projected_schema = self
126                .output_schema
127                .project(projection)
128                .map_err(|e| DataFusionError::External(Box::new(e)))?;
129            Arc::new(projected_schema)
130        } else {
131            self.output_schema.clone()
132        };
133        let projection = self.projection.clone();
134        Ok(Box::pin(async move {
135            let path = meta.location().to_string();
136
137            let meta = object_store
138                .stat(&path)
139                .await
140                .map_err(|e| DataFusionError::External(Box::new(e)))?;
141
142            let reader = object_store
143                .reader(&path)
144                .await
145                .map_err(|e| DataFusionError::External(Box::new(e)))?;
146
147            let stream_reader =
148                new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
149                    .await
150                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
151
152            let stream =
153                RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);
154
155            let adopted = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
156            Ok(adopted.boxed())
157        }))
158    }
159}
160
#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::file_format::FileFormat;
    use crate::test_util::{format_schema, test_store};

    /// Path of the ORC fixture directory inside the workspace, as a string.
    fn test_data_root() -> String {
        let root = find_workspace_path("/src/common/datasource/tests/orc");
        root.display().to_string()
    }

    #[tokio::test]
    async fn test_orc_infer_schema() {
        let root = test_data_root();
        let store = test_store(&root);
        let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();

        // One entry per column of `test.orc`, in file order.
        let expected = vec![
            "double_a: Float64: NULL",
            "a: Float32: NULL",
            "b: Boolean: NULL",
            "str_direct: Utf8: NULL",
            "d: Utf8: NULL",
            "e: Utf8: NULL",
            "f: Utf8: NULL",
            "int_short_repeated: Int32: NULL",
            "int_neg_short_repeated: Int32: NULL",
            "int_delta: Int32: NULL",
            "int_neg_delta: Int32: NULL",
            "int_direct: Int32: NULL",
            "int_neg_direct: Int32: NULL",
            "bigint_direct: Int64: NULL",
            "bigint_neg_direct: Int64: NULL",
            "bigint_other: Int64: NULL",
            "utf8_increase: Utf8: NULL",
            "utf8_decrease: Utf8: NULL",
            "timestamp_simple: Timestamp(Nanosecond, None): NULL",
            "date_simple: Date32: NULL",
        ];
        let formatted: Vec<_> = format_schema(schema);
        assert_eq!(expected, formatted);
    }
}