1use common_telemetry::{info, warn};
18use object_store::factory::new_raw_object_store;
19use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
20use object_store::{ATOMIC_WRITE_DIR, ObjectStore};
21use snafu::prelude::*;
22
23use crate::config::ObjectStoreConfig;
24use crate::error::{self, Result};
25
26pub(crate) async fn new_object_store_without_cache(
27 store: &ObjectStoreConfig,
28 data_home: &str,
29) -> Result<ObjectStore> {
30 let object_store = new_raw_object_store(store, data_home)
31 .await
32 .context(error::ObjectStoreSnafu)?;
33 let object_store = if store.is_object_storage() {
35 with_retry_layers(object_store)
37 } else {
38 object_store
39 };
40
41 let object_store = with_instrument_layers(object_store, true);
42 Ok(object_store)
43}
44
45fn clean_old_read_cache(store: &ObjectStoreConfig, data_home: &str) {
47 if !store.is_object_storage() {
48 return;
49 }
50
51 let Some(cache_config) = store.cache_config() else {
52 return;
53 };
54
55 if !cache_config.enable_read_cache {
57 return;
58 }
59
60 let cache_base_dir = if cache_config.cache_path.is_empty() {
61 data_home
62 } else {
63 &cache_config.cache_path
64 };
65
66 let old_read_cache_dir = join_dir(cache_base_dir, "cache/object/read");
68 info!(
69 "Cleaning up old read cache directory: {}",
70 old_read_cache_dir
71 );
72 if let Err(e) = clean_temp_dir(&old_read_cache_dir) {
73 warn!(e; "Failed to clean old read cache directory {}", old_read_cache_dir);
74 }
75
76 let cache_atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
78 info!(
79 "Cleaning up old cache atomic temp directory: {}",
80 cache_atomic_temp_dir
81 );
82 if let Err(e) = clean_temp_dir(&cache_atomic_temp_dir) {
83 warn!(e; "Failed to clean old cache atomic temp directory {}", cache_atomic_temp_dir);
84 }
85}
86
87pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
88 clean_old_read_cache(&store, data_home);
91
92 let object_store = new_raw_object_store(&store, data_home)
93 .await
94 .context(error::ObjectStoreSnafu)?;
95 let object_store = if store.is_object_storage() {
97 with_retry_layers(object_store)
99 } else {
100 object_store
101 };
102
103 let object_store = with_instrument_layers(object_store, true);
104 Ok(object_store)
105}