1pub mod bloom_filter_index;
16pub mod inverted_index;
17pub mod result_cache;
18
19use std::future::Future;
20use std::hash::Hash;
21use std::ops::Range;
22use std::sync::Arc;
23
24use bytes::Bytes;
25use object_store::Buffer;
26
27use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
28
/// Metrics label value for index-metadata cache entries (`CACHE_BYTES`/`CACHE_HIT`/`CACHE_MISS`).
const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Metrics label value for index-content (page) cache entries.
const INDEX_CONTENT_TYPE: &str = "index_content";
33
/// Per-call counters collected while serving an index read from the cache.
#[derive(Debug, Default, Clone)]
pub struct IndexCacheMetrics {
    /// Number of pages served from the cache.
    pub cache_hit: usize,
    /// Number of pages that had to be loaded.
    pub cache_miss: usize,
    /// Total number of pages touched by the read.
    pub num_pages: usize,
    /// Total bytes across all touched pages.
    pub page_bytes: u64,
}

impl IndexCacheMetrics {
    /// Accumulates the counters of `other` into `self`.
    pub fn merge(&mut self, other: &Self) {
        let Self {
            cache_hit,
            cache_miss,
            num_pages,
            page_bytes,
        } = other;
        self.cache_hit += cache_hit;
        self.cache_miss += cache_miss;
        self.num_pages += num_pages;
        self.page_bytes += page_bytes;
    }
}
56
/// Cache key for one fixed-size page of an index blob.
///
/// An index blob is split into pages of `page_size` bytes; `page_id` is the
/// zero-based index of the page within the blob.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
    page_id: u64,
}

impl PageKey {
    /// Returns the id of the page containing byte `offset`.
    fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
        offset / page_size
    }

    /// Returns the number of pages covering `size` bytes starting at `offset`.
    ///
    /// Returns 0 for an empty read. (The previous unconditional
    /// `offset + size - 1` underflowed when `offset == 0 && size == 0`,
    /// panicking in debug builds and producing a huge page count in release.)
    fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
        if size == 0 {
            return 0;
        }
        let start_page = Self::calculate_page_id(offset, page_size);
        // Last byte of the read; `size >= 1` here, so this cannot underflow.
        let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size);
        (end_page + 1 - start_page) as u32
    }

    /// Returns the byte range of the requested data relative to the start of
    /// the first page (i.e. within the concatenated page buffer).
    fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
        let start = (offset % page_size) as usize;
        let end = start + size as usize;
        start..end
    }

    /// Generates keys for every page overlapping `offset..offset + size`,
    /// in ascending page order. Yields nothing when `size == 0`.
    fn generate_page_keys(offset: u64, size: u32, page_size: u64) -> impl Iterator<Item = Self> {
        let start_page = Self::calculate_page_id(offset, page_size);
        let total_pages = Self::calculate_page_count(offset, size, page_size);
        (0..total_pages).map(move |i| Self {
            page_id: start_page + i as u64,
        })
    }
}
97
/// Two-level cache for one kind of index: parsed metadata keyed by `K`, and
/// raw index content split into fixed-size pages keyed by `(K, PageKey)`.
pub struct IndexCache<K, M> {
    /// Cache of parsed index metadata.
    index_metadata: moka::sync::Cache<K, Arc<M>>,
    /// Cache of raw index content pages.
    index: moka::sync::Cache<(K, PageKey), Bytes>,
    /// Size in bytes of each cached content page.
    page_size: u64,

