common_datasource/
lister.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::{future, TryStreamExt};
16use object_store::{Entry, ObjectStore};
17use regex::Regex;
18use snafu::ResultExt;
19
20use crate::error::{self, Result};
/// What a [`Lister`] should look for in its object store.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Source {
    /// A single file, identified by the name given here.
    Filename(String),
    /// Every entry in the store, optionally narrowed by the lister's regex.
    Dir,
}
26
27pub struct Lister {
28    object_store: ObjectStore,
29    source: Source,
30    root: String,
31    regex: Option<Regex>,
32}
33
34impl Lister {
35    pub fn new(
36        object_store: ObjectStore,
37        source: Source,
38        root: String,
39        regex: Option<Regex>,
40    ) -> Self {
41        Lister {
42            object_store,
43            source,
44            root,
45            regex,
46        }
47    }
48
49    pub async fn list(&self) -> Result<Vec<Entry>> {
50        match &self.source {
51            Source::Dir => {
52                let streamer = self
53                    .object_store
54                    .lister_with("/")
55                    .await
56                    .context(error::ListObjectsSnafu { path: &self.root })?;
57
58                streamer
59                    .try_filter(|f| {
60                        let res = self
61                            .regex
62                            .as_ref()
63                            .map(|x| x.is_match(f.name()))
64                            .unwrap_or(true);
65                        future::ready(res)
66                    })
67                    .try_collect::<Vec<_>>()
68                    .await
69                    .context(error::ListObjectsSnafu { path: &self.root })
70            }
71            Source::Filename(filename) => {
72                // make sure this file exists
73                let _ = self.object_store.stat(filename).await.with_context(|_| {
74                    error::ListObjectsSnafu {
75                        path: format!("{}{}", &self.root, filename),
76                    }
77                })?;
78
79                Ok(self
80                    .object_store
81                    .list_with("/")
82                    .await
83                    .context(error::ListObjectsSnafu { path: &self.root })?
84                    .into_iter()
85                    .find(|f| f.name() == filename)
86                    .map(|f| vec![f])
87                    .unwrap_or_default())
88            }
89        }
90    }
91}