1use std::collections::HashMap;
16use std::fmt::{self, Debug, Display, Formatter};
17use std::future::IntoFuture;
18use std::io;
19use std::ops::Range;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use bytes::Bytes;
24use datafusion_object_store::path::Path;
25use datafusion_object_store::{
26 Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
27 MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, PutMultipartOptions,
28 PutOptions, PutPayload, PutResult, UploadPart,
29};
30use futures::stream::BoxStream;
31use futures::{FutureExt, StreamExt, TryStreamExt};
32use opendal::options::CopyOptions;
33use opendal::raw::percent_decode_path;
34use opendal::{Buffer, Operator, OperatorInfo, Writer};
35use tokio::sync::{Mutex, oneshot};
36
/// An object_store-compatible store backed by an OpenDAL `Operator`.
///
/// Cloning is cheap: the operator info is behind an `Arc` and OpenDAL
/// operators are internally reference-counted.
#[derive(Clone)]
pub struct OpendalStore {
    // Operator metadata (scheme, name, root, capability) captured once at
    // construction so accessors don't have to re-query the operator.
    info: Arc<OperatorInfo>,
    // The underlying OpenDAL operator that performs all I/O.
    inner: Operator,
}
88
impl OpendalStore {
    /// Wrap an OpenDAL `Operator` as an object_store-compatible store.
    ///
    /// The operator's info is captured once here so later accesses are a
    /// plain `Arc` borrow.
    pub fn new(op: Operator) -> Self {
        Self {
            info: op.info().into(),
            inner: op,
        }
    }

    /// Borrow the cached `OperatorInfo` describing the underlying service.
    pub fn info(&self) -> &OperatorInfo {
        self.info.as_ref()
    }

    /// Shared implementation behind `copy` and `copy_if_not_exists`.
    ///
    /// When `if_not_exists` is true the copy is made conditional, and a
    /// backend `AlreadyExists` error is surfaced as the object_store
    /// `AlreadyExists` variant attributed to the *destination* path; any
    /// other error is attributed to the source path.
    async fn copy_request(
        &self,
        from: &Path,
        to: &Path,
        if_not_exists: bool,
    ) -> datafusion_object_store::Result<()> {
        let mut copy_options = CopyOptions::default();
        if if_not_exists {
            copy_options.if_not_exists = true;
        }

        // object_store paths are percent-encoded; OpenDAL expects them decoded.
        self.inner
            .copy_options(
                &percent_decode_path(from.as_ref()),
                &percent_decode_path(to.as_ref()),
                copy_options,
            )
            .await
            .map_err(|err| {
                if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
                    datafusion_object_store::Error::AlreadyExists {
                        path: to.to_string(),
                        source: Box::new(err),
                    }
                } else {
                    format_object_store_error(err, from.as_ref())
                }
            })?;

