1pub mod bloom_filter_index;
16pub mod inverted_index;
17pub mod result_cache;
18
19use std::future::Future;
20use std::hash::Hash;
21use std::ops::Range;
22use std::sync::Arc;
23
24use bytes::Bytes;
25use object_store::Buffer;
26
27use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
28
/// Metrics label value used for the index metadata cache
/// (passed to `CACHE_BYTES` when metadata is inserted or evicted).
const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Metrics label value used for the index content (page) cache
/// (passed to `CACHE_BYTES`, `CACHE_HIT` and `CACHE_MISS`).
const INDEX_CONTENT_TYPE: &str = "index_content";
33
/// Identifies one fixed-size page of an index file inside the content cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Zero-based page number, i.e. the byte offset divided by the page size.
    page_id: u64,
}

impl PageKey {
    /// Returns the id of the page that contains the byte at `offset`.
    ///
    /// # Panics
    ///
    /// Panics if `page_size` is zero (division by zero).
    fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
        offset / page_size
    }

    /// Returns how many pages the byte span `[offset, offset + size)` touches.
    ///
    /// An empty span (`size == 0`) touches no page and yields `0`.
    ///
    /// # Panics
    ///
    /// Panics if `page_size` is zero.
    fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
        // Without this guard `offset + size - 1` underflows for an empty span
        // starting at offset 0, and an empty span at an unaligned offset would
        // be reported as touching one page.
        if size == 0 {
            return 0;
        }
        let start_page = Self::calculate_page_id(offset, page_size);
        let last_byte = offset + (u64::from(size) - 1);
        let end_page = Self::calculate_page_id(last_byte, page_size);
        // A span of `size` bytes touches at most `size` pages, so the count
        // always fits in `u32`.
        (end_page + 1 - start_page) as u32
    }

    /// Returns the position of the requested bytes relative to the start of
    /// the first page they touch, i.e. the slice to take from the
    /// concatenation of the pages produced by [`Self::generate_page_keys`].
    ///
    /// # Panics
    ///
    /// Panics if `page_size` is zero.
    fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
        let start = (offset % page_size) as usize;
        let end = start + size as usize;
        start..end
    }

    /// Yields, in ascending order, the keys of every page touched by the byte
    /// span `[offset, offset + size)`. Yields nothing when `size` is zero.
    ///
    /// # Panics
    ///
    /// Panics if `page_size` is zero.
    fn generate_page_keys(offset: u64, size: u32, page_size: u64) -> impl Iterator<Item = Self> {
        let start_page = Self::calculate_page_id(offset, page_size);
        let total_pages = Self::calculate_page_count(offset, size, page_size);
        (0..total_pages).map(move |i| Self {
            page_id: start_page + u64::from(i),
        })
    }
}
74
75pub struct IndexCache<K, M> {
77 index_metadata: moka::sync::Cache<K, Arc<M>>,
79 index: moka::sync::Cache<(K, PageKey), Bytes>,
81 page_size: u64,
83
84 weight_of_metadata: fn(&K, &Arc<M>) -> u32,
86 weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
88}
89
90impl<K, M> IndexCache<K, M>
91where
92 K: Hash + Eq + Send + Sync + 'static,
93 M: Send + Sync + 'static,
94{
95 pub fn new_with_weighter(
96 index_metadata_cap: u64,
97 index_content_cap: u64,
98 page_size: u64,
99 index_type: &'static str,
100 weight_of_metadata: fn(&K, &Arc<M>) -> u32,
101 weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
102 ) -> Self {
103 common_telemetry::debug!(
104 "Building IndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}, page size: {page_size}, index type: {index_type}"
105 );
106 let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
107 .name(&format!("index_metadata_{}", index_type))
108 .weigher(weight_of_metadata)
109 .eviction_listener(move |k, v, _cause| {
110 let size = weight_of_metadata(&k, &v);
111 CACHE_BYTES
112 .with_label_values(&[INDEX_METADATA_TYPE])
113 .sub(size.into());
114 })
115 .support_invalidation_closures()
116 .build();
117 let index_cache = moka::sync::CacheBuilder::new(index_content_cap)
118 .name(&format!("index_content_{}", index_type))
119 .weigher(weight_of_content)
120 .eviction_listener(move |k, v, _cause| {
121 let size = weight_of_content(&k, &v);
122 CACHE_BYTES
123 .with_label_values(&[INDEX_CONTENT_TYPE])
124 .sub(size.into());
125 })
126 .support_invalidation_closures()
127 .build();
128 Self {
129 index_metadata,
130 index: index_cache,
131 page_size,
132 weight_of_content,
133 weight_of_metadata,
134 }
135 }
136}
137
138impl<K, M> IndexCache<K, M>
139where
140 K: Hash + Eq + Clone + Copy + Send + Sync + 'static,
141 M: Send + Sync + 'static,
142{
143 pub fn get_metadata(&self, key: K) -> Option<Arc<M>> {
144 self.index_metadata.get(&key)
145 }
146
147 pub fn put_metadata(&self, key: K, metadata: Arc<M>) {
148 CACHE_BYTES
149 .with_label_values(&[INDEX_METADATA_TYPE])
150 .add((self.weight_of_metadata)(&key, &metadata).into());
151 self.index_metadata.insert(key, metadata)
152 }
153
154 async fn get_or_load<F, Fut, E>(
157 &self,
158 key: K,
159 file_size: u64,
160 offset: u64,
161 size: u32,
162 load: F,
163 ) -> Result<Vec<u8>, E>
164 where
165 F: Fn(Vec<Range<u64>>) -> Fut,
166 Fut: Future<Output = Result<Vec<Bytes>, E>>,
167 E: std::error::Error,
168 {
169 let page_keys =
170 PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
171 if page_keys.is_empty() {
173 return Ok(Vec::new());
174 }
175 let mut data = Vec::with_capacity(page_keys.len());
176 data.resize(page_keys.len(), Bytes::new());
177 let mut cache_miss_range = vec![];
178 let mut cache_miss_idx = vec![];
179 let last_index = page_keys.len() - 1;
180 for (i, page_key) in page_keys.iter().enumerate() {
182 match self.get_page(key, *page_key) {
183 Some(page) => {
184 CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
185 data[i] = page;
186 }
187 None => {
188 CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
189 let base_offset = page_key.page_id * self.page_size;
190 let pruned_size = if i == last_index {
191 prune_size(page_keys.iter(), file_size, self.page_size)
192 } else {
193 self.page_size
194 };
195 cache_miss_range.push(base_offset..base_offset + pruned_size);
196 cache_miss_idx.push(i);
197 }
198 }
199 }
200 if !cache_miss_range.is_empty() {
201 let pages = load(cache_miss_range).await?;
202 for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
203 let page_key = page_keys[i];
204 data[i] = page.clone();
205 self.put_page(key, page_key, page.clone());
206 }
207 }
208 let buffer = Buffer::from_iter(data.into_iter());
209 Ok(buffer
210 .slice(PageKey::calculate_range(offset, size, self.page_size))
211 .to_vec())
212 }
213
214 fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {
215 self.index.get(&(key, page_key))
216 }
217
218 fn put_page(&self, key: K, page_key: PageKey, value: Bytes) {
219 CACHE_BYTES
220 .with_label_values(&[INDEX_CONTENT_TYPE])
221 .add((self.weight_of_content)(&(key, page_key), &value).into());
222 self.index.insert((key, page_key), value);
223 }
224
225 pub fn invalidate_if<F>(&self, predicate: F)
227 where
228 F: Fn(&K) -> bool + Send + Sync + 'static,
229 {
230 let predicate = Arc::new(predicate);
231 let metadata_predicate = Arc::clone(&predicate);
232
233 self.index_metadata
234 .invalidate_entries_if(move |key, _| metadata_predicate(key))
235 .expect("cache should support invalidation closures");
236
237 self.index
238 .invalidate_entries_if(move |(key, _), _| predicate(key))
239 .expect("cache should support invalidation closures");
240 }
241}
242
243fn prune_size<'a>(
248 indexes: impl Iterator<Item = &'a PageKey>,
249 file_size: u64,
250 page_size: u64,
251) -> u64 {
252 let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0);
253 page_size.min(file_size - last_page_start)
254}