pub mod bloom_filter_index;
pub mod inverted_index;
pub mod result_cache;
#[cfg(feature = "vector_index")]
pub mod vector_index;

use std::future::Future;
use std::hash::Hash;
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use object_store::Buffer;

use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};

/// Metric label value for index metadata cache entries.
const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Metric label value for index content cache entries.
const INDEX_CONTENT_TYPE: &str = "index_content";

/// Metrics collected during a read through the index cache.
#[derive(Debug, Default, Clone)]
pub struct IndexCacheMetrics {
    /// Number of pages served from the cache.
    pub cache_hit: usize,
    /// Number of pages that had to be loaded from the source.
    pub cache_miss: usize,
    /// Total number of pages touched by the read.
    pub num_pages: usize,
    /// Total bytes of the pages touched by the read.
    pub page_bytes: u64,
}

impl IndexCacheMetrics {
    /// Merges the metrics of another read into this one.
    pub fn merge(&mut self, other: &Self) {
        self.cache_hit += other.cache_hit;
        self.cache_miss += other.cache_miss;
        self.num_pages += other.num_pages;
        self.page_bytes += other.page_bytes;
    }
}

/// Key of a cached index content page.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Sequence number of the page within the index file.
    page_id: u64,
}

impl PageKey {
    /// Converts an offset in the file into the id of the page containing it.
    fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
        offset / page_size
    }

    /// Calculates how many pages a read of `size` bytes starting at `offset` spans.
    fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
        // A zero-sized read touches no pages (also avoids underflow below).
        if size == 0 {
            return 0;
        }
        let start_page = Self::calculate_page_id(offset, page_size);
        let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size);
        (end_page + 1 - start_page) as u32
    }

    /// Computes the byte range of the requested data inside the buffer built by
    /// concatenating all touched pages: it starts at the offset within the first
    /// page and extends for `size` bytes.
    fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
        let start = (offset % page_size) as usize;
        let end = start + size as usize;
        start..end
    }

    /// Generates the keys of all pages touched by a read of `size` bytes at `offset`.
    fn generate_page_keys(offset: u64, size: u32, page_size: u64) -> impl Iterator<Item = Self> {
        let start_page = Self::calculate_page_id(offset, page_size);
        let total_pages = Self::calculate_page_count(offset, size, page_size);
        (0..total_pages).map(move |i| Self {
            page_id: start_page + i as u64,
        })
    }
}
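
// Worked example of the page arithmetic above (illustrative only, not exercised by this
// module): with `page_size = 4096`, a read of `size = 5000` bytes at `offset = 7000`
// starts in page 1 and ends in page 2, so `calculate_page_count` returns 2 and
// `generate_page_keys` yields pages 1 and 2 (file bytes 4096..12288). Within the two
// concatenated pages, `calculate_range` returns 2904..7904, the slice holding the
// requested bytes (7000 % 4096 = 2904).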

/// Cache for index metadata and index content pages.
pub struct IndexCache<K, M> {
    /// Cache of parsed index metadata, keyed by `K`.
    index_metadata: moka::sync::Cache<K, Arc<M>>,
    /// Cache of index content pages, keyed by `K` and the page key.
    index: moka::sync::Cache<(K, PageKey), Bytes>,
    /// Size of a cached content page in bytes.
    page_size: u64,

    /// Weighter used to compute the cache weight of a metadata entry.
    weight_of_metadata: fn(&K, &Arc<M>) -> u32,
    /// Weighter used to compute the cache weight of a content page.
    weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
}

impl<K, M> IndexCache<K, M>
where
    K: Hash + Eq + Send + Sync + 'static,
    M: Send + Sync + 'static,
{
    /// Creates a new index cache with the given capacities, page size, and weighters.
    /// Eviction listeners keep the `CACHE_BYTES` gauge in sync with the cached entries.
    pub fn new_with_weighter(
        index_metadata_cap: u64,
        index_content_cap: u64,
        page_size: u64,
        index_type: &'static str,
        weight_of_metadata: fn(&K, &Arc<M>) -> u32,
        weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
    ) -> Self {
        common_telemetry::debug!(
            "Building IndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}, page size: {page_size}, index type: {index_type}"
        );
        let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
            .name(&format!("index_metadata_{}", index_type))
            .weigher(weight_of_metadata)
            .eviction_listener(move |k, v, _cause| {
                let size = weight_of_metadata(&k, &v);
                CACHE_BYTES
                    .with_label_values(&[INDEX_METADATA_TYPE])
                    .sub(size.into());
            })
            .support_invalidation_closures()
            .build();
        let index_cache = moka::sync::CacheBuilder::new(index_content_cap)
            .name(&format!("index_content_{}", index_type))
            .weigher(weight_of_content)
            .eviction_listener(move |k, v, _cause| {
                let size = weight_of_content(&k, &v);
                CACHE_BYTES
                    .with_label_values(&[INDEX_CONTENT_TYPE])
                    .sub(size.into());
            })
            .support_invalidation_closures()
            .build();
        Self {
            index_metadata,
            index: index_cache,
            page_size,
            weight_of_content,
            weight_of_metadata,
        }
    }
}
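
// A possible way to instantiate the cache (sketch only; `FileId` and `InvertedIndexMeta`
// are hypothetical placeholder types, not defined by this module):
//
// let cache: IndexCache<FileId, InvertedIndexMeta> = IndexCache::new_with_weighter(
//     32 * 1024 * 1024,   // metadata capacity in bytes
//     128 * 1024 * 1024,  // content capacity in bytes
//     64 * 1024,          // page size in bytes
//     "inverted_index",
//     |_key, meta| std::mem::size_of_val(meta.as_ref()) as u32,
//     |_key, page| page.len() as u32,
// );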

impl<K, M> IndexCache<K, M>
where
    K: Hash + Eq + Clone + Copy + Send + Sync + 'static,
    M: Send + Sync + 'static,
{
    /// Gets the cached metadata for `key`, if any.
    pub fn get_metadata(&self, key: K) -> Option<Arc<M>> {
        self.index_metadata.get(&key)
    }

    /// Puts metadata for `key` into the cache and updates the `CACHE_BYTES` gauge.
    pub fn put_metadata(&self, key: K, metadata: Arc<M>) {
        CACHE_BYTES
            .with_label_values(&[INDEX_METADATA_TYPE])
            .add((self.weight_of_metadata)(&key, &metadata).into());
        self.index_metadata.insert(key, metadata);
    }

    /// Reads `size` bytes at `offset` of the index identified by `key`, serving pages
    /// from the cache when possible and fetching the missing pages with `load`.
    /// Returns the requested bytes together with the cache metrics of this read.
    async fn get_or_load<F, Fut, E>(
        &self,
        key: K,
        file_size: u64,
        offset: u64,
        size: u32,
        load: F,
    ) -> Result<(Vec<u8>, IndexCacheMetrics), E>
    where
        F: Fn(Vec<Range<u64>>) -> Fut,
        Fut: Future<Output = Result<Vec<Bytes>, E>>,
        E: std::error::Error,
    {
        let mut metrics = IndexCacheMetrics::default();
        let page_keys =
            PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
        // A zero-sized read touches no pages, so there is nothing to load.
        if page_keys.is_empty() {
            return Ok((Vec::new(), metrics));
        }
        metrics.num_pages = page_keys.len();
        let mut data = vec![Bytes::new(); page_keys.len()];
        let mut cache_miss_range = vec![];
        let mut cache_miss_idx = vec![];
        let last_index = page_keys.len() - 1;
        // Collect cached pages and record the ranges of the missing ones.
        for (i, page_key) in page_keys.iter().enumerate() {
            match self.get_page(key, *page_key) {
                Some(page) => {
                    CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
                    metrics.cache_hit += 1;
                    metrics.page_bytes += page.len() as u64;
                    data[i] = page;
                }
                None => {
                    CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
                    metrics.cache_miss += 1;
                    let base_offset = page_key.page_id * self.page_size;
                    // The last page may be shorter than `page_size` if it reaches the
                    // end of the file.
                    let pruned_size = if i == last_index {
                        prune_size(page_keys.iter(), file_size, self.page_size)
                    } else {
                        self.page_size
                    };
                    cache_miss_range.push(base_offset..base_offset + pruned_size);
                    cache_miss_idx.push(i);
                }
            }
        }
        if !cache_miss_range.is_empty() {
            // Load all missing pages in one call and insert them into the cache.
            let pages = load(cache_miss_range).await?;
            for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
                let page_key = page_keys[i];
                metrics.page_bytes += page.len() as u64;
                data[i] = page.clone();
                self.put_page(key, page_key, page);
            }
        }
        // Concatenate the pages and slice out the requested byte range.
        let buffer = Buffer::from_iter(data);
        Ok((
            buffer
                .slice(PageKey::calculate_range(offset, size, self.page_size))
                .to_vec(),
            metrics,
        ))
    }

    fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {
        self.index.get(&(key, page_key))
    }

    fn put_page(&self, key: K, page_key: PageKey, value: Bytes) {
        // Copy into a fresh `Bytes` so the cache does not keep the larger loaded
        // buffer alive through a shared slice.
        let value = Bytes::from(value.to_vec());
        CACHE_BYTES
            .with_label_values(&[INDEX_CONTENT_TYPE])
            .add((self.weight_of_content)(&(key, page_key), &value).into());
        self.index.insert((key, page_key), value);
    }

    /// Invalidates all metadata and content entries whose key matches `predicate`.
    pub fn invalidate_if<F>(&self, predicate: F)
    where
        F: Fn(&K) -> bool + Send + Sync + 'static,
    {
        let predicate = Arc::new(predicate);
        let metadata_predicate = Arc::clone(&predicate);

        self.index_metadata
            .invalidate_entries_if(move |key, _| metadata_predicate(key))
            .expect("cache should support invalidation closures");

        self.index
            .invalidate_entries_if(move |(key, _), _| predicate(key))
            .expect("cache should support invalidation closures");
    }
}

/// Returns the number of bytes to read for the last page: a full `page_size`
/// unless the page would extend past `file_size`, in which case the read is
/// truncated at the end of the file.
fn prune_size<'a>(
    indexes: impl Iterator<Item = &'a PageKey>,
    file_size: u64,
    page_size: u64,
) -> u64 {
    let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0);
    page_size.min(file_size - last_page_start)
}
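
// Illustrative tests for the pure page arithmetic above; a minimal sketch that relies
// only on items defined in this module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page_key_arithmetic() {
        let page_size = 4096;
        // A read of 5000 bytes at offset 7000 starts in page 1 and ends in page 2.
        assert_eq!(PageKey::calculate_page_id(7000, page_size), 1);
        assert_eq!(PageKey::calculate_page_count(7000, 5000, page_size), 2);
        let keys: Vec<_> = PageKey::generate_page_keys(7000, 5000, page_size).collect();
        assert_eq!(keys, vec![PageKey { page_id: 1 }, PageKey { page_id: 2 }]);
        // Within the two concatenated pages, the requested bytes occupy 2904..7904.
        assert_eq!(PageKey::calculate_range(7000, 5000, page_size), 2904..7904);
    }

    #[test]
    fn test_prune_size() {
        let page_size = 4096;
        let keys = [PageKey { page_id: 1 }, PageKey { page_id: 2 }];
        // The last page starts at byte 8192; only 1808 bytes remain before EOF at 10000.
        assert_eq!(prune_size(keys.iter(), 10000, page_size), 1808);
        // When the file extends past the last page, a full page is read.
        assert_eq!(prune_size(keys.iter(), 1 << 20, page_size), page_size);
    }
}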