common_datasource/file_format/
orc.rs1use std::sync::Arc;
16
17use arrow_schema::{ArrowError, Schema, SchemaRef};
18use async_trait::async_trait;
19use bytes::Bytes;
20use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
21use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
22use datafusion::error::{DataFusionError, Result as DfResult};
23use futures::future::BoxFuture;
24use futures::{FutureExt, StreamExt, TryStreamExt};
25use object_store::ObjectStore;
26use orc_rust::arrow_reader::ArrowReaderBuilder;
27use orc_rust::async_arrow_reader::ArrowStreamReader;
28use orc_rust::reader::AsyncChunkReader;
29use snafu::ResultExt;
30
31use crate::error::{self, Result};
32use crate::file_format::FileFormat;
33
/// Marker type for the ORC file format.
///
/// It carries no state; the ORC-specific behaviour lives in its
/// [`FileFormat`] implementation.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct OrcFormat;
36
37#[derive(Clone)]
38pub struct ReaderAdapter {
39 reader: object_store::Reader,
40 len: u64,
41}
42
43impl ReaderAdapter {
44 pub fn new(reader: object_store::Reader, len: u64) -> Self {
45 Self { reader, len }
46 }
47}
48
49impl AsyncChunkReader for ReaderAdapter {
50 fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
51 async move { Ok(self.len) }.boxed()
52 }
53
54 fn get_bytes(
55 &mut self,
56 offset_from_start: u64,
57 length: u64,
58 ) -> BoxFuture<'_, std::io::Result<Bytes>> {
59 async move {
60 let bytes = self
61 .reader
62 .read(offset_from_start..offset_from_start + length)
63 .await?;
64 Ok(bytes.to_bytes())
65 }
66 .boxed()
67 }
68}
69
70pub async fn new_orc_stream_reader(
71 reader: ReaderAdapter,
72) -> Result<ArrowStreamReader<ReaderAdapter>> {
73 let reader_build = ArrowReaderBuilder::try_new_async(reader)
74 .await
75 .context(error::OrcReaderSnafu)?;
76 Ok(reader_build.build_async())
77}
78
79pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
80 let reader = new_orc_stream_reader(reader).await?;
81 Ok(reader.schema().as_ref().clone())
82}
83
84#[async_trait]
85impl FileFormat for OrcFormat {
86 async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
87 let meta = store
88 .stat(path)
89 .await
90 .context(error::ReadObjectSnafu { path })?;
91 let reader = store
92 .reader(path)
93 .await
94 .context(error::ReadObjectSnafu { path })?;
95 let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
96 Ok(schema)
97 }
98}
99
100#[derive(Debug, Clone)]
101pub struct OrcOpener {
102 object_store: Arc<ObjectStore>,
103 output_schema: SchemaRef,
104 projection: Option<Vec<usize>>,
105}
106
107impl OrcOpener {
108 pub fn new(
109 object_store: ObjectStore,
110 output_schema: SchemaRef,
111 projection: Option<Vec<usize>>,
112 ) -> Self {
113 Self {
114 object_store: Arc::from(object_store),
115 output_schema,
116 projection,
117 }
118 }
119}
120
121impl FileOpener for OrcOpener {
122 fn open(&self, meta: FileMeta) -> DfResult<FileOpenFuture> {
123 let object_store = self.object_store.clone();
124 let projected_schema = if let Some(projection) = &self.projection {
125 let projected_schema = self
126 .output_schema
127 .project(projection)
128 .map_err(|e| DataFusionError::External(Box::new(e)))?;
129 Arc::new(projected_schema)
130 } else {
131 self.output_schema.clone()
132 };
133 let projection = self.projection.clone();
134 Ok(Box::pin(async move {
135 let path = meta.location().to_string();
136
137 let meta = object_store
138 .stat(&path)
139 .await
140 .map_err(|e| DataFusionError::External(Box::new(e)))?;
141
142 let reader = object_store
143 .reader(&path)
144 .await
145 .map_err(|e| DataFusionError::External(Box::new(e)))?;
146
147 let stream_reader =
148 new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
149 .await
150 .map_err(|e| DataFusionError::External(Box::new(e)))?;
151
152 let stream =
153 RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);
154
155 let adopted = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
156 Ok(adopted.boxed())
157 }))
158 }
159}
160
#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::file_format::FileFormat;
    use crate::test_util::{format_schema, test_store};

    /// Directory holding the ORC fixtures, resolved against the workspace.
    fn test_data_root() -> String {
        let root = find_workspace_path("/src/common/datasource/tests/orc");
        root.display().to_string()
    }

    /// Schema inference on the `test.orc` fixture yields the expected
    /// columns in order.
    #[tokio::test]
    async fn test_orc_infer_schema() {
        let root = test_data_root();
        let store = test_store(&root);

        let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        let expected = vec![
            "double_a: Float64: NULL",
            "a: Float32: NULL",
            "b: Boolean: NULL",
            "str_direct: Utf8: NULL",
            "d: Utf8: NULL",
            "e: Utf8: NULL",
            "f: Utf8: NULL",
            "int_short_repeated: Int32: NULL",
            "int_neg_short_repeated: Int32: NULL",
            "int_delta: Int32: NULL",
            "int_neg_delta: Int32: NULL",
            "int_direct: Int32: NULL",
            "int_neg_direct: Int32: NULL",
            "bigint_direct: Int64: NULL",
            "bigint_neg_direct: Int64: NULL",
            "bigint_other: Int64: NULL",
            "utf8_increase: Utf8: NULL",
            "utf8_decrease: Utf8: NULL",
            "timestamp_simple: Timestamp(Nanosecond, None): NULL",
            "date_simple: Date32: NULL",
        ];

        assert_eq!(expected, formatted);
    }
}