common_datasource/
lister.rs1use futures::{future, TryStreamExt};
16use object_store::{Entry, ObjectStore};
17use regex::Regex;
18use snafu::ResultExt;
19
20use crate::error::{self, Result};
/// Selects what a [`Lister`] should enumerate.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Source {
    /// A single object, identified by its file name.
    Filename(String),
    /// Every object found by the directory listing.
    Dir,
}
26
27pub struct Lister {
28 object_store: ObjectStore,
29 source: Source,
30 root: String,
31 regex: Option<Regex>,
32}
33
34impl Lister {
35 pub fn new(
36 object_store: ObjectStore,
37 source: Source,
38 root: String,
39 regex: Option<Regex>,
40 ) -> Self {
41 Lister {
42 object_store,
43 source,
44 root,
45 regex,
46 }
47 }
48
49 pub async fn list(&self) -> Result<Vec<Entry>> {
50 match &self.source {
51 Source::Dir => {
52 let streamer = self
53 .object_store
54 .lister_with("/")
55 .await
56 .context(error::ListObjectsSnafu { path: &self.root })?;
57
58 streamer
59 .try_filter(|f| {
60 let res = self
61 .regex
62 .as_ref()
63 .map(|x| x.is_match(f.name()))
64 .unwrap_or(true);
65 future::ready(res)
66 })
67 .try_collect::<Vec<_>>()
68 .await
69 .context(error::ListObjectsSnafu { path: &self.root })
70 }
71 Source::Filename(filename) => {
72 let _ = self.object_store.stat(filename).await.with_context(|_| {
74 error::ListObjectsSnafu {
75 path: format!("{}{}", &self.root, filename),
76 }
77 })?;
78
79 Ok(self
80 .object_store
81 .list_with("/")
82 .await
83 .context(error::ListObjectsSnafu { path: &self.root })?
84 .into_iter()
85 .find(|f| f.name() == filename)
86 .map(|f| vec![f])
87 .unwrap_or_default())
88 }
89 }
90 }
91}