Skip to main content

mito2/engine/
puffin_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::convert::TryFrom;
16
17use common_base::range_read::RangeReader;
18use common_telemetry::warn;
19use greptime_proto::v1::index::{BloomFilterMeta, InvertedIndexMeta, InvertedIndexMetas};
20use index::bitmap::BitmapType;
21use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
22use index::fulltext_index::Config as FulltextConfig;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
24use index::target::IndexTarget;
25use puffin::blob_metadata::BlobMetadata;
26use puffin::puffin_manager::{PuffinManager, PuffinReader};
27use serde_json::{Map, Value, json};
28use store_api::sst_entry::{
29    PUFFIN_INDEX_TYPE_BLOOM_FILTER, PUFFIN_INDEX_TYPE_FULLTEXT_BLOOM,
30    PUFFIN_INDEX_TYPE_FULLTEXT_TANTIVY, PUFFIN_INDEX_TYPE_INVERTED, PuffinIndexMetaEntry,
31};
32use store_api::storage::{ColumnId, RegionGroup, RegionId, RegionNumber, RegionSeq, TableId};
33
34use crate::cache::index::bloom_filter_index::{
35    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
38use crate::sst::file::RegionIndexId;
39use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
40use crate::sst::index::fulltext_index::{
41    INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
42    INDEX_BLOB_TYPE_TANTIVY as FULLTEXT_TANTIVY_BLOB_TYPE,
43};
44use crate::sst::index::inverted_index::INDEX_BLOB_TYPE as INVERTED_BLOB_TYPE;
45use crate::sst::index::puffin_manager::{SstPuffinManager, SstPuffinReader};
46
/// `target_type` reported when a target key does not decode to a column target.
const TARGET_TYPE_UNKNOWN: &str = "unknown";
/// `target_type` reported when a target key decodes to a column id.
const TARGET_TYPE_COLUMN: &str = "column";
50
51pub(crate) struct IndexEntryContext<'a> {
52    pub(crate) table_dir: &'a str,
53    pub(crate) index_file_path: &'a str,
54    pub(crate) region_id: RegionId,
55    pub(crate) table_id: TableId,
56    pub(crate) region_number: RegionNumber,
57    pub(crate) region_group: RegionGroup,
58    pub(crate) region_sequence: RegionSeq,
59    pub(crate) file_id: &'a str,
60    pub(crate) index_file_size: Option<u64>,
61    pub(crate) node_id: Option<u64>,
62}
63
64/// Collect index metadata entries present in the SST puffin file.
65pub(crate) async fn collect_index_entries_from_puffin(
66    manager: SstPuffinManager,
67    region_index_id: RegionIndexId,
68    context: IndexEntryContext<'_>,
69    bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
70    inverted_index_cache: Option<InvertedIndexCacheRef>,
71) -> Vec<PuffinIndexMetaEntry> {
72    let mut entries = Vec::new();
73
74    let reader = match manager.reader(&region_index_id).await {
75        Ok(reader) => reader,
76        Err(err) => {
77            warn!(
78                err;
79                "Failed to open puffin index file, table_dir: {}, file_id: {}",
80                context.table_dir,
81                context.file_id
82            );
83            return entries;
84        }
85    };
86
87    let file_metadata = match reader.metadata().await {
88        Ok(metadata) => metadata,
89        Err(err) => {
90            warn!(
91                err;
92                "Failed to read puffin file metadata, table_dir: {}, file_id: {}",
93                context.table_dir,
94                context.file_id
95            );
96            return entries;
97        }
98    };
99
100    for blob in &file_metadata.blobs {
101        match BlobIndexTypeTargetKey::from_blob_type(&blob.blob_type) {
102            Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
103                let bloom_meta = try_read_bloom_meta(
104                    &reader,
105                    region_index_id,
106                    blob.blob_type.as_str(),
107                    target_key,
108                    bloom_filter_cache.as_ref(),
109                    Tag::Skipping,
110                    &context,
111                )
112                .await;
113
114                let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
115                let (target_type, target_json) = decode_target_info(target_key);
116                let meta_json = build_meta_json(bloom_value, None, None);
117                let entry = build_index_entry(
118                    &context,
119                    PUFFIN_INDEX_TYPE_BLOOM_FILTER,
120                    target_type,
121                    target_key.to_string(),
122                    target_json,
123                    blob.length as u64,
124                    meta_json,
125                );
126                entries.push(entry);
127            }
128            Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
129                let bloom_meta = try_read_bloom_meta(
130                    &reader,
131                    region_index_id,
132                    blob.blob_type.as_str(),
133                    target_key,
134                    bloom_filter_cache.as_ref(),
135                    Tag::Fulltext,
136                    &context,
137                )
138                .await;
139
140                let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
141                let fulltext_value = Some(fulltext_meta_value(blob));
142                let (target_type, target_json) = decode_target_info(target_key);
143                let meta_json = build_meta_json(bloom_value, fulltext_value, None);
144                let entry = build_index_entry(
145                    &context,
146                    PUFFIN_INDEX_TYPE_FULLTEXT_BLOOM,
147                    target_type,
148                    target_key.to_string(),
149                    target_json,
150                    blob.length as u64,
151                    meta_json,
152                );
153                entries.push(entry);
154            }
155            Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key)) => {
156                let fulltext_value = Some(fulltext_meta_value(blob));
157                let (target_type, target_json) = decode_target_info(target_key);
158                let meta_json = build_meta_json(None, fulltext_value, None);
159                let entry = build_index_entry(
160                    &context,
161                    PUFFIN_INDEX_TYPE_FULLTEXT_TANTIVY,
162                    target_type,
163                    target_key.to_string(),
164                    target_json,
165                    blob.length as u64,
166                    meta_json,
167                );
168                entries.push(entry);
169            }
170            Some(BlobIndexTypeTargetKey::Inverted) => {
171                let mut inverted_entries = collect_inverted_entries(
172                    &reader,
173                    region_index_id,
174                    inverted_index_cache.as_ref(),
175                    &context,
176                )
177                .await;
178                entries.append(&mut inverted_entries);
179            }
180            None => {}
181        }
182    }
183
184    entries
185}
186
187async fn collect_inverted_entries(
188    reader: &SstPuffinReader,
189    region_index_id: RegionIndexId,
190    cache: Option<&InvertedIndexCacheRef>,
191    context: &IndexEntryContext<'_>,
192) -> Vec<PuffinIndexMetaEntry> {
193    // Read the inverted index blob and surface its per-column metadata entries.
194    let file_id = region_index_id.file_id();
195
196    let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
197        Ok(guard) => guard,
198        Err(err) => {
199            warn!(
200                err;
201                "Failed to open inverted index blob, table_dir: {}, file_id: {}",
202                context.table_dir,
203                context.file_id
204            );
205            return Vec::new();
206        }
207    };
208
209    let blob_reader = match guard.reader().await {
210        Ok(reader) => reader,
211        Err(err) => {
212            warn!(
213                err;
214                "Failed to build inverted index blob reader, table_dir: {}, file_id: {}",
215                context.table_dir,
216                context.file_id
217            );
218            return Vec::new();
219        }
220    };
221
222    let blob_size = blob_reader
223        .metadata()
224        .await
225        .ok()
226        .map(|meta| meta.content_length);
227    let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
228        let reader = CachedInvertedIndexBlobReader::new(
229            file_id,
230            region_index_id.version,
231            blob_size,
232            InvertedIndexBlobReader::new(blob_reader),
233            cache.clone(),
234        );
235        match reader.metadata(None).await {
236            Ok(metas) => metas,
237            Err(err) => {
238                warn!(
239                    err;
240                    "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
241                    context.table_dir,
242                    context.file_id
243                );
244                return Vec::new();
245            }
246        }
247    } else {
248        let reader = InvertedIndexBlobReader::new(blob_reader);
249        match reader.metadata(None).await {
250            Ok(metas) => metas,
251            Err(err) => {
252                warn!(
253                    err;
254                    "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
255                    context.table_dir,
256                    context.file_id
257                );
258                return Vec::new();
259            }
260        }
261    };
262
263    build_inverted_entries(context, metas.as_ref())
264}
265
266fn build_inverted_entries(
267    context: &IndexEntryContext<'_>,
268    metas: &InvertedIndexMetas,
269) -> Vec<PuffinIndexMetaEntry> {
270    let mut entries = Vec::new();
271    for (name, meta) in &metas.metas {
272        let (target_type, target_json) = decode_target_info(name);
273        let inverted_value = inverted_meta_value(meta, metas);
274        let meta_json = build_meta_json(None, None, Some(inverted_value));
275        let entry = build_index_entry(
276            context,
277            PUFFIN_INDEX_TYPE_INVERTED,
278            target_type,
279            name.clone(),
280            target_json,
281            meta.inverted_index_size,
282            meta_json,
283        );
284        entries.push(entry);
285    }
286    entries
287}
288
289async fn try_read_bloom_meta(
290    reader: &SstPuffinReader,
291    region_index_id: RegionIndexId,
292    blob_type: &str,
293    target_key: &str,
294    cache: Option<&BloomFilterIndexCacheRef>,
295    tag: Tag,
296    context: &IndexEntryContext<'_>,
297) -> Option<BloomFilterMeta> {
298    let column_id = decode_column_id(target_key);
299
300    // Failures are logged but do not abort the overall metadata collection.
301    match reader.blob(blob_type).await {
302        Ok(guard) => match guard.reader().await {
303            Ok(blob_reader) => {
304                let blob_size = blob_reader
305                    .metadata()
306                    .await
307                    .ok()
308                    .map(|meta| meta.content_length);
309                let bloom_reader = BloomFilterReaderImpl::new(blob_reader);
310                let result = match (cache, column_id, blob_size) {
311                    (Some(cache), Some(column_id), Some(blob_size)) => {
312                        CachedBloomFilterIndexBlobReader::new(
313                            region_index_id.file_id(),
314                            region_index_id.version,
315                            column_id,
316                            tag,
317                            blob_size,
318                            bloom_reader,
319                            cache.clone(),
320                        )
321                        .metadata(None)
322                        .await
323                    }
324                    _ => bloom_reader.metadata(None).await,
325                };
326
327                match result {
328                    Ok(meta) => Some(meta),
329                    Err(err) => {
330                        warn!(
331                            err;
332                            "Failed to read index metadata, table_dir: {}, file_id: {}, blob: {}",
333                            context.table_dir,
334                            context.file_id,
335                            blob_type
336                        );
337                        None
338                    }
339                }
340            }
341            Err(err) => {
342                warn!(
343                    err;
344                    "Failed to open index blob reader, table_dir: {}, file_id: {}, blob: {}",
345                    context.table_dir,
346                    context.file_id,
347                    blob_type
348                );
349                None
350            }
351        },
352        Err(err) => {
353            warn!(
354                err;
355                "Failed to open index blob, table_dir: {}, file_id: {}, blob: {}",
356                context.table_dir,
357                context.file_id,
358                blob_type
359            );
360            None
361        }
362    }
363}
364
365fn decode_target_info(target_key: &str) -> (String, String) {
366    match IndexTarget::decode(target_key) {
367        Ok(IndexTarget::ColumnId(id)) => (
368            TARGET_TYPE_COLUMN.to_string(),
369            json!({ "column": id }).to_string(),
370        ),
371        _ => (
372            TARGET_TYPE_UNKNOWN.to_string(),
373            json!({ "error": "failed_to_decode" }).to_string(),
374        ),
375    }
376}
377
378fn decode_column_id(target_key: &str) -> Option<ColumnId> {
379    match IndexTarget::decode(target_key) {
380        Ok(IndexTarget::ColumnId(id)) => Some(id),
381        _ => None,
382    }
383}
384
385fn bloom_meta_value(meta: &BloomFilterMeta) -> Value {
386    json!({
387        "rows_per_segment": meta.rows_per_segment,
388        "segment_count": meta.segment_count,
389        "row_count": meta.row_count,
390        "bloom_filter_size": meta.bloom_filter_size,
391    })
392}
393
394fn fulltext_meta_value(blob: &BlobMetadata) -> Value {
395    let config = FulltextConfig::from_blob_metadata(blob).unwrap_or_default();
396    json!({
397        "analyzer": config.analyzer.to_str(),
398        "case_sensitive": config.case_sensitive,
399    })
400}
401
402fn inverted_meta_value(meta: &InvertedIndexMeta, metas: &InvertedIndexMetas) -> Value {
403    let bitmap_type = BitmapType::try_from(meta.bitmap_type)
404        .map(|bt| format!("{:?}", bt))
405        .unwrap_or_else(|_| meta.bitmap_type.to_string());
406    json!({
407        "bitmap_type": bitmap_type,
408        "base_offset": meta.base_offset,
409        "inverted_index_size": meta.inverted_index_size,
410        "relative_fst_offset": meta.relative_fst_offset,
411        "fst_size": meta.fst_size,
412        "relative_null_bitmap_offset": meta.relative_null_bitmap_offset,
413        "null_bitmap_size": meta.null_bitmap_size,
414        "segment_row_count": metas.segment_row_count,
415        "total_row_count": metas.total_row_count,
416    })
417}
418
419fn build_meta_json(
420    bloom: Option<Value>,
421    fulltext: Option<Value>,
422    inverted: Option<Value>,
423) -> Option<String> {
424    let mut map = Map::new();
425    if let Some(value) = bloom {
426        map.insert("bloom".to_string(), value);
427    }
428    if let Some(value) = fulltext {
429        map.insert("fulltext".to_string(), value);
430    }
431    if let Some(value) = inverted {
432        map.insert("inverted".to_string(), value);
433    }
434    if map.is_empty() {
435        None
436    } else {
437        Some(Value::Object(map).to_string())
438    }
439}
440
441enum BlobIndexTypeTargetKey<'a> {
442    BloomFilter(&'a str),
443    FulltextBloom(&'a str),
444    FulltextTantivy(&'a str),
445    Inverted,
446}
447
448impl<'a> BlobIndexTypeTargetKey<'a> {
449    fn from_blob_type(blob_type: &'a str) -> Option<Self> {
450        if let Some(target_key) = Self::target_key_from_blob(blob_type, BLOOM_BLOB_TYPE) {
451            Some(BlobIndexTypeTargetKey::BloomFilter(target_key))
452        } else if let Some(target_key) =
453            Self::target_key_from_blob(blob_type, FULLTEXT_BLOOM_BLOB_TYPE)
454        {
455            Some(BlobIndexTypeTargetKey::FulltextBloom(target_key))
456        } else if let Some(target_key) =
457            Self::target_key_from_blob(blob_type, FULLTEXT_TANTIVY_BLOB_TYPE)
458        {
459            Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key))
460        } else if blob_type == INVERTED_BLOB_TYPE {
461            Some(BlobIndexTypeTargetKey::Inverted)
462        } else {
463            None
464        }
465    }
466
467    fn target_key_from_blob(blob_type: &'a str, prefix: &str) -> Option<&'a str> {
468        // Blob types encode their target as "<prefix>-<target>".
469        blob_type
470            .strip_prefix(prefix)
471            .and_then(|suffix| suffix.strip_prefix('-'))
472    }
473}
474
475fn build_index_entry(
476    context: &IndexEntryContext<'_>,
477    index_type: &str,
478    target_type: String,
479    target_key: String,
480    target_json: String,
481    blob_size: u64,
482    meta_json: Option<String>,
483) -> PuffinIndexMetaEntry {
484    PuffinIndexMetaEntry {
485        table_dir: context.table_dir.to_string(),
486        index_file_path: context.index_file_path.to_string(),
487        region_id: context.region_id,
488        table_id: context.table_id,
489        region_number: context.region_number,
490        region_group: context.region_group,
491        region_sequence: context.region_sequence,
492        file_id: context.file_id.to_string(),
493        index_file_size: context.index_file_size,
494        index_type: index_type.to_string(),
495        target_type,
496        target_key,
497        target_json,
498        blob_size,
499        meta_json,
500        node_id: context.node_id,
501    }
502}