Skip to main content

object_store/
compat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Debug, Display, Formatter};
17use std::future::IntoFuture;
18use std::io;
19use std::ops::Range;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use bytes::Bytes;
24use datafusion_object_store::path::Path;
25use datafusion_object_store::{
26    Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
27    MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, PutMultipartOptions,
28    PutOptions, PutPayload, PutResult, UploadPart,
29};
30use futures::stream::BoxStream;
31use futures::{FutureExt, StreamExt, TryStreamExt};
32use opendal::options::CopyOptions;
33use opendal::raw::percent_decode_path;
34use opendal::{Buffer, Operator, OperatorInfo, Writer};
35use tokio::sync::{Mutex, oneshot};
36
37/// OpendalStore implements ObjectStore trait by using opendal.
38///
39/// This allows users to use opendal as an object store without extra cost.
40///
41/// Visit [`opendal::services`] for more information about supported services.
42///
43/// ```no_run
44/// use std::sync::Arc;
45///
46/// use bytes::Bytes;
47/// use object_store::path::Path;
48/// use object_store::ObjectStore;
49/// use object_store_opendal::OpendalStore;
50/// use opendal::services::S3;
51/// use opendal::{Builder, Operator};
52///
53/// #[tokio::main]
54/// async fn main() {
///     let builder = S3::default()
///         .access_key_id("my_access_key")
///         .secret_access_key("my_secret_key")
///         .endpoint("my_endpoint")
///         .region("my_region");
60///
61///     // Create a new operator
62///     let operator = Operator::new(builder).unwrap().finish();
63///
64///     // Create a new object store
65///     let object_store = Arc::new(OpendalStore::new(operator));
66///
67///     let path = Path::from("data/nested/test.txt");
68///     let bytes = Bytes::from_static(b"hello, world! I am nested.");
69///
70///     object_store.put(&path, bytes.clone().into()).await.unwrap();
71///
72///     let content = object_store
73///         .get(&path)
74///         .await
75///         .unwrap()
76///         .bytes()
77///         .await
78///         .unwrap();
79///
80///     assert_eq!(content, bytes);
81/// }
82/// ```
#[derive(Clone)]
pub struct OpendalStore {
    // Operator metadata captured once at construction so `info()` and the
    // `Debug` impl don't rebuild it on every call.
    info: Arc<OperatorInfo>,
    // The underlying opendal operator that performs all actual I/O.
    inner: Operator,
}
88
89impl OpendalStore {
90    /// Create OpendalStore by given Operator.
91    pub fn new(op: Operator) -> Self {
92        Self {
93            info: op.info().into(),
94            inner: op,
95        }
96    }
97
98    /// Get the Operator info.
99    pub fn info(&self) -> &OperatorInfo {
100        self.info.as_ref()
101    }
102
103    /// Copy a file from one location to another.
104    async fn copy_request(
105        &self,
106        from: &Path,
107        to: &Path,
108        if_not_exists: bool,
109    ) -> datafusion_object_store::Result<()> {
110        let mut copy_options = CopyOptions::default();
111        if if_not_exists {
112            copy_options.if_not_exists = true;
113        }
114
115        // Perform the copy operation
116        self.inner
117            .copy_options(
118                &percent_decode_path(from.as_ref()),
119                &percent_decode_path(to.as_ref()),
120                copy_options,
121            )
122            .await
123            .map_err(|err| {
124                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
125                    datafusion_object_store::Error::AlreadyExists {
126                        path: to.to_string(),
127                        source: Box::new(err),
128                    }
129                } else {
130                    format_object_store_error(err, from.as_ref())
131                }
132            })?;
133
134        Ok(())
135    }
136}
137
impl Debug for OpendalStore {
    // Renders the cached operator info (scheme, name, root, capability);
    // the operator itself carries no further debuggable state.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("OpendalStore")
            .field("scheme", &self.info.scheme())
            .field("name", &self.info.name())
            .field("root", &self.info.root())
            .field("capability", &self.info.full_capability())
            .finish()
    }
}
148
149impl Display for OpendalStore {
150    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
151        let info = self.inner.info();
152        write!(
153            f,
154            "Opendal({}, bucket={}, root={})",
155            info.scheme(),
156            info.name(),
157            info.root()
158        )
159    }
160}
161
impl From<Operator> for OpendalStore {
    // Delegates to `OpendalStore::new` so the conversion and the constructor
    // always stay in sync.
    fn from(value: Operator) -> Self {
        Self::new(value)
    }
}
167
#[async_trait]
impl ArrowObjectStore for OpendalStore {
    // Write `bytes` to `location`, translating `opts.mode` into opendal's
    // conditional-write options (plain overwrite, create-if-absent, or
    // etag-conditioned update).
    async fn put_opts(
        &self,
        location: &Path,
        bytes: PutPayload,
        opts: PutOptions,
    ) -> datafusion_object_store::Result<PutResult> {
        let decoded_location = percent_decode_path(location.as_ref());
        let mut future_write = self
            .inner
            .write_with(&decoded_location, Buffer::from_iter(bytes));
        let opts_mode = opts.mode.clone();
        match opts.mode {
            PutMode::Overwrite => {}
            PutMode::Create => {
                future_write = future_write.if_not_exists(true);
            }
            PutMode::Update(update_version) => {
                // A conditional update is keyed on the etag; without one
                // there is nothing to compare against, so report the call
                // as unsupported.
                let Some(etag) = update_version.e_tag else {
                    return Err(datafusion_object_store::Error::NotSupported {
                        source: Box::new(opendal::Error::new(
                            opendal::ErrorKind::Unsupported,
                            "etag is required for conditional put",
                        )),
                    });
                };
                future_write = future_write.if_match(etag.as_str());
            }
        }
        let rp = future_write.await.map_err(|err| {
            // Under `PutMode::Create`, a failed precondition means the object
            // already exists — remap to the variant callers expect.
            match format_object_store_error(err, location.as_ref()) {
                datafusion_object_store::Error::Precondition { path, source }
                    if opts_mode == PutMode::Create =>
                {
                    datafusion_object_store::Error::AlreadyExists { path, source }
                }
                e => e,
            }
        })?;

        let e_tag = rp.etag().map(|s| s.to_string());
        let version = rp.version().map(|s| s.to_string());

        Ok(PutResult { e_tag, version })
    }

