mito2/cache/
index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod bloom_filter_index;
16pub mod inverted_index;
17pub mod result_cache;
18
19use std::future::Future;
20use std::hash::Hash;
21use std::ops::Range;
22use std::sync::Arc;
23
24use bytes::Bytes;
25use object_store::Buffer;
26
27use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
28
/// Label value used for index-metadata entries in the cache metrics
/// (`CACHE_BYTES`, `CACHE_HIT`, `CACHE_MISS`).
const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Label value used for index-content (page) entries in the cache metrics.
const INDEX_CONTENT_TYPE: &str = "index_content";
33
/// Key of a fixed-size page inside a cached index file.
///
/// An index file is logically split into pages of `page_size` bytes; each page
/// is cached individually under its zero-based `page_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Zero-based index of the page within the file.
    page_id: u64,
}

impl PageKey {
    /// Converts an offset to a page ID based on the page size.
    fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
        offset / page_size
    }

    /// Calculates the total number of pages that a given size spans, starting from a specific offset.
    ///
    /// Returns 0 when `size` is 0, so a zero-sized read generates no page keys.
    /// (The unchecked `offset + size - 1` below would underflow for
    /// `offset == 0, size == 0`.)
    fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
        if size == 0 {
            return 0;
        }
        let start_page = Self::calculate_page_id(offset, page_size);
        // Last byte of the range; `size > 0` guarantees no underflow.
        let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size);
        (end_page + 1 - start_page) as u32
    }

    /// Calculates the byte range for data retrieval based on the specified offset and size.
    ///
    /// This function determines the starting and ending byte positions required for reading data.
    /// For example, with an offset of 5000 and a size of 5000, using a PAGE_SIZE of 4096,
    /// the resulting byte range will be 904..5904. This indicates that:
    /// - The reader will first access fixed-size pages [4096, 8192) and [8192, 12288).
    /// - To read the range [5000..10000), it only needs to fetch bytes within the range [904, 5904) across two pages.
    fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
        let start = (offset % page_size) as usize;
        let end = start + size as usize;
        start..end
    }

    /// Generates an iterator of `PageKey` for the pages that a given offset and size span.
    fn generate_page_keys(offset: u64, size: u32, page_size: u64) -> impl Iterator<Item = Self> {
        let start_page = Self::calculate_page_id(offset, page_size);
        let total_pages = Self::calculate_page_count(offset, size, page_size);
        (0..total_pages).map(move |i| Self {
            page_id: start_page + i as u64,
        })
    }
}
74
/// Cache for index metadata and content.
///
/// Content is cached per fixed-size page, keyed by `(file key, PageKey)`;
/// weights reported by the weighter functions are mirrored into the
/// `CACHE_BYTES` metric on insert and eviction.
pub struct IndexCache<K, M> {
    /// Cache for index metadata
    index_metadata: moka::sync::Cache<K, Arc<M>>,
    /// Cache for index content.
    index: moka::sync::Cache<(K, PageKey), Bytes>,
    /// Page size for index content.
    page_size: u64,