    /// Weigher for metadata entries; also used to keep the `CACHE_BYTES`
    /// gauge in sync on insert/eviction.
    weight_of_metadata: fn(&K, &Arc<M>) -> u32,
    /// Weigher for content pages; same dual use as `weight_of_metadata`.
    weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
}
112
113impl<K, M> IndexCache<K, M>
114where
115 K: Hash + Eq + Send + Sync + 'static,
116 M: Send + Sync + 'static,
117{
118 pub fn new_with_weighter(
119 index_metadata_cap: u64,
120 index_content_cap: u64,
121 page_size: u64,
122 index_type: &'static str,
123 weight_of_metadata: fn(&K, &Arc<M>) -> u32,
124 weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
125 ) -> Self {
126 common_telemetry::debug!(
127 "Building IndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}, page size: {page_size}, index type: {index_type}"
128 );
129 let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
130 .name(&format!("index_metadata_{}", index_type))
131 .weigher(weight_of_metadata)
132 .eviction_listener(move |k, v, _cause| {
133 let size = weight_of_metadata(&k, &v);
134 CACHE_BYTES
135 .with_label_values(&[INDEX_METADATA_TYPE])
136 .sub(size.into());
137 })
138 .support_invalidation_closures()
139 .build();
140 let index_cache = moka::sync::CacheBuilder::new(index_content_cap)
141 .name(&format!("index_content_{}", index_type))
142 .weigher(weight_of_content)
143 .eviction_listener(move |k, v, _cause| {
144 let size = weight_of_content(&k, &v);
145 CACHE_BYTES
146 .with_label_values(&[INDEX_CONTENT_TYPE])
147 .sub(size.into());
148 })
149 .support_invalidation_closures()
150 .build();
151 Self {
152 index_metadata,
153 index: index_cache,
154 page_size,
155 weight_of_content,
156 weight_of_metadata,
157 }
158 }
159}
160
impl<K, M> IndexCache<K, M>
where
    K: Hash + Eq + Clone + Copy + Send + Sync + 'static,
    M: Send + Sync + 'static,
{
    /// Returns the cached metadata for `key`, if present.
    pub fn get_metadata(&self, key: K) -> Option<Arc<M>> {
        self.index_metadata.get(&key)
    }

    /// Inserts metadata for `key`, adding its weight to the `CACHE_BYTES`
    /// gauge (the eviction listener subtracts it again on removal).
    pub fn put_metadata(&self, key: K, metadata: Arc<M>) {
        CACHE_BYTES
            .with_label_values(&[INDEX_METADATA_TYPE])
            .add((self.weight_of_metadata)(&key, &metadata).into());
        self.index_metadata.insert(key, metadata)
    }

    /// Reads `size` bytes at `offset` from the index identified by `key`,
    /// serving whole pages from the cache and fetching the missing ones via
    /// `load`.
    ///
    /// `load` receives one byte range per missing page and must return one
    /// `Bytes` per range, in the same order. Loaded pages are inserted into
    /// the cache, then the requested sub-range is sliced out of the
    /// concatenated pages and returned with the per-call cache metrics.
    async fn get_or_load<F, Fut, E>(
        &self,
        key: K,
        file_size: u64,
        offset: u64,
        size: u32,
        load: F,
    ) -> Result<(Vec<u8>, IndexCacheMetrics), E>
    where
        F: Fn(Vec<Range<u64>>) -> Fut,
        Fut: Future<Output = Result<Vec<Bytes>, E>>,
        E: std::error::Error,
    {
        let mut metrics = IndexCacheMetrics::default();
        let page_keys =
            PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
        // Nothing to read: return an empty buffer without touching the cache.
        if page_keys.is_empty() {
            return Ok((Vec::new(), metrics));
        }
        metrics.num_pages = page_keys.len();
        // One slot per page; hits are filled in place, misses stay empty
        // until the batched `load` below returns.
        let mut data = Vec::with_capacity(page_keys.len());
        data.resize(page_keys.len(), Bytes::new());
        let mut cache_miss_range = vec![];
        let mut cache_miss_idx = vec![];
        let last_index = page_keys.len() - 1;
        for (i, page_key) in page_keys.iter().enumerate() {
            match self.get_page(key, *page_key) {
                Some(page) => {
                    CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
                    metrics.cache_hit += 1;
                    metrics.page_bytes += page.len() as u64;
                    data[i] = page;
                }
                None => {
                    CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
                    metrics.cache_miss += 1;
                    let base_offset = page_key.page_id * self.page_size;
                    // The last page may extend past the end of the file;
                    // clamp its read size so `load` is not asked for bytes
                    // beyond `file_size`. Interior pages are always full.
                    let pruned_size = if i == last_index {
                        prune_size(page_keys.iter(), file_size, self.page_size)
                    } else {
                        self.page_size
                    };
                    cache_miss_range.push(base_offset..base_offset + pruned_size);
                    cache_miss_idx.push(i);
                }
            }
        }
        if !cache_miss_range.is_empty() {
            // `load` must return pages in the same order as the ranges, so
            // zipping with the recorded indices restores each page's slot.
            let pages = load(cache_miss_range).await?;
            for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
                let page_key = page_keys[i];
                metrics.page_bytes += page.len() as u64;
                data[i] = page.clone();
                self.put_page(key, page_key, page.clone());
            }
        }
        // Concatenate all pages and slice out exactly the requested bytes.
        let buffer = Buffer::from_iter(data.into_iter());
        Ok((
            buffer
                .slice(PageKey::calculate_range(offset, size, self.page_size))
                .to_vec(),
            metrics,
        ))
    }

    /// Returns the cached content page for `(key, page_key)`, if present.
    fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {
        self.index.get(&(key, page_key))
    }

    /// Inserts a content page and adds its weight to the `CACHE_BYTES` gauge.
    fn put_page(&self, key: K, page_key: PageKey, value: Bytes) {
        // NOTE(review): copies the bytes into a fresh allocation — presumably
        // to detach the cached page from a larger shared backing buffer;
        // confirm before removing.
        let value = Bytes::from(value.to_vec());
        CACHE_BYTES
            .with_label_values(&[INDEX_CONTENT_TYPE])
            .add((self.weight_of_content)(&(key, page_key), &value).into());
        self.index.insert((key, page_key), value);
    }

    /// Invalidates every metadata and content entry whose key `K` satisfies
    /// `predicate`.
    ///
    /// Relies on `support_invalidation_closures` being enabled at build time;
    /// the `expect`s only fire if that invariant is broken.
    pub fn invalidate_if<F>(&self, predicate: F)
    where
        F: Fn(&K) -> bool + Send + Sync + 'static,
    {
        // Share one predicate between both caches.
        let predicate = Arc::new(predicate);
        let metadata_predicate = Arc::clone(&predicate);

        self.index_metadata
            .invalidate_entries_if(move |key, _| metadata_predicate(key))
            .expect("cache should support invalidation closures");

        self.index
            .invalidate_entries_if(move |(key, _), _| predicate(key))
            .expect("cache should support invalidation closures");
    }
}
276
277fn prune_size<'a>(
282 indexes: impl Iterator<Item = &'a PageKey>,
283 file_size: u64,
284 page_size: u64,
285) -> u64 {
286 let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0);
287 page_size.min(file_size - last_page_start)
288}