        Ok(())
    }
}
137
138impl Debug for OpendalStore {
139 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
140 f.debug_struct("OpendalStore")
141 .field("scheme", &self.info.scheme())
142 .field("name", &self.info.name())
143 .field("root", &self.info.root())
144 .field("capability", &self.info.full_capability())
145 .finish()
146 }
147}
148
149impl Display for OpendalStore {
150 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
151 let info = self.inner.info();
152 write!(
153 f,
154 "Opendal({}, bucket={}, root={})",
155 info.scheme(),
156 info.name(),
157 info.root()
158 )
159 }
160}
161
162impl From<Operator> for OpendalStore {
163 fn from(value: Operator) -> Self {
164 Self::new(value)
165 }
166}
167
#[async_trait]
impl ArrowObjectStore for OpendalStore {
    /// Write `bytes` to `location`, honoring `opts.mode`.
    ///
    /// `PutMode::Create` becomes a conditional "if not exists" write and
    /// `PutMode::Update` an etag-guarded `if-match` write; an update request
    /// without an etag is rejected as `NotSupported`.
    async fn put_opts(
        &self,
        location: &Path,
        bytes: PutPayload,
        opts: PutOptions,
    ) -> datafusion_object_store::Result<PutResult> {
        // object_store paths are percent-encoded; OpenDAL expects decoded paths.
        let decoded_location = percent_decode_path(location.as_ref());
        let mut future_write = self
            .inner
            .write_with(&decoded_location, Buffer::from_iter(bytes));
        // Remember the mode: the error mapping below still needs it after
        // `opts.mode` has been moved into the match.
        let opts_mode = opts.mode.clone();
        match opts.mode {
            PutMode::Overwrite => {}
            PutMode::Create => {
                future_write = future_write.if_not_exists(true);
            }
            PutMode::Update(update_version) => {
                let Some(etag) = update_version.e_tag else {
                    return Err(datafusion_object_store::Error::NotSupported {
                        source: Box::new(opendal::Error::new(
                            opendal::ErrorKind::Unsupported,
                            "etag is required for conditional put",
                        )),
                    });
                };
                future_write = future_write.if_match(etag.as_str());
            }
        }
        let rp = future_write.await.map_err(|err| {
            match format_object_store_error(err, location.as_ref()) {
                // Backends may report a failed `if_not_exists` write as a
                // precondition failure; object_store callers expect
                // `AlreadyExists` for `PutMode::Create`.
                datafusion_object_store::Error::Precondition { path, source }
                    if opts_mode == PutMode::Create =>
                {
                    datafusion_object_store::Error::AlreadyExists { path, source }
                }
                e => e,
            }
        })?;

        let e_tag = rp.etag().map(|s| s.to_string());
        let version = rp.version().map(|s| s.to_string());

        Ok(PutResult { e_tag, version })
    }

    /// Start a multipart upload with default options.
    async fn put_multipart(
        &self,
        location: &Path,
    ) -> datafusion_object_store::Result<Box<dyn MultipartUpload>> {
        let decoded_location = percent_decode_path(location.as_ref());
        let writer = self
            .inner
            .writer_with(&decoded_location)
            // Up to 8 in-flight part uploads (same value as DEFAULT_CONCURRENT
            // in `put_multipart_opts`).
            .concurrent(8)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
        let upload = OpendalMultipartUpload::new(writer, location.clone());

        Ok(Box::new(upload))
    }

    /// Start a multipart upload, translating object_store attributes
    /// (cache control, content type, user metadata, ...) into OpenDAL
    /// write options.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> datafusion_object_store::Result<Box<dyn MultipartUpload>> {
        // Number of part uploads allowed in flight concurrently.
        const DEFAULT_CONCURRENT: usize = 8;

        let mut options = opendal::options::WriteOptions {
            concurrent: DEFAULT_CONCURRENT,
            ..Default::default()
        };

        let mut user_metadata = HashMap::new();

        for (key, value) in opts.attributes.iter() {
            match key {
                Attribute::CacheControl => {
                    options.cache_control = Some(value.to_string());
                }
                Attribute::ContentDisposition => {
                    options.content_disposition = Some(value.to_string());
                }
                Attribute::ContentEncoding => {
                    options.content_encoding = Some(value.to_string());
                }
                // OpenDAL's WriteOptions has no content-language field;
                // this attribute is deliberately dropped.
                Attribute::ContentLanguage => continue,
                Attribute::ContentType => {
                    options.content_type = Some(value.to_string());
                }
                Attribute::Metadata(k) => {
                    user_metadata.insert(k.to_string(), value.to_string());
                }
                // `Attribute` is non-exhaustive; ignore unknown variants.
                _ => {}
            }
        }

        if !user_metadata.is_empty() {
            options.user_metadata = Some(user_metadata);
        }

        let decoded_location = percent_decode_path(location.as_ref());
        let writer = self
            .inner
            .writer_options(&decoded_location, options)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;
        let upload = OpendalMultipartUpload::new(writer, location.clone());