    /// Weighter for metadata.
    weight_of_metadata: fn(&K, &Arc<M>) -> u32,
    /// Weighter for content.
    weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
}
89
90impl<K, M> IndexCache<K, M>
91where
92    K: Hash + Eq + Send + Sync + 'static,
93    M: Send + Sync + 'static,
94{
95    pub fn new_with_weighter(
96        index_metadata_cap: u64,
97        index_content_cap: u64,
98        page_size: u64,
99        index_type: &'static str,
100        weight_of_metadata: fn(&K, &Arc<M>) -> u32,
101        weight_of_content: fn(&(K, PageKey), &Bytes) -> u32,
102    ) -> Self {
103        common_telemetry::debug!(
104            "Building IndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}, page size: {page_size}, index type: {index_type}"
105        );
106        let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
107            .name(&format!("index_metadata_{}", index_type))
108            .weigher(weight_of_metadata)
109            .eviction_listener(move |k, v, _cause| {
110                let size = weight_of_metadata(&k, &v);
111                CACHE_BYTES
112                    .with_label_values(&[INDEX_METADATA_TYPE])
113                    .sub(size.into());
114            })
115            .build();
116        let index_cache = moka::sync::CacheBuilder::new(index_content_cap)
117            .name(&format!("index_content_{}", index_type))
118            .weigher(weight_of_content)
119            .eviction_listener(move |k, v, _cause| {
120                let size = weight_of_content(&k, &v);
121                CACHE_BYTES
122                    .with_label_values(&[INDEX_CONTENT_TYPE])
123                    .sub(size.into());
124            })
125            .build();
126        Self {
127            index_metadata,
128            index: index_cache,
129            page_size,
130            weight_of_content,
131            weight_of_metadata,
132        }
133    }
134}
135
136impl<K, M> IndexCache<K, M>
137where
138    K: Hash + Eq + Clone + Copy + Send + Sync + 'static,
139    M: Send + Sync + 'static,
140{
141    pub fn get_metadata(&self, key: K) -> Option<Arc<M>> {
142        self.index_metadata.get(&key)
143    }
144
145    pub fn put_metadata(&self, key: K, metadata: Arc<M>) {
146        CACHE_BYTES
147            .with_label_values(&[INDEX_METADATA_TYPE])
148            .add((self.weight_of_metadata)(&key, &metadata).into());
149        self.index_metadata.insert(key, metadata)
150    }
151
152    /// Gets given range of index data from cache, and loads from source if the file
153    /// is not already cached.
154    async fn get_or_load<F, Fut, E>(
155        &self,
156        key: K,
157        file_size: u64,
158        offset: u64,
159        size: u32,
160        load: F,
161    ) -> Result<Vec<u8>, E>
162    where
163        F: Fn(Vec<Range<u64>>) -> Fut,
164        Fut: Future<Output = Result<Vec<Bytes>, E>>,
165        E: std::error::Error,
166    {
167        let page_keys =
168            PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
169        // Size is 0, return empty data.
170        if page_keys.is_empty() {
171            return Ok(Vec::new());
172        }
173        let mut data = Vec::with_capacity(page_keys.len());
174        data.resize(page_keys.len(), Bytes::new());
175        let mut cache_miss_range = vec![];
176        let mut cache_miss_idx = vec![];
177        let last_index = page_keys.len() - 1;
178        // TODO: Avoid copy as much as possible.
179        for (i, page_key) in page_keys.iter().enumerate() {
180            match self.get_page(key, *page_key) {
181                Some(page) => {
182                    CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
183                    data[i] = page;
184                }
185                None => {
186                    CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
187                    let base_offset = page_key.page_id * self.page_size;
188                    let pruned_size = if i == last_index {
189                        prune_size(page_keys.iter(), file_size, self.page_size)
190                    } else {
191                        self.page_size
192                    };
193                    cache_miss_range.push(base_offset..base_offset + pruned_size);
194                    cache_miss_idx.push(i);
195                }
196            }
197        }
198        if !cache_miss_range.is_empty() {
199            let pages = load(cache_miss_range).await?;
200            for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
201                let page_key = page_keys[i];
202                data[i] = page.clone();
203                self.put_page(key, page_key, page.clone());
204            }
205        }
206        let buffer = Buffer::from_iter(data.into_iter());
207        Ok(buffer
208            .slice(PageKey::calculate_range(offset, size, self.page_size))
209            .to_vec())
210    }
211
212    fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {
213        self.index.get(&(key, page_key))
214    }
215
216    fn put_page(&self, key: K, page_key: PageKey, value: Bytes) {
217        CACHE_BYTES
218            .with_label_values(&[INDEX_CONTENT_TYPE])
219            .add((self.weight_of_content)(&(key, page_key), &value).into());
220        self.index.insert((key, page_key), value);
221    }
222}
223
224/// Prunes the size of the last page based on the indexes.
225/// We have following cases:
226/// 1. The rest file size is less than the page size, read to the end of the file.
227/// 2. Otherwise, read the page size.
228fn prune_size<'a>(
229    indexes: impl Iterator<Item = &'a PageKey>,
230    file_size: u64,
231    page_size: u64,
232) -> u64 {
233    let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0);
234    page_size.min(file_size - last_page_start)
235}