mito2/engine/
puffin_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::convert::TryFrom;
16
17use common_base::range_read::RangeReader;
18use common_telemetry::warn;
19use greptime_proto::v1::index::{BloomFilterMeta, InvertedIndexMeta, InvertedIndexMetas};
20use index::bitmap::BitmapType;
21use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
22use index::fulltext_index::Config as FulltextConfig;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
24use index::target::IndexTarget;
25use puffin::blob_metadata::BlobMetadata;
26use puffin::puffin_manager::{PuffinManager, PuffinReader};
27use serde_json::{Map, Value, json};
28use store_api::sst_entry::PuffinIndexMetaEntry;
29use store_api::storage::{ColumnId, RegionGroup, RegionId, RegionNumber, RegionSeq, TableId};
30
31use crate::cache::index::bloom_filter_index::{
32    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
33};
34use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
35use crate::sst::file::RegionIndexId;
36use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
37use crate::sst::index::fulltext_index::{
38    INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
39    INDEX_BLOB_TYPE_TANTIVY as FULLTEXT_TANTIVY_BLOB_TYPE,
40};
41use crate::sst::index::inverted_index::INDEX_BLOB_TYPE as INVERTED_BLOB_TYPE;
42use crate::sst::index::puffin_manager::{SstPuffinManager, SstPuffinReader};
43
// Values written into `PuffinIndexMetaEntry::index_type`, one per recognised blob kind.

/// Index type reported for bloom filter index blobs.
const INDEX_TYPE_BLOOM: &str = "bloom_filter";
/// Index type reported for bloom-filter-backed fulltext index blobs.
const INDEX_TYPE_FULLTEXT_BLOOM: &str = "fulltext_bloom";
/// Index type reported for tantivy-backed fulltext index blobs.
const INDEX_TYPE_FULLTEXT_TANTIVY: &str = "fulltext_tantivy";
/// Index type reported for every target listed inside the inverted index blob.
const INDEX_TYPE_INVERTED: &str = "inverted";

// Values written into `PuffinIndexMetaEntry::target_type`.