        Ok(Box::new(upload))
    }

    /// Fetch an object (or only its metadata when `options.head` is set),
    /// applying version and conditional-request options to both the stat
    /// and the read.
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> datafusion_object_store::Result<GetResult> {
        let raw_location = percent_decode_path(location.as_ref());
        // Stat first: GetResult must carry full metadata, and the requested
        // range is clamped against the object size below.
        let meta = {
            let mut s = self.inner.stat_with(&raw_location);
            if let Some(version) = &options.version {
                s = s.version(version.as_str())
            }
            if let Some(if_match) = &options.if_match {
                s = s.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = &options.if_none_match {
                s = s.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) =
                options.if_modified_since.and_then(datetime_to_timestamp)
            {
                s = s.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) =
                options.if_unmodified_since.and_then(datetime_to_timestamp)
            {
                s = s.if_unmodified_since(if_unmodified_since);
            }
            s.await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Surface backend user metadata as object_store attributes.
        let mut attributes = Attributes::new();
        if let Some(user_meta) = meta.user_metadata() {
            for (key, value) in user_meta {
                attributes.insert(
                    Attribute::Metadata(key.clone().into()),
                    value.clone().into(),
                );
            }
        }

        let meta = ObjectMeta {
            location: location.clone(),
            // Objects with no backend timestamp fall back to the default
            // (epoch) datetime.
            last_modified: meta
                .last_modified()
                .and_then(timestamp_to_datetime)
                .unwrap_or_default(),
            size: meta.content_length(),
            e_tag: meta.etag().map(|x| x.to_string()),
            version: meta.version().map(|x| x.to_string()),
        };

        // HEAD request: metadata only, empty payload stream.
        if options.head {
            return Ok(GetResult {
                payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
                range: 0..0,
                meta,
                attributes,
            });
        }

        // Same conditional options are applied to the read itself.
        let reader = {
            let mut r = self.inner.reader_with(raw_location.as_ref());
            if let Some(version) = options.version {
                r = r.version(version.as_str());
            }
            if let Some(if_match) = options.if_match {
                r = r.if_match(if_match.as_str());
            }
            if let Some(if_none_match) = options.if_none_match {
                r = r.if_none_match(if_none_match.as_str());
            }
            if let Some(if_modified_since) =
                options.if_modified_since.and_then(datetime_to_timestamp)
            {
                r = r.if_modified_since(if_modified_since);
            }
            if let Some(if_unmodified_since) =
                options.if_unmodified_since.and_then(datetime_to_timestamp)
            {
                r = r.if_unmodified_since(if_unmodified_since);
            }
            r.await
                .map_err(|err| format_object_store_error(err, location.as_ref()))?
        };

        // Clamp the requested range to the actual object size. Degenerate
        // requests (empty range, start past EOF, offset past EOF) read an
        // empty 0..0 range; a suffix request longer than the object falls
        // through to the whole object.
        let read_range = match options.range {
            Some(GetRange::Bounded(r)) => {
                if r.start >= r.end || r.start >= meta.size {
                    0..0
                } else {
                    let end = r.end.min(meta.size);
                    r.start..end
                }
            }
            Some(GetRange::Offset(r)) => {
                if r < meta.size {
                    r..meta.size
                } else {
                    0..0
                }
            }
            Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size,
            _ => 0..meta.size,
        };

        let stream = reader
            .into_bytes_stream(read_range.start..read_range.end)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?
            .map_ok(|buf| buf)
            .map_err(|err: io::Error| datafusion_object_store::Error::Generic {
                store: "IoError",
                source: Box::new(err),
            });

        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(stream)),
            range: read_range.start..read_range.end,
            meta,
            attributes,
        })
    }

    /// Read a byte range directly, without a prior `stat` round-trip
    /// (covered by `test_get_range_no_stat`).
    async fn get_range(
        &self,
        location: &Path,
        range: Range<u64>,
    ) -> datafusion_object_store::Result<Bytes> {
        let raw_location = percent_decode_path(location.as_ref());
        let reader = self
            .inner
            .reader_with(&raw_location)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;

        reader
            .read(range.start..range.end)
            .await
            .map(|buf| buf.to_bytes())
            .map_err(|err| format_object_store_error(err, location.as_ref()))
    }

    /// Delete the object at `location`.
    async fn delete(&self, location: &Path) -> datafusion_object_store::Result<()> {
        let decoded_location = percent_decode_path(location.as_ref());
        self.inner
            .delete(&decoded_location)
            .await
            .map_err(|err| format_object_store_error(err, location.as_ref()))?;

        Ok(())
    }

    /// Recursively list all objects under `prefix`.
    fn list(
        &self,
        prefix: Option<&Path>,
    ) -> BoxStream<'static, datafusion_object_store::Result<ObjectMeta>> {
        // OpenDAL treats a trailing '/' as a directory-style prefix.
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });

        // Clone self so the returned stream can be 'static.
        let this = self.clone();
        let fut = async move {
            let stream = this
                .inner
                .lister_with(&path)
                .recursive(true)
                .await
                .map_err(|err| format_object_store_error(err, &path))?;

            // NOTE(review): unlike `list_with_offset`/`list_with_delimiter`,
            // there is no `stat` fallback here when a listing entry lacks
            // `last_modified`; such entries get a default timestamp — confirm
            // this is intended.
            let stream = stream.then(|res| async {
                let entry = res.map_err(|err| format_object_store_error(err, ""))?;
                let meta = entry.metadata();

                Ok(format_object_meta(entry.path(), meta))
            });
            Ok::<_, datafusion_object_store::Error>(stream)
        };

        // Flatten the setup future + listing stream into a single stream.
        fut.into_stream().try_flatten().boxed()
    }

    /// List objects under `prefix` whose location sorts strictly after
    /// `offset`, using the backend's native `start_after` when available and
    /// filtering client-side otherwise.
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, datafusion_object_store::Result<ObjectMeta>> {
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });
        let offset = offset.clone();

        let this = self.clone();

        let fut = async move {
            let list_with_start_after = this.inner.info().full_capability().list_with_start_after;
            let mut fut = this.inner.lister_with(&path).recursive(true);

            if list_with_start_after {
                fut = fut.start_after(offset.as_ref());
            }

            let lister = fut
                .await
                .map_err(|err| format_object_store_error(err, &path))?
                .then(move |entry| {
                    let path = path.clone();
                    let this = this.clone();
                    async move {
                        let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
                        let (path, metadata) = entry.into_parts();

                        // Listing metadata is trusted when it's a directory or
                        // carries a timestamp; otherwise fall back to a
                        // per-entry stat for complete metadata.
                        if metadata.is_dir() || metadata.last_modified().is_some() {
                            let object_meta = format_object_meta(&path, &metadata);
                            return Ok(object_meta);
                        }

                        let metadata = this
                            .inner
                            .stat(&path)
                            .await
                            .map_err(|err| format_object_store_error(err, &path))?;
                        let object_meta = format_object_meta(&path, &metadata);
                        Ok::<_, datafusion_object_store::Error>(object_meta)
                    }
                })
                .boxed();

            // Without native start_after support, emulate it by filtering.
            let stream = if list_with_start_after {
                lister
            } else {
                lister
                    .try_filter(move |entry| futures::future::ready(entry.location > offset))
                    .boxed()
            };

            Ok::<_, datafusion_object_store::Error>(stream)
        };

        fut.into_stream().try_flatten().boxed()
    }

    /// Non-recursive listing: immediate children of `prefix`, split into
    /// common prefixes (directories) and objects.
    async fn list_with_delimiter(
        &self,
        prefix: Option<&Path>,
    ) -> datafusion_object_store::Result<ListResult> {
        let path = prefix.map_or("".into(), |x| {
            format!("{}/", percent_decode_path(x.as_ref()))
        });
        let mut stream = self
            .inner
            .lister_with(&path)
            .into_future()
            .await
            .map_err(|err| format_object_store_error(err, &path))?;

        let mut common_prefixes = Vec::new();
        let mut objects = Vec::new();

        while let Some(res) = stream.next().await {
            let entry = res.map_err(|err| format_object_store_error(err, ""))?;
            let meta = entry.metadata();

            if meta.is_dir() {
                common_prefixes.push(entry.path().into());
            } else if meta.last_modified().is_some() {
                objects.push(format_object_meta(entry.path(), meta));
            } else {
                // Listing entry lacks a timestamp; stat for full metadata.
                let meta = self
                    .inner
                    .stat(entry.path())
                    .await
                    .map_err(|err| format_object_store_error(err, entry.path()))?;
                objects.push(format_object_meta(entry.path(), &meta));
            }
        }

        Ok(ListResult {
            common_prefixes,
            objects,
        })
    }

    /// Copy `from` to `to`, overwriting any existing destination.
    async fn copy(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
        self.copy_request(from, to, false).await
    }

    /// Move `from` to `to`, overwriting any existing destination.
    async fn rename(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
        self.inner
            .rename(
                &percent_decode_path(from.as_ref()),
                &percent_decode_path(to.as_ref()),
            )
            .await
            .map_err(|err| format_object_store_error(err, from.as_ref()))?;

        Ok(())
    }

    /// Copy `from` to `to`, failing with `AlreadyExists` if `to` exists.
    async fn copy_if_not_exists(
        &self,
        from: &Path,
        to: &Path,
    ) -> datafusion_object_store::Result<()> {
        self.copy_request(from, to, true).await
    }
}
595
/// `MultipartUpload` implementation backed by an OpenDAL `Writer`.
///
/// Part futures may be polled concurrently by the caller, but writes are
/// serialized: a chain of oneshot channels guarantees parts hit the shared
/// writer in submission order (see `put_part`).
struct OpendalMultipartUpload {
    // Shared with every in-flight part future; the async mutex serializes
    // the actual writes.
    writer: Arc<Mutex<Writer>>,
    // Target location, kept only for error reporting.
    location: Path,
    // Receiver the *next* submitted part must wait on before writing.
    next_notify: oneshot::Receiver<()>,
}
609
impl OpendalMultipartUpload {
    /// Create a multipart upload around `writer` targeting `location`.
    fn new(writer: Writer, location: Path) -> Self {
        // The sender half is dropped immediately, so this receiver resolves
        // right away: the first `put_part` has no predecessor to wait for.
        let (_, rx) = oneshot::channel();

        Self {
            writer: Arc::new(Mutex::new(writer)),
            location,
            next_notify: rx,
        }
    }
}
622
#[async_trait]
impl MultipartUpload for OpendalMultipartUpload {
    /// Queue `data` as the next part of the upload.
    ///
    /// Callers may poll the returned futures concurrently, yet parts must be
    /// written to the OpenDAL writer in submission order. Ordering is
    /// enforced by a oneshot chain: each part future first waits for the
    /// previous part's channel to close, then takes the writer lock.
    fn put_part(&mut self, data: PutPayload) -> UploadPart {
        let writer = self.writer.clone();
        let location = self.location.clone();

        // `rx` is handed to the *next* part; `tx` is dropped when this part
        // finishes (successfully or not), releasing that next part.
        let (tx, rx) = oneshot::channel();
        let last_rx = std::mem::replace(&mut self.next_notify, rx);

        async move {
            // Wait for the previous part. The Err from the dropped sender is
            // expected — the channel is only ever closed, never sent on.
            let _ = last_rx.await;

            let mut writer = writer.lock().await;
            let result = writer
                .write(Buffer::from_iter(data))
                .await
                .map_err(|err| format_object_store_error(err, location.as_ref()));

            // Explicitly close the channel to release the next queued part.
            drop(tx);

            result
        }
        .boxed()
    }