    // Start a multipart upload with a fixed internal write concurrency of 8.
    async fn put_multipart(
        &self,
        location: &Path,
    ) -> datafusion_object_store::Result<Box<dyn MultipartUpload>> {
        let decoded_location = percent_decode_path(location.as_ref());
        let writer = self
            .inner
            .writer_with(&decoded_location)
            .concurrent(8)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
        let upload = OpendalMultipartUpload::new(writer, location.clone());

        Ok(Box::new(upload))
    }

    // Like `put_multipart`, but maps the supported `object_store` attributes
    // onto opendal write options. `ContentLanguage` is skipped (no opendal
    // counterpart here) and unknown attributes are silently ignored.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> datafusion_object_store::Result<Box<dyn MultipartUpload>> {
        const DEFAULT_CONCURRENT: usize = 8;

        let mut options = opendal::options::WriteOptions {
            concurrent: DEFAULT_CONCURRENT,
            ..Default::default()
        };

        let mut user_metadata = HashMap::new();

        for (key, value) in opts.attributes.iter() {
            match key {
                Attribute::CacheControl => {
                    options.cache_control = Some(value.to_string());
                }
                Attribute::ContentDisposition => {
                    options.content_disposition = Some(value.to_string());
                }
                Attribute::ContentEncoding => {
                    options.content_encoding = Some(value.to_string());
                }
                Attribute::ContentLanguage => continue,
                Attribute::ContentType => {
                    options.content_type = Some(value.to_string());
                }
                Attribute::Metadata(k) => {
                    user_metadata.insert(k.to_string(), value.to_string());
                }
                _ => {}
            }
        }

        if !user_metadata.is_empty() {
            options.user_metadata = Some(user_metadata);
        }

        let decoded_location = percent_decode_path(location.as_ref());
        let writer = self
            .inner
            .writer_options(&decoded_location, options)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
        let upload = OpendalMultipartUpload::new(writer, location.clone());