/// Target type reported when a target key decodes to a column id.
const TARGET_TYPE_COLUMN: &str = "column";
/// Target type reported when a target key does not decode to a column id.
const TARGET_TYPE_UNKNOWN: &str = "unknown";
52
/// Per-file context for listing the indexes stored in one SST puffin index file.
///
/// Every field is copied verbatim onto each `PuffinIndexMetaEntry` built from that file.
/// `table_dir` and `file_id` also appear in the warnings logged when reading fails.
pub(crate) struct IndexEntryContext<'a> {
    /// Directory of the table that owns the SST.
    pub(crate) table_dir: &'a str,
    /// Path of the puffin index file being inspected.
    pub(crate) index_file_path: &'a str,
    /// Region the SST belongs to.
    pub(crate) region_id: RegionId,
    /// Table id of that region.
    pub(crate) table_id: TableId,
    /// Region number of that region.
    pub(crate) region_number: RegionNumber,
    /// Region group of that region.
    pub(crate) region_group: RegionGroup,
    /// Region sequence of that region.
    pub(crate) region_sequence: RegionSeq,
    /// SST file id, used in entries and in warning messages.
    pub(crate) file_id: &'a str,
    /// Size of the index file, when known.
    pub(crate) index_file_size: Option<u64>,
    /// NOTE(review): presumably the id of the node serving this region — confirm with callers.
    pub(crate) node_id: Option<u64>,
}
65
66/// Collect index metadata entries present in the SST puffin file.
67pub(crate) async fn collect_index_entries_from_puffin(
68    manager: SstPuffinManager,
69    region_index_id: RegionIndexId,
70    context: IndexEntryContext<'_>,
71    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
72    inverted_index_cache: Option<InvertedIndexCacheRef>,
73) -> Vec<PuffinIndexMetaEntry> {
74    let mut entries = Vec::new();
75
76    let reader = match manager.reader(&region_index_id).await {
77        Ok(reader) => reader,
78        Err(err) => {
79            warn!(
80                err;
81                "Failed to open puffin index file, table_dir: {}, file_id: {}",
82                context.table_dir,
83                context.file_id
84            );
85            return entries;
86        }
87    };
88
89    let file_metadata = match reader.metadata().await {
90        Ok(metadata) => metadata,
91        Err(err) => {
92            warn!(
93                err;
94                "Failed to read puffin file metadata, table_dir: {}, file_id: {}",
95                context.table_dir,
96                context.file_id
97            );
98            return entries;
99        }
100    };
101
102    for blob in &file_metadata.blobs {
103        match BlobIndexTypeTargetKey::from_blob_type(&blob.blob_type) {
104            Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
105                let bloom_meta = try_read_bloom_meta(
106                    &reader,
107                    region_index_id,
108                    blob.blob_type.as_str(),
109                    target_key,
110                    bloom_filter_cache.as_ref(),
111                    Tag::Skipping,
112                    &context,
113                )
114                .await;
115
116                let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
117                let (target_type, target_json) = decode_target_info(target_key);
118                let meta_json = build_meta_json(bloom_value, None, None);
119                let entry = build_index_entry(
120                    &context,
121                    INDEX_TYPE_BLOOM,
122                    target_type,
123                    target_key.to_string(),
124                    target_json,
125                    blob.length as u64,
126                    meta_json,
127                );
128                entries.push(entry);
129            }
130            Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
131                let bloom_meta = try_read_bloom_meta(
132                    &reader,
133                    region_index_id,
134                    blob.blob_type.as_str(),
135                    target_key,
136                    bloom_filter_cache.as_ref(),
137                    Tag::Fulltext,
138                    &context,
139                )
140                .await;
141
142                let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
143                let fulltext_value = Some(fulltext_meta_value(blob));
144                let (target_type, target_json) = decode_target_info(target_key);
145                let meta_json = build_meta_json(bloom_value, fulltext_value, None);
146                let entry = build_index_entry(
147                    &context,
148                    INDEX_TYPE_FULLTEXT_BLOOM,
149                    target_type,
150                    target_key.to_string(),
151                    target_json,
152                    blob.length as u64,
153                    meta_json,
154                );
155                entries.push(entry);
156            }
157            Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key)) => {
158                let fulltext_value = Some(fulltext_meta_value(blob));
159                let (target_type, target_json) = decode_target_info(target_key);
160                let meta_json = build_meta_json(None, fulltext_value, None);
161                let entry = build_index_entry(
162                    &context,
163                    INDEX_TYPE_FULLTEXT_TANTIVY,
164                    target_type,
165                    target_key.to_string(),
166                    target_json,
167                    blob.length as u64,
168                    meta_json,
169                );
170                entries.push(entry);
171            }
172            Some(BlobIndexTypeTargetKey::Inverted) => {
173                let mut inverted_entries = collect_inverted_entries(
174                    &reader,
175                    region_index_id,
176                    inverted_index_cache.as_ref(),
177                    &context,
178                )
179                .await;
180                entries.append(&mut inverted_entries);
181            }
182            None => {}
183        }
184    }
185
186    entries
187}
188
189async fn collect_inverted_entries(
190    reader: &SstPuffinReader,
191    region_index_id: RegionIndexId,
192    cache: Option<&InvertedIndexCacheRef>,
193    context: &IndexEntryContext<'_>,
194) -> Vec<PuffinIndexMetaEntry> {
195    // Read the inverted index blob and surface its per-column metadata entries.
196    let file_id = region_index_id.file_id();
197
198    let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
199        Ok(guard) => guard,
200        Err(err) => {
201            warn!(
202                err;
203                "Failed to open inverted index blob, table_dir: {}, file_id: {}",
204                context.table_dir,
205                context.file_id
206            );
207            return Vec::new();
208        }
209    };
210
211    let blob_reader = match guard.reader().await {
212        Ok(reader) => reader,
213        Err(err) => {
214            warn!(
215                err;
216                "Failed to build inverted index blob reader, table_dir: {}, file_id: {}",
217                context.table_dir,
218                context.file_id
219            );
220            return Vec::new();
221        }
222    };
223
224    let blob_size = blob_reader
225        .metadata()
226        .await
227        .ok()
228        .map(|meta| meta.content_length);
229    let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
230        let reader = CachedInvertedIndexBlobReader::new(
231            file_id,
232            region_index_id.version,
233            blob_size,
234            InvertedIndexBlobReader::new(blob_reader),
235            cache.clone(),
236        );
237        match reader.metadata(None).await {
238            Ok(metas) => metas,
239            Err(err) => {
240                warn!(
241                    err;
242                    "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
243                    context.table_dir,
244                    context.file_id
245                );
246                return Vec::new();
247            }
248        }
249    } else {
250        let reader = InvertedIndexBlobReader::new(blob_reader);
251        match reader.metadata(None).await {
252            Ok(metas) => metas,
253            Err(err) => {
254                warn!(
255                    err;
256                    "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
257                    context.table_dir,
258                    context.file_id
259                );
260                return Vec::new();
261            }
262        }
263    };
264
265    build_inverted_entries(context, metas.as_ref())
266}
267
268fn build_inverted_entries(
269    context: &IndexEntryContext<'_>,
270    metas: &InvertedIndexMetas,
271) -> Vec<PuffinIndexMetaEntry> {
272    let mut entries = Vec::new();
273    for (name, meta) in &metas.metas {
274        let (target_type, target_json) = decode_target_info(name);
275        let inverted_value = inverted_meta_value(meta, metas);
276        let meta_json = build_meta_json(None, None, Some(inverted_value));
277        let entry = build_index_entry(
278            context,
279            INDEX_TYPE_INVERTED,
280            target_type,
281            name.clone(),
282            target_json,
283            meta.inverted_index_size,
284            meta_json,
285        );
286        entries.push(entry);
287    }
288    entries
289}
290
291async fn try_read_bloom_meta(
292    reader: &SstPuffinReader,
293    region_index_id: RegionIndexId,
294    blob_type: &str,
295    target_key: &str,
296    cache: Option<&BloomFilterIndexCacheRef>,
297    tag: Tag,
298    context: &IndexEntryContext<'_>,
299) -> Option<BloomFilterMeta> {
300    let column_id = decode_column_id(target_key);
301
302    // Failures are logged but do not abort the overall metadata collection.
303    match reader.blob(blob_type).await {
304        Ok(guard) => match guard.reader().await {
305            Ok(blob_reader) => {
306                let blob_size = blob_reader
307                    .metadata()
308                    .await
309                    .ok()
310                    .map(|meta| meta.content_length);
311                let bloom_reader = BloomFilterReaderImpl::new(blob_reader);
312                let result = match (cache, column_id, blob_size) {
313                    (Some(cache), Some(column_id), Some(blob_size)) => {
314                        CachedBloomFilterIndexBlobReader::new(
315                            region_index_id.file_id(),
316                            region_index_id.version,
317                            column_id,
318                            tag,
319                            blob_size,
320                            bloom_reader,
321                            cache.clone(),
322                        )
323                        .metadata(None)
324                        .await
325                    }
326                    _ => bloom_reader.metadata(None).await,
327                };
328
329                match result {
330                    Ok(meta) => Some(meta),
331                    Err(err) => {
332                        warn!(
333                            err;
334                            "Failed to read index metadata, table_dir: {}, file_id: {}, blob: {}",
335                            context.table_dir,
336                            context.file_id,
337                            blob_type
338                        );
339                        None
340                    }
341                }
342            }
343            Err(err) => {
344                warn!(
345                    err;
346                    "Failed to open index blob reader, table_dir: {}, file_id: {}, blob: {}",
347                    context.table_dir,
348                    context.file_id,
349                    blob_type
350                );
351                None
352            }
353        },
354        Err(err) => {
355            warn!(
356                err;
357                "Failed to open index blob, table_dir: {}, file_id: {}, blob: {}",
358                context.table_dir,
359                context.file_id,
360                blob_type
361            );
362            None
363        }
364    }
365}
366
367fn decode_target_info(target_key: &str) -> (String, String) {
368    match IndexTarget::decode(target_key) {
369        Ok(IndexTarget::ColumnId(id)) => (
370            TARGET_TYPE_COLUMN.to_string(),
371            json!({ "column": id }).to_string(),
372        ),
373        _ => (
374            TARGET_TYPE_UNKNOWN.to_string(),
375            json!({ "error": "failed_to_decode" }).to_string(),
376        ),
377    }
378}
379
380fn decode_column_id(target_key: &str) -> Option<ColumnId> {
381    match IndexTarget::decode(target_key) {
382        Ok(IndexTarget::ColumnId(id)) => Some(id),
383        _ => None,
384    }
385}
386
387fn bloom_meta_value(meta: &BloomFilterMeta) -> Value {
388    json!({
389        "rows_per_segment": meta.rows_per_segment,
390        "segment_count": meta.segment_count,
391        "row_count": meta.row_count,
392        "bloom_filter_size": meta.bloom_filter_size,
393    })
394}
395
396fn fulltext_meta_value(blob: &BlobMetadata) -> Value {
397    let config = FulltextConfig::from_blob_metadata(blob).unwrap_or_default();
398    json!({
399        "analyzer": config.analyzer.to_str(),
400        "case_sensitive": config.case_sensitive,
401    })
402}
403
404fn inverted_meta_value(meta: &InvertedIndexMeta, metas: &InvertedIndexMetas) -> Value {
405    let bitmap_type = BitmapType::try_from(meta.bitmap_type)
406        .map(|bt| format!("{:?}", bt))
407        .unwrap_or_else(|_| meta.bitmap_type.to_string());
408    json!({
409        "bitmap_type": bitmap_type,
410        "base_offset": meta.base_offset,
411        "inverted_index_size": meta.inverted_index_size,
412        "relative_fst_offset": meta.relative_fst_offset,
413        "fst_size": meta.fst_size,
414        "relative_null_bitmap_offset": meta.relative_null_bitmap_offset,
415        "null_bitmap_size": meta.null_bitmap_size,
416        "segment_row_count": metas.segment_row_count,
417        "total_row_count": metas.total_row_count,
418    })
419}
420
421fn build_meta_json(
422    bloom: Option<Value>,
423    fulltext: Option<Value>,
424    inverted: Option<Value>,
425) -> Option<String> {
426    let mut map = Map::new();
427    if let Some(value) = bloom {
428        map.insert("bloom".to_string(), value);
429    }
430    if let Some(value) = fulltext {
431        map.insert("fulltext".to_string(), value);
432    }
433    if let Some(value) = inverted {
434        map.insert("inverted".to_string(), value);
435    }
436    if map.is_empty() {
437        None
438    } else {
439        Some(Value::Object(map).to_string())
440    }
441}
442
/// Classification of a puffin blob by its blob type string.
///
/// The bloom filter and fulltext variants borrow the target key that follows
/// `"<prefix>-"` in the blob type. The inverted index blob type carries no target;
/// its per-target metadata is read from the blob itself.
enum BlobIndexTypeTargetKey<'a> {
    /// Blob type of the form `"<bloom filter index blob type>-<target>"`.
    BloomFilter(&'a str),
    /// Blob type of the form `"<fulltext bloom blob type>-<target>"`.
    FulltextBloom(&'a str),
    /// Blob type of the form `"<fulltext tantivy blob type>-<target>"`.
    FulltextTantivy(&'a str),
    /// Blob type exactly equal to the inverted index blob type.
    Inverted,
}
449
450impl<'a> BlobIndexTypeTargetKey<'a> {
451    fn from_blob_type(blob_type: &'a str) -> Option<Self> {
452        if let Some(target_key) = Self::target_key_from_blob(blob_type, BLOOM_BLOB_TYPE) {
453            Some(BlobIndexTypeTargetKey::BloomFilter(target_key))
454        } else if let Some(target_key) =
455            Self::target_key_from_blob(blob_type, FULLTEXT_BLOOM_BLOB_TYPE)
456        {
457            Some(BlobIndexTypeTargetKey::FulltextBloom(target_key))
458        } else if let Some(target_key) =
459            Self::target_key_from_blob(blob_type, FULLTEXT_TANTIVY_BLOB_TYPE)
460        {
461            Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key))
462        } else if blob_type == INVERTED_BLOB_TYPE {
463            Some(BlobIndexTypeTargetKey::Inverted)
464        } else {
465            None
466        }
467    }
468
469    fn target_key_from_blob(blob_type: &'a str, prefix: &str) -> Option<&'a str> {
470        // Blob types encode their target as "<prefix>-<target>".
471        blob_type
472            .strip_prefix(prefix)
473            .and_then(|suffix| suffix.strip_prefix('-'))
474    }
475}
476
477fn build_index_entry(
478    context: &IndexEntryContext<'_>,
479    index_type: &str,
480    target_type: String,
481    target_key: String,
482    target_json: String,
483    blob_size: u64,
484    meta_json: Option<String>,
485) -> PuffinIndexMetaEntry {
486    PuffinIndexMetaEntry {
487        table_dir: context.table_dir.to_string(),
488        index_file_path: context.index_file_path.to_string(),
489        region_id: context.region_id,
490        table_id: context.table_id,
491        region_number: context.region_number,
492        region_group: context.region_group,
493        region_sequence: context.region_sequence,
494        file_id: context.file_id.to_string(),
495        index_file_size: context.index_file_size,
496        index_type: index_type.to_string(),
497        target_type,
498        target_key,
499        target_json,
500        blob_size,
501        meta_json,
502        node_id: context.node_id,
503    }
504}