    /// Finish the upload, committing all written parts.
    async fn complete(&mut self) -> datafusion_object_store::Result<PutResult> {
        let mut writer = self.writer.lock().await;
        let metadata = writer
            .close()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))?;

        let e_tag = metadata.etag().map(|s| s.to_string());
        let version = metadata.version().map(|s| s.to_string());

        Ok(PutResult { e_tag, version })
    }

    /// Abort the upload, discarding any parts written so far.
    async fn abort(&mut self) -> datafusion_object_store::Result<()> {
        let mut writer = self.writer.lock().await;
        writer
            .abort()
            .await
            .map_err(|err| format_object_store_error(err, self.location.as_ref()))
    }
}
673
674impl Debug for OpendalMultipartUpload {
675 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
676 f.debug_struct("OpendalMultipartUpload")
677 .field("location", &self.location)
678 .finish()
679 }
680}
681
682fn format_object_store_error(err: opendal::Error, path: &str) -> datafusion_object_store::Error {
683 match err.kind() {
684 opendal::ErrorKind::NotFound => datafusion_object_store::Error::NotFound {
685 path: path.to_string(),
686 source: Box::new(err),
687 },
688 opendal::ErrorKind::Unsupported => datafusion_object_store::Error::NotSupported {
689 source: Box::new(err),
690 },
691 opendal::ErrorKind::AlreadyExists => datafusion_object_store::Error::AlreadyExists {
692 path: path.to_string(),
693 source: Box::new(err),
694 },
695 opendal::ErrorKind::ConditionNotMatch => datafusion_object_store::Error::Precondition {
696 path: path.to_string(),
697 source: Box::new(err),
698 },
699 kind => datafusion_object_store::Error::Generic {
700 store: kind.into_static(),
701 source: Box::new(err),
702 },
703 }
704}
705
706fn format_object_meta(path: &str, meta: &opendal::Metadata) -> ObjectMeta {
707 ObjectMeta {
708 location: path.into(),
709 last_modified: meta
710 .last_modified()
711 .and_then(timestamp_to_datetime)
712 .unwrap_or_default(),
713 size: meta.content_length(),
714 e_tag: meta.etag().map(|x| x.to_string()),
715 version: meta.version().map(|x| x.to_string()),
716 }
717}
718
719fn timestamp_to_datetime(ts: opendal::raw::Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
720 let ts = ts.into_inner();
721 chrono::DateTime::<chrono::Utc>::from_timestamp(ts.as_second(), ts.subsec_nanosecond() as u32)
722}
723
724fn datetime_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Option<opendal::raw::Timestamp> {
725 opendal::raw::Timestamp::new(dt.timestamp(), dt.timestamp_subsec_nanos() as i32).ok()
726}
727
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;
    use datafusion_object_store::path::Path;
    use datafusion_object_store::{ObjectStore as ArrowObjectStore, WriteMultipart};
    use opendal::{Operator, services};
    use rand::{Rng, RngCore};

    use super::*;

    /// Build an in-memory store pre-populated with two objects:
    /// `data/test.txt` and `data/nested/test.txt`.
    async fn create_test_object_store() -> Arc<dyn ArrowObjectStore> {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();
        let bytes = Bytes::from_static(b"hello, world!");
        object_store.put(&path, bytes.into()).await.unwrap();

        let path: Path = "data/nested/test.txt".into();
        let bytes = Bytes::from_static(b"hello, world! I am nested.");
        object_store.put(&path, bytes.into()).await.unwrap();

        object_store
    }

    /// put / head / get round-trip against the in-memory backend.
    #[tokio::test]
    async fn test_basic() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ArrowObjectStore> = Arc::new(OpendalStore::new(op));

        let path: Path = "data/test.txt".into();

        let bytes = Bytes::from_static(b"hello, world!");
        object_store.put(&path, bytes.clone().into()).await.unwrap();

        let meta = object_store.head(&path).await.unwrap();

        assert_eq!(meta.size, 13);

        assert_eq!(
            object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap(),
            bytes
        );
    }

    /// Multipart: random-sized parts are reassembled in order on complete,
    /// and an aborted upload leaves no object behind.
    #[tokio::test]
    async fn test_put_multipart() {
        let op = Operator::new(services::Memory::default()).unwrap().finish();
        let object_store: Arc<dyn ArrowObjectStore> = Arc::new(OpendalStore::new(op));

        let mut rng = rand::rng();

        let path: Path = "data/test_complete.txt".into();
        let upload = object_store.put_multipart(&path).await.unwrap();

        let mut write = WriteMultipart::new(upload);

        // Write a random number of random-sized chunks, mirroring them into
        // `all_bytes` as the expected final content.
        let mut all_bytes = vec![];
        let round = rng.random_range(1..=1024);
        for _ in 0..round {
            let size = rng.random_range(1..=1024);
            let mut bytes = vec![0; size];
            rng.fill_bytes(&mut bytes);

            all_bytes.extend_from_slice(&bytes);
            write.put(bytes.into());
        }

        let _ = write.finish().await.unwrap();

        let meta = object_store.head(&path).await.unwrap();

        assert_eq!(meta.size, all_bytes.len() as u64);

        assert_eq!(
            object_store
                .get(&path)
                .await
                .unwrap()
                .bytes()
                .await
                .unwrap(),
            Bytes::from(all_bytes)
        );

        // Abort path: a written-then-aborted upload must not be visible.
        let path: Path = "data/test_abort.txt".into();
        let mut upload = object_store.put_multipart(&path).await.unwrap();
        upload.put_part(vec![1; 1024].into()).await.unwrap();
        upload.abort().await.unwrap();

        let res = object_store.head(&path).await;
        let err = res.unwrap_err();

        assert!(matches!(
            err,
            datafusion_object_store::Error::NotFound { .. }
        ))
    }

    /// `list` is recursive and returns retrievable locations.
    #[tokio::test]
    async fn test_list() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let results = object_store.list(Some(&path)).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 2);
        let mut locations = results
            .iter()
            .map(|x| x.as_ref().unwrap().location.as_ref())
            .collect::<Vec<_>>();

        let expected_files = vec![
            (
                "data/nested/test.txt",
                Bytes::from_static(b"hello, world! I am nested."),
            ),
            ("data/test.txt", Bytes::from_static(b"hello, world!")),
        ];

        let expected_locations = expected_files.iter().map(|x| x.0).collect::<Vec<&str>>();

        // Listing order is backend-defined; sort before comparing.
        locations.sort();
        assert_eq!(locations, expected_locations);

        for (location, bytes) in expected_files {
            let path: Path = location.into();
            assert_eq!(
                object_store
                    .get(&path)
                    .await
                    .unwrap()
                    .bytes()
                    .await
                    .unwrap(),
                bytes
            );
        }
    }

    /// Delimited listing splits direct children into objects vs prefixes.
    #[tokio::test]
    async fn test_list_with_delimiter() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let result = object_store.list_with_delimiter(Some(&path)).await.unwrap();
        assert_eq!(result.objects.len(), 1);
        assert_eq!(result.common_prefixes.len(), 1);
        assert_eq!(result.objects[0].location.as_ref(), "data/test.txt");
        assert_eq!(result.common_prefixes[0].as_ref(), "data/nested");
    }

    /// Offset listing only yields locations strictly after the offset.
    #[tokio::test]
    async fn test_list_with_offset() {
        let object_store = create_test_object_store().await;
        let path: Path = "data/".into();
        let offset: Path = "data/nested/test.txt".into();
        let result = object_store
            .list_with_offset(Some(&path), &offset)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(result.len(), 1);
        assert_eq!(
            result[0].as_ref().unwrap().location.as_ref(),
            "data/test.txt"
        );
    }

    /// An OpenDAL layer that counts `stat` calls and forwards everything
    /// else untouched; used to assert which code paths issue stat requests.
    mod stat_counter {
        use std::sync::atomic::{AtomicUsize, Ordering};

        use super::*;

        #[derive(Debug, Clone)]
        pub struct StatCounterLayer {
            count: Arc<AtomicUsize>,
        }

        impl StatCounterLayer {
            pub fn new(count: Arc<AtomicUsize>) -> Self {
                Self { count }
            }
        }

        impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
            type LayeredAccess = StatCounterAccessor<A>;

            fn layer(&self, inner: A) -> Self::LayeredAccess {
                StatCounterAccessor {
                    inner,
                    count: self.count.clone(),
                }
            }
        }

        #[derive(Debug, Clone)]
        pub struct StatCounterAccessor<A> {
            inner: A,
            count: Arc<AtomicUsize>,
        }

        impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
            type Inner = A;
            type Reader = A::Reader;
            type Writer = A::Writer;
            type Lister = A::Lister;
            type Deleter = A::Deleter;

            fn inner(&self) -> &Self::Inner {
                &self.inner
            }

            // The only instrumented operation: bump the counter, then forward.
            async fn stat(
                &self,
                path: &str,
                args: opendal::raw::OpStat,
            ) -> opendal::Result<opendal::raw::RpStat> {
                self.count.fetch_add(1, Ordering::SeqCst);
                self.inner.stat(path, args).await
            }

            async fn read(
                &self,
                path: &str,
                args: opendal::raw::OpRead,
            ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
                self.inner.read(path, args).await
            }

            async fn write(
                &self,
                path: &str,
                args: opendal::raw::OpWrite,
            ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
                self.inner.write(path, args).await
            }

            async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
                self.inner.delete().await
            }

            async fn list(
                &self,
                path: &str,
                args: opendal::raw::OpList,
            ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
                self.inner.list(path, args).await
            }

            async fn copy(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpCopy,
            ) -> opendal::Result<opendal::raw::RpCopy> {
                self.inner.copy(from, to, args).await
            }

            async fn rename(
                &self,
                from: &str,
                to: &str,
                args: opendal::raw::OpRename,
            ) -> opendal::Result<opendal::raw::RpRename> {
                self.inner.rename(from, to, args).await
            }
        }
    }

    /// `get_range` must not stat, while `get_opts` must (it needs full
    /// metadata for the returned `GetResult`).
    #[tokio::test]
    async fn test_get_range_no_stat() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let stat_count = Arc::new(AtomicUsize::new(0));
        let op = Operator::new(opendal::services::Memory::default())
            .unwrap()
            .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
            .finish();
        let store = OpendalStore::new(op);

        let location = "test_get_range.txt".into();
        let value = Bytes::from_static(b"Hello, world!");
        store.put(&location, value.clone().into()).await.unwrap();

        // Reset after setup — `put` itself may stat.
        stat_count.store(0, Ordering::SeqCst);

        let ret = store.get_range(&location, 0..5).await.unwrap();
        assert_eq!(Bytes::from_static(b"Hello"), ret);
        assert_eq!(
            stat_count.load(Ordering::SeqCst),
            0,
            "get_range should not call stat()"
        );

        stat_count.store(0, Ordering::SeqCst);

        let opts = datafusion_object_store::GetOptions {
            range: Some(datafusion_object_store::GetRange::Bounded(0..5)),
            ..Default::default()
        };
        let ret = store.get_opts(&location, opts).await.unwrap();
        let data = ret.bytes().await.unwrap();
        assert_eq!(Bytes::from_static(b"Hello"), data);
        assert!(
            stat_count.load(Ordering::SeqCst) > 0,
            "get_opts should call stat() to get metadata"
        );

        store.delete(&location).await.unwrap();
    }
}
1053}