common_datasource/file_format/
orc.rs1use arrow_schema::Schema;
16use async_trait::async_trait;
17use bytes::Bytes;
18use futures::future::BoxFuture;
19use futures::FutureExt;
20use object_store::ObjectStore;
21use orc_rust::arrow_reader::ArrowReaderBuilder;
22use orc_rust::async_arrow_reader::ArrowStreamReader;
23use orc_rust::reader::AsyncChunkReader;
24use snafu::ResultExt;
25
26use crate::error::{self, Result};
27use crate::file_format::FileFormat;
28
/// [`FileFormat`] implementation for Apache ORC files.
///
/// A stateless unit value; schema inference is delegated to [`infer_orc_schema`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct OrcFormat;
31
32#[derive(Clone)]
33pub struct ReaderAdapter {
34 reader: object_store::Reader,
35 len: u64,
36}
37
38impl ReaderAdapter {
39 pub fn new(reader: object_store::Reader, len: u64) -> Self {
40 Self { reader, len }
41 }
42}
43
44impl AsyncChunkReader for ReaderAdapter {
45 fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
46 async move { Ok(self.len) }.boxed()
47 }
48
49 fn get_bytes(
50 &mut self,
51 offset_from_start: u64,
52 length: u64,
53 ) -> BoxFuture<'_, std::io::Result<Bytes>> {
54 async move {
55 let bytes = self
56 .reader
57 .read(offset_from_start..offset_from_start + length)
58 .await?;
59 Ok(bytes.to_bytes())
60 }
61 .boxed()
62 }
63}
64
65pub async fn new_orc_stream_reader(
66 reader: ReaderAdapter,
67) -> Result<ArrowStreamReader<ReaderAdapter>> {
68 let reader_build = ArrowReaderBuilder::try_new_async(reader)
69 .await
70 .context(error::OrcReaderSnafu)?;
71 Ok(reader_build.build_async())
72}
73
74pub async fn infer_orc_schema(reader: ReaderAdapter) -> Result<Schema> {
75 let reader = new_orc_stream_reader(reader).await?;
76 Ok(reader.schema().as_ref().clone())
77}
78
79#[async_trait]
80impl FileFormat for OrcFormat {
81 async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
82 let meta = store
83 .stat(path)
84 .await
85 .context(error::ReadObjectSnafu { path })?;
86 let reader = store
87 .reader(path)
88 .await
89 .context(error::ReadObjectSnafu { path })?;
90 let schema = infer_orc_schema(ReaderAdapter::new(reader, meta.content_length())).await?;
91 Ok(schema)
92 }
93}
94
#[cfg(test)]
mod tests {
    use common_test_util::find_workspace_path;

    use super::*;
    use crate::file_format::FileFormat;
    use crate::test_util::{format_schema, test_store};

    /// Directory of the ORC test fixtures, resolved with `find_workspace_path` and rendered
    /// as a `String`.
    fn test_data_root() -> String {
        let fixture_dir = find_workspace_path("/src/common/datasource/tests/orc");
        fixture_dir.display().to_string()
    }

    /// Schema inference on `test.orc` must yield exactly these columns, in this order.
    #[tokio::test]
    async fn test_orc_infer_schema() {
        let expected = vec![
            "double_a: Float64: NULL",
            "a: Float32: NULL",
            "b: Boolean: NULL",
            "str_direct: Utf8: NULL",
            "d: Utf8: NULL",
            "e: Utf8: NULL",
            "f: Utf8: NULL",
            "int_short_repeated: Int32: NULL",
            "int_neg_short_repeated: Int32: NULL",
            "int_delta: Int32: NULL",
            "int_neg_delta: Int32: NULL",
            "int_direct: Int32: NULL",
            "int_neg_direct: Int32: NULL",
            "bigint_direct: Int64: NULL",
            "bigint_neg_direct: Int64: NULL",
            "bigint_other: Int64: NULL",
            "utf8_increase: Utf8: NULL",
            "utf8_decrease: Utf8: NULL",
            "timestamp_simple: Timestamp(Nanosecond, None): NULL",
            "date_simple: Date32: NULL",
        ];

        let store = test_store(&test_data_root());
        let inferred = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();
        let formatted: Vec<_> = format_schema(inferred);

        assert_eq!(expected, formatted);
    }
}