        Ok(Box::new(upload))
    }

    // Fetch an object: stat first (applying all preconditions) to build the
    // metadata, then — unless `options.head` — open a reader with the same
    // preconditions and stream the requested byte range.
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> datafusion_object_store::Result<GetResult> {
        let raw_location = percent_decode_path(location.as_ref());
        let meta = {
            let mut s = self.inner.stat_with(&raw_location);
            if let Some(version) = &options.version {
                s = s.version(version.as_str())
            }
            if let Some(if_match) = &options.if_match {
                s = s.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = &options.if_none_match {
                s = s.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) =
                options.if_modified_since.and_then(datetime_to_timestamp)
            {
                s = s.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) =
                options.if_unmodified_since.and_then(datetime_to_timestamp)
            {
                s = s.if_unmodified_since(if_unmodified_since);
            }
            s.await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Surface backend user metadata as `Attribute::Metadata` entries.
        let mut attributes = Attributes::new();
        if let Some(user_meta) = meta.user_metadata() {
            for (key, value) in user_meta {
                attributes.insert(
                    Attribute::Metadata(key.clone().into()),
                    value.clone().into(),
                );
            }
        }

        let meta = ObjectMeta {
            location: location.clone(),
            last_modified: meta
                .last_modified()
                .and_then(timestamp_to_datetime)
                .unwrap_or_default(),
            size: meta.content_length(),
            e_tag: meta.etag().map(|x| x.to_string()),
            version: meta.version().map(|x| x.to_string()),
        };

        // HEAD request: return the metadata with an empty payload stream.
        if options.head {
            return Ok(GetResult {
                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
                range: 0..0,
                meta,
                attributes,
            });
        }

        let reader = {
            let mut r = self.inner.reader_with(raw_location.as_ref());
            if let Some(version) = options.version {
                r = r.version(version.as_str());
            }
            if let Some(if_match) = options.if_match {
                r = r.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = options.if_none_match {
                r = r.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) =
                options.if_modified_since.and_then(datetime_to_timestamp)
            {
                r = r.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) =
                options.if_unmodified_since.and_then(datetime_to_timestamp)
            {
                r = r.if_unmodified_since(if_unmodified_since);
            }
            r.await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Clamp the requested range to the actual object size. Degenerate or
        // out-of-bounds bounded/offset requests collapse to the empty range;
        // a suffix of at least the full size falls through to the whole
        // object via the `_` arm.
        let read_range = match options.range {
            Some(GetRange::Bounded(r)) => {
                if r.start >= r.end || r.start >= meta.size {
                    0..0
                } else {
                    let end = r.end.min(meta.size);
                    r.start..end
                }
            }
            Some(GetRange::Offset(r)) => {
                if r < meta.size {
                    r..meta.size
                } else {
                    0..0
                }
            }
            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
            _ => 0..meta.size,
        };

        // Adapt the opendal byte stream's io::Error into the object_store
        // error type; the payload itself passes through unchanged.
        let stream = reader
            .into_bytes_stream(read_range.start..read_range.end)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?
            .map_ok(|buf| buf)
            .map_err(|err: io::Error| datafusion_object_store::Error::Generic {
                store: "IoError",
                source: Box::new(err),
            });

        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(stream)),
            range: read_range.start..read_range.end,
            meta,
            attributes,
        })
    }

    // Read one byte range eagerly into memory.
    async fn get_range(
        &self,
        location: &Path,
        range: Range<u64>,
    ) -> datafusion_object_store::Result<Bytes> {
        let raw_location = percent_decode_path(location.as_ref());
        let reader = self
            .inner
            .reader_with(&raw_location)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;

        reader
            .read(range.start..range.end)
            .await
            .map(|buf| buf.to_bytes())
            .map_err(|err| format_object_store_error(err, location.as_ref()))
    }

    // Delete the object at `location`.
    async fn delete(&self, location: &Path) -> datafusion_object_store::Result<()> {
        let decoded_location = percent_decode_path(location.as_ref());
        self.inner
            .delete(&decoded_location)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;

        Ok(())
    }

    // Recursively list everything under `prefix` as a lazy stream; the
    // listing itself only starts when the returned stream is polled.
    fn list(
        &self,
        prefix: Option<&Path>,
    ) -> BoxStream<'static, datafusion_object_store::Result<ObjectMeta>> {
        // object_store `Path` always removes trailing slash
        // need to add it back
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });

        let this = self.clone();
        let fut = async move {
            let stream = this
                .inner
                .lister_with(&path)
                .recursive(true)
                .await
                .map_err(|err| format_object_store_error(err, &path))?;

            let stream = stream.then(|res| async {
                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
                let meta = entry.metadata();

                Ok(format_object_meta(entry.path(), meta))
            });
            Ok::<_, datafusion_object_store::Error>(stream)
        };

        fut.into_stream().try_flatten().boxed()
    }

    // List entries strictly after `offset`. Uses the backend's native
    // `start_after` when its capability reports support, otherwise filters
    // client-side on `entry.location > offset`.
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, datafusion_object_store::Result<ObjectMeta>> {
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });
        let offset = offset.clone();

        // clone self for 'static lifetime
        // clone self is cheap
        let this = self.clone();

        let fut = async move {
            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
            let mut fut = this.inner.lister_with(&path).recursive(true);

            // Use native start_after support if possible.
            if list_with_start_after {
                fut = fut.start_after(offset.as_ref());
            }

            let lister = fut
                .await
                .map_err(|err| format_object_store_error(err, &path))?
                .then(move |entry| {
                    let path = path.clone();
                    let this = this.clone();
                    async move {
                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
                        let (path, metadata) = entry.into_parts();

                        // If it's a dir or last_modified is present, we can use it directly.
                        if metadata.is_dir() || metadata.last_modified().is_some() {
                            let object_meta = format_object_meta(&path, &metadata);
                            return Ok(object_meta);
                        }

                        // Listing gave no last_modified: issue an extra stat
                        // call to complete the entry's metadata.
                        let metadata = this
                            .inner
                            .stat(&path)
                            .await
                            .map_err(|err| format_object_store_error(err, &path))?;
                        let object_meta = format_object_meta(&path, &metadata);
                        Ok::<_, datafusion_object_store::Error>(object_meta)
                    }
                })
                .boxed();

            let stream = if list_with_start_after {
                lister
            } else {
                lister
                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
                    .boxed()
            };

            Ok::<_, datafusion_object_store::Error>(stream)
        };

        fut.into_stream().try_flatten().boxed()
    }

    // Non-recursive listing: directory entries become `common_prefixes`,
    // files become `objects` (stat-ing each entry whose listing metadata
    // lacks last_modified).
    async fn list_with_delimiter(
        &self,
        prefix: Option<&Path>,
    ) -> datafusion_object_store::Result<ListResult> {
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });
        let mut stream = self
            .inner
            .lister_with(&path)
            .into_future()
            .await
            .map_err(|err| format_object_store_error(err, &path))?;

        let mut common_prefixes = Vec::new();
        let mut objects = Vec::new();

        while let Some(res) = stream.next().await {
            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
            let meta = entry.metadata();

            if meta.is_dir() {
                common_prefixes.push(entry.path().into());
            } else if meta.last_modified().is_some() {
                objects.push(format_object_meta(entry.path(), meta));
            } else {
                let meta = self
                    .inner
                    .stat(entry.path())
                    .await
                    .map_err(|err| format_object_store_error(err, entry.path()))?;
                objects.push(format_object_meta(entry.path(), &meta));
            }
        }

        Ok(ListResult {
            common_prefixes,
            objects,
        })
    }

    // Unconditional copy; overwrites the destination if present.
    async fn copy(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
        self.copy_request(from, to, false).await
    }

    // Rename (move) an object; errors are reported against the source path.
    async fn rename(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
        self.inner
            .rename(
                &percent_decode_path(from.as_ref()),
                &percent_decode_path(to.as_ref()),
            )
            .await
            .map_err(|err| format_object_store_error(err, from.as_ref()))?;

        Ok(())
    }

    // Copy that fails with `AlreadyExists` when the destination is present.
    async fn copy_if_not_exists(
        &self,
        from: &Path,
        to: &Path,
    ) -> datafusion_object_store::Result<()> {
        self.copy_request(from, to, true).await
    }
}
595
596/// `MultipartUpload` implementation based on `Writer` in opendal.
597///
598/// # Notes
599///
600/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing
601/// implementation do. Instead, we just write the part and notify the next task to be written.
602///
603/// The lock here doesn't really involve the write process, it's just for the notify mechanism.
struct OpendalMultipartUpload {
    // Shared with every in-flight `put_part` future; the mutex only guards
    // access to the writer handle (see the note above about the notify
    // mechanism).
    writer: Arc<Mutex<Writer>>,
    // Object path, kept for error reporting.
    location: Path,
    // Completion signal of the most recently submitted part; the next
    // `put_part` waits on it so parts are written in submission order.
    next_notify: oneshot::Receiver<()>,
}
609
610impl OpendalMultipartUpload {
611    fn new(writer: Writer, location: Path) -> Self {
612        // an immediately dropped sender for the first part to write without waiting
613        let (_, rx) = oneshot::channel();
614
615        Self {
616            writer: Arc::new(Mutex::new(writer)),
617            location,
618            next_notify: rx,
619        }
620    }
621}
622
#[async_trait]
impl MultipartUpload for OpendalMultipartUpload {
    // Queue one part. Parts are chained through oneshot channels: each part
    // waits for its predecessor's channel to close before writing, so writes
    // hit the opendal writer in submission order even though the returned
    // futures may be polled concurrently.
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let writer = self.writer.clone();
        let location = self.location.clone();

