1use std::sync::Arc;
18
19use common_telemetry::info;
20use object_store::config::ObjectStorageCacheConfig;
21use object_store::factory::new_raw_object_store;
22use object_store::layers::LruCacheLayer;
23use object_store::services::Fs;
24use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
25use object_store::{ATOMIC_WRITE_DIR, Access, ObjectStore, ObjectStoreBuilder};
26use snafu::prelude::*;
27
28use crate::config::ObjectStoreConfig;
29use crate::error::{self, Result};
30
31pub(crate) async fn new_object_store_without_cache(
32 store: &ObjectStoreConfig,
33 data_home: &str,
34) -> Result<ObjectStore> {
35 let object_store = new_raw_object_store(store, data_home)
36 .await
37 .context(error::ObjectStoreSnafu)?;
38 let object_store = if store.is_object_storage() {
40 with_retry_layers(object_store)
42 } else {
43 object_store
44 };
45
46 let object_store = with_instrument_layers(object_store, true);
47 Ok(object_store)
48}
49
50pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
51 let object_store = new_raw_object_store(&store, data_home)
52 .await
53 .context(error::ObjectStoreSnafu)?;
54 let object_store = if store.is_object_storage() {
56 let object_store = {
57 let cache_config = store.cache_config().unwrap();
59 if let Some(cache_layer) = build_cache_layer(cache_config, data_home).await? {
60 object_store.layer(cache_layer)
62 } else {
63 object_store
64 }
65 };
66
67 with_retry_layers(object_store)
69 } else {
70 object_store
71 };
72
73 let object_store = with_instrument_layers(object_store, true);
74 Ok(object_store)
75}
76
77async fn build_cache_layer(
78 cache_config: &ObjectStorageCacheConfig,
79 data_home: &str,
80) -> Result<Option<LruCacheLayer<impl Access>>> {
81 if !cache_config.enable_read_cache {
83 return Ok(None);
84 }
85 let cache_base_dir = if cache_config.cache_path.is_empty() {
86 data_home
87 } else {
88 &cache_config.cache_path
89 };
90 let atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
91 clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
92
93 let cache_store = Fs::default()
94 .root(cache_base_dir)
95 .atomic_write_dir(&atomic_temp_dir)
96 .build()
97 .context(error::BuildCacheStoreSnafu)?;
98
99 let cache_layer = LruCacheLayer::new(
100 Arc::new(cache_store),
101 cache_config.cache_capacity.0 as usize,
102 )
103 .context(error::BuildCacheStoreSnafu)?;
104 cache_layer.recover_cache(false).await;
105
106 info!(
107 "Enabled local object storage cache, path: {}, capacity: {}.",
108 cache_config.cache_path, cache_config.cache_capacity
109 );
110
111 Ok(Some(cache_layer))
112}