        // Generate next notify which will be notified after the current part is written.
        let (tx, rx) = oneshot::channel();
        // Fetch the notify for current part to wait for it to be written.
        let last_rx = std::mem::replace(&mut self.next_notify, rx);

        async move {
            // Wait for the previous part to be written
            // (an Err here just means the sender was dropped — also "done").
            let _ = last_rx.await;

            let mut writer = writer.lock().await;
            let result = writer
                .write(Buffer::from_iter(data))
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()));

            // Notify the next part to be written
            // (dropping tx closes the channel even if the write failed).
            drop(tx);

            result
        }
        .boxed()
    }

    // Finalize the upload and surface the backend's etag/version, if any.
    async fn complete(&mut self) -> datafusion_object_store::Result<PutResult> {
        let mut writer = self.writer.lock().await;
        let metadata = writer
            .close()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;

        let e_tag = metadata.etag().map(|s| s.to_string());
        let version = metadata.version().map(|s| s.to_string());

        Ok(PutResult { e_tag, version })
    }

    // Abort the upload, discarding any parts written so far.
    async fn abort(&mut self) -> datafusion_object_store::Result<()> {
        let mut writer = self.writer.lock().await;
        writer
            .abort()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
    }
}
673
impl Debug for OpendalMultipartUpload {
    // Only the location is shown; the writer handle has no useful debug form.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("OpendalMultipartUpload")
            .field("location", &self.location)
            .finish()
    }
}
681
682fn format_object_store_error(err: opendal::Error, path: &str) -> datafusion_object_store::Error {
683    match err.kind() {
684        opendal::ErrorKind::NotFound => datafusion_object_store::Error::NotFound {
685            path: path.to_string(),
686            source: Box::new(err),
687        },
688        opendal::ErrorKind::Unsupported => datafusion_object_store::Error::NotSupported {
689            source: Box::new(err),
690        },
691        opendal::ErrorKind::AlreadyExists => datafusion_object_store::Error::AlreadyExists {
692            path: path.to_string(),
693            source: Box::new(err),
694        },
695        opendal::ErrorKind::ConditionNotMatch => datafusion_object_store::Error::Precondition {
696            path: path.to_string(),
697            source: Box::new(err),
698        },
699        kind => datafusion_object_store::Error::Generic {
700            store: kind.into_static(),
701            source: Box::new(err),
702        },
703    }
704}
705
706fn format_object_meta(path: &str, meta: &opendal::Metadata) -> ObjectMeta {
707    ObjectMeta {
708        location: path.into(),
709        last_modified: meta
710            .last_modified()
711            .and_then(timestamp_to_datetime)
712            .unwrap_or_default(),
713        size: meta.content_length(),
714        e_tag: meta.etag().map(|x| x.to_string()),
715        version: meta.version().map(|x| x.to_string()),
716    }
717}
718
719fn timestamp_to_datetime(ts: opendal::raw::Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
720    let ts = ts.into_inner();
721    chrono::DateTime::<chrono::Utc>::from_timestamp(ts.as_second(), ts.subsec_nanosecond() as u32)
722}
723
724fn datetime_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Option<opendal::raw::Timestamp> {
725    opendal::raw::Timestamp::new(dt.timestamp(), dt.timestamp_subsec_nanos() as i32).ok()
726}
727
728#[cfg(test)]
729mod tests {
730    use std::sync::Arc;
731
732    use bytes::Bytes;
733    use datafusion_object_store::path::Path;
734    use datafusion_object_store::{ObjectStore as ArrowObjectStore, WriteMultipart};
735    use opendal::{Operator, services};
736    use rand::{Rng, RngCore};
737
738    use super::*;
739
740    async fn create_test_object_store() -> Arc<dyn ArrowObjectStore> {
741        let op = Operator::new(services::Memory::default()).unwrap().finish();
742        let object_store = Arc::new(OpendalStore::new(op));
743
744        let path: Path = "data/test.txt".into();
745        let bytes = Bytes::from_static(b"hello, world!");
746        object_store.put(&path, bytes.into()).await.unwrap();
747
748        let path: Path = "data/nested/test.txt".into();
749        let bytes = Bytes::from_static(b"hello, world! I am nested.");
750        object_store.put(&path, bytes.into()).await.unwrap();
751
752        object_store
753    }
754
755    #[tokio::test]
756    async fn test_basic() {
757        let op = Operator::new(services::Memory::default()).unwrap().finish();
758        let object_store: Arc<dyn ArrowObjectStore> = Arc::new(OpendalStore::new(op));
759
760        // Retrieve a specific file
761        let path: Path = "data/test.txt".into();
762
763        let bytes = Bytes::from_static(b"hello, world!");
764        object_store.put(&path, bytes.clone().into()).await.unwrap();
765
766        let meta = object_store.head(&path).await.unwrap();
767
768        assert_eq!(meta.size, 13);
769
770        assert_eq!(
771            object_store
772                .get(&path)
773                .await
774                .unwrap()
775                .bytes()
776                .await
777                .unwrap(),
778            bytes
779        );
780    }
781
    // Exercises the multipart upload path: a completed upload must contain
    // exactly the concatenation of all parts, and an aborted upload must
    // leave no visible object behind.
    #[tokio::test]
    async fn test_put_multipart() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ArrowObjectStore> = Arc::new(OpendalStore::new(op));

        let mut rng = rand::rng();

        // Case complete
        let path: Path = "data/test_complete.txt".into();
        let upload = object_store.put_multipart(&path).await.unwrap();

        let mut write = WriteMultipart::new(upload);

        // Feed a random number of randomly sized chunks, mirroring them into
        // `all_bytes` so the final object can be compared byte-for-byte.
        let mut all_bytes = vec![];
        let round = rng.random_range(1..=1024);
        for _ in 0..round {
            let size = rng.random_range(1..=1024);
            let mut bytes = vec![0; size];
            rng.fill_bytes(&mut bytes);

            all_bytes.extend_from_slice(&bytes);
            write.put(bytes.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = object_store.head(&path).await.unwrap();

        assert_eq!(meta.size, all_bytes.len() as u64);

        assert_eq!(
            object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap(),
            Bytes::from(all_bytes)
        );

        // Case abort
        let path: Path = "data/test_abort.txt".into();
        let mut upload = object_store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        // After abort, the object must not exist.
        let res = object_store.head(&path).await;
        let err = res.unwrap_err();

        assert!(matches!(
            err,
            datafusion_object_store::Error::NotFound { .. }
        ))
    }
837
    // Recursive listing under `data/` must return both fixture files, and
    // each listed location must read back its expected content.
    #[tokio::test]
    async fn test_list() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);
        let mut locations = results
            .iter()
            .map(|x| x.as_ref().unwrap().location.as_ref())
            .collect::<Vec<_>>();

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];

        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();

        // Sort because listing order is backend-dependent.
        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, bytes) in expected_files {
            let path: Path = location.into();
            assert_eq!(
                object_store
                    .get(&path)
                    .await
                    .unwrap()
                    .bytes()
                    .await
                    .unwrap(),
                bytes
            );
        }
    }
876
877    #[tokio::test]
878    async fn test_list_with_delimiter() {
879        let object_store = create_test_object_store().await;
880        let path: Path = "data/".into();
881        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
882        assert_eq!(result.objects.len(), 1);
883        assert_eq!(result.common_prefixes.len(), 1);
884        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
885        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
886    }
887
888    #[tokio::test]
889    async fn test_list_with_offset() {
890        let object_store = create_test_object_store().await;
891        let path: Path = "data/".into();
892        let offset: Path = "data/nested/test.txt".into();
893        let result = object_store
894            .list_with_offset(Some(&path), &offset)
895            .collect::<Vec<_>>()
896            .await;
897        assert_eq!(result.len(), 1);
898        assert_eq!(
899            result[0].as_ref().unwrap().location.as_ref(),
900            "data/test.txt"
901        );
902    }
903
    /// Test-only opendal layer machinery that counts how many times `stat()`
    /// is invoked on the wrapped operator, so tests can assert whether a
    /// given code path triggers a metadata lookup.
    mod stat_counter {
        use std::sync::atomic::{AtomicUsize, Ordering};

        use super::*;

        /// Layer that installs a [`StatCounterAccessor`] around the inner
        /// accessor. All clones share the same atomic counter, so the test
        /// can observe calls made through the layered operator.
        #[derive(Debug, Clone)]
        pub struct StatCounterLayer {
            count: Arc<AtomicUsize>,
        }

        impl StatCounterLayer {
            /// Creates a layer that increments `count` on every `stat()` call.
            pub fn new(count: Arc<AtomicUsize>) -> Self {
                Self { count }
            }
        }

        impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
            type LayeredAccess = StatCounterAccessor<A>;

            fn layer(&self, inner: A) -> Self::LayeredAccess {
                StatCounterAccessor {
                    inner,
                    count: self.count.clone(),
                }
            }
        }

        /// Accessor wrapper: bumps `count` on `stat()` and transparently
        /// delegates every other operation to `inner`.
        #[derive(Debug, Clone)]
        pub struct StatCounterAccessor<A> {
            inner: A,
            count: Arc<AtomicUsize>,
        }

        // All associated reader/writer/lister/deleter types are passed
        // through unchanged from the inner accessor.
        impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
            type Inner = A;
            type Reader = A::Reader;
            type Writer = A::Writer;
            type Lister = A::Lister;
            type Deleter = A::Deleter;

            fn inner(&self) -> &Self::Inner {
                &self.inner
            }

            // The only instrumented operation: count the call, then delegate.
            async fn stat(
                &self,
                path: &str,
                args: opendal::raw::OpStat,
            ) -> opendal::Result<opendal::raw::RpStat> {
                self.count.fetch_add(1, Ordering::SeqCst);
                self.inner.stat(path, args).await
            }

            // Everything below is pure delegation to the inner accessor.
            async fn read(
                &self,
                path: &str,
                args: opendal::raw::OpRead,
            ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
                self.inner.read(path, args).await
            }

            async fn write(
                &self,
                path: &str,
                args: opendal::raw::OpWrite,
            ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
                self.inner.write(path, args).await
            }

            async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
                self.inner.delete().await
            }

            async fn list(
                &self,
                path: &str,
                args: opendal::raw::OpList,
            ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
                self.inner.list(path, args).await
            }

            async fn copy(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpCopy,
            ) -> opendal::Result<opendal::raw::RpCopy> {
                self.inner.copy(from, to, args).await
            }

            async fn rename(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpRename,
            ) -> opendal::Result<opendal::raw::RpRename> {
                self.inner.rename(from, to, args).await
            }
        }
    }
1004
1005    #[tokio::test]
1006    async fn test_get_range_no_stat() {
1007        use std::sync::atomic::{AtomicUsize, Ordering};
1008
1009        // Create a stat counter and operator with tracking layer
1010        let stat_count = Arc::new(AtomicUsize::new(0));
1011        let op = Operator::new(opendal::services::Memory::default())
1012            .unwrap()
1013            .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
1014            .finish();
1015        let store = OpendalStore::new(op);
1016
1017        // Create a test file
1018        let location = "test_get_range.txt".into();
1019        let value = Bytes::from_static(b"Hello, world!");
1020        store.put(&location, value.clone().into()).await.unwrap();
1021
1022        // Reset counter after put
1023        stat_count.store(0, Ordering::SeqCst);
1024
1025        // Test 1: get_range should NOT call stat()
1026        let ret = store.get_range(&location, 0..5).await.unwrap();
1027        assert_eq!(Bytes::from_static(b"Hello"), ret);
1028        assert_eq!(
1029            stat_count.load(Ordering::SeqCst),
1030            0,
1031            "get_range should not call stat()"
1032        );
1033
1034        // Reset counter
1035        stat_count.store(0, Ordering::SeqCst);
1036
1037        // Test 2: get_opts SHOULD call stat() to get metadata
1038        let opts = datafusion_object_store::GetOptions {
1039            range: Some(datafusion_object_store::GetRange::Bounded(0..5)),
1040            ..Default::default()
1041        };
1042        let ret = store.get_opts(&location, opts).await.unwrap();
1043        let data = ret.bytes().await.unwrap();
1044        assert_eq!(Bytes::from_static(b"Hello"), data);
1045        assert!(
1046            stat_count.load(Ordering::SeqCst) > 0,
1047            "get_opts should call stat() to get metadata"
1048        );
1049
1050        // Cleanup
1051        store.delete(&location).await.unwrap();
1052    }
1053}