1use std::convert::TryFrom;
16
17use common_base::range_read::RangeReader;
18use common_telemetry::warn;
19use greptime_proto::v1::index::{BloomFilterMeta, InvertedIndexMeta, InvertedIndexMetas};
20use index::bitmap::BitmapType;
21use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
22use index::fulltext_index::Config as FulltextConfig;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
24use index::target::IndexTarget;
25use puffin::blob_metadata::BlobMetadata;
26use puffin::puffin_manager::{PuffinManager, PuffinReader};
27use serde_json::{Map, Value, json};
28use store_api::sst_entry::{
29 PUFFIN_INDEX_TYPE_BLOOM_FILTER, PUFFIN_INDEX_TYPE_FULLTEXT_BLOOM,
30 PUFFIN_INDEX_TYPE_FULLTEXT_TANTIVY, PUFFIN_INDEX_TYPE_INVERTED, PuffinIndexMetaEntry,
31};
32use store_api::storage::{ColumnId, RegionGroup, RegionId, RegionNumber, RegionSeq, TableId};
33
34use crate::cache::index::bloom_filter_index::{
35 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
38use crate::sst::file::RegionIndexId;
39use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
40use crate::sst::index::fulltext_index::{
41 INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
42 INDEX_BLOB_TYPE_TANTIVY as FULLTEXT_TANTIVY_BLOB_TYPE,
43};
44use crate::sst::index::inverted_index::INDEX_BLOB_TYPE as INVERTED_BLOB_TYPE;
45use crate::sst::index::puffin_manager::{SstPuffinManager, SstPuffinReader};
46
47const TARGET_TYPE_UNKNOWN: &str = "unknown";
48
49const TARGET_TYPE_COLUMN: &str = "column";
50
51pub(crate) struct IndexEntryContext<'a> {
52 pub(crate) table_dir: &'a str,
53 pub(crate) index_file_path: &'a str,
54 pub(crate) region_id: RegionId,
55 pub(crate) table_id: TableId,
56 pub(crate) region_number: RegionNumber,
57 pub(crate) region_group: RegionGroup,
58 pub(crate) region_sequence: RegionSeq,
59 pub(crate) file_id: &'a str,
60 pub(crate) index_file_size: Option<u64>,
61 pub(crate) node_id: Option<u64>,
62}
63
64pub(crate) async fn collect_index_entries_from_puffin(
66 manager: SstPuffinManager,
67 region_index_id: RegionIndexId,
68 context: IndexEntryContext<'_>,
69 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
70 inverted_index_cache: Option<InvertedIndexCacheRef>,
71) -> Vec<PuffinIndexMetaEntry> {
72 let mut entries = Vec::new();
73
74 let reader = match manager.reader(®ion_index_id).await {
75 Ok(reader) => reader,
76 Err(err) => {
77 warn!(
78 err;
79 "Failed to open puffin index file, table_dir: {}, file_id: {}",
80 context.table_dir,
81 context.file_id
82 );
83 return entries;
84 }
85 };
86
87 let file_metadata = match reader.metadata().await {
88 Ok(metadata) => metadata,
89 Err(err) => {
90 warn!(
91 err;
92 "Failed to read puffin file metadata, table_dir: {}, file_id: {}",
93 context.table_dir,
94 context.file_id
95 );
96 return entries;
97 }
98 };
99
100 for blob in &file_metadata.blobs {
101 match BlobIndexTypeTargetKey::from_blob_type(&blob.blob_type) {
102 Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
103 let bloom_meta = try_read_bloom_meta(
104 &reader,
105 region_index_id,
106 blob.blob_type.as_str(),
107 target_key,
108 bloom_filter_cache.as_ref(),
109 Tag::Skipping,
110 &context,
111 )
112 .await;
113
114 let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
115 let (target_type, target_json) = decode_target_info(target_key);
116 let meta_json = build_meta_json(bloom_value, None, None);
117 let entry = build_index_entry(
118 &context,
119 PUFFIN_INDEX_TYPE_BLOOM_FILTER,
120 target_type,
121 target_key.to_string(),
122 target_json,
123 blob.length as u64,
124 meta_json,
125 );
126 entries.push(entry);
127 }
128 Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
129 let bloom_meta = try_read_bloom_meta(
130 &reader,
131 region_index_id,
132 blob.blob_type.as_str(),
133 target_key,
134 bloom_filter_cache.as_ref(),
135 Tag::Fulltext,
136 &context,
137 )
138 .await;
139
140 let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
141 let fulltext_value = Some(fulltext_meta_value(blob));
142 let (target_type, target_json) = decode_target_info(target_key);
143 let meta_json = build_meta_json(bloom_value, fulltext_value, None);
144 let entry = build_index_entry(
145 &context,
146 PUFFIN_INDEX_TYPE_FULLTEXT_BLOOM,
147 target_type,
148 target_key.to_string(),
149 target_json,
150 blob.length as u64,
151 meta_json,
152 );
153 entries.push(entry);
154 }
155 Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key)) => {
156 let fulltext_value = Some(fulltext_meta_value(blob));
157 let (target_type, target_json) = decode_target_info(target_key);
158 let meta_json = build_meta_json(None, fulltext_value, None);
159 let entry = build_index_entry(
160 &context,
161 PUFFIN_INDEX_TYPE_FULLTEXT_TANTIVY,
162 target_type,
163 target_key.to_string(),
164 target_json,
165 blob.length as u64,
166 meta_json,
167 );
168 entries.push(entry);
169 }
170 Some(BlobIndexTypeTargetKey::Inverted) => {
171 let mut inverted_entries = collect_inverted_entries(
172 &reader,
173 region_index_id,
174 inverted_index_cache.as_ref(),
175 &context,
176 )
177 .await;
178 entries.append(&mut inverted_entries);
179 }
180 None => {}
181 }
182 }
183
184 entries
185}
186
187async fn collect_inverted_entries(
188 reader: &SstPuffinReader,
189 region_index_id: RegionIndexId,
190 cache: Option<&InvertedIndexCacheRef>,
191 context: &IndexEntryContext<'_>,
192) -> Vec<PuffinIndexMetaEntry> {
193 let file_id = region_index_id.file_id();
195
196 let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
197 Ok(guard) => guard,
198 Err(err) => {
199 warn!(
200 err;
201 "Failed to open inverted index blob, table_dir: {}, file_id: {}",
202 context.table_dir,
203 context.file_id
204 );
205 return Vec::new();
206 }
207 };
208
209 let blob_reader = match guard.reader().await {
210 Ok(reader) => reader,
211 Err(err) => {
212 warn!(
213 err;
214 "Failed to build inverted index blob reader, table_dir: {}, file_id: {}",
215 context.table_dir,
216 context.file_id
217 );
218 return Vec::new();
219 }
220 };
221
222 let blob_size = blob_reader
223 .metadata()
224 .await
225 .ok()
226 .map(|meta| meta.content_length);
227 let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
228 let reader = CachedInvertedIndexBlobReader::new(
229 file_id,
230 region_index_id.version,
231 blob_size,
232 InvertedIndexBlobReader::new(blob_reader),
233 cache.clone(),
234 );
235 match reader.metadata(None).await {
236 Ok(metas) => metas,
237 Err(err) => {
238 warn!(
239 err;
240 "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
241 context.table_dir,
242 context.file_id
243 );
244 return Vec::new();
245 }
246 }
247 } else {
248 let reader = InvertedIndexBlobReader::new(blob_reader);
249 match reader.metadata(None).await {
250 Ok(metas) => metas,
251 Err(err) => {
252 warn!(
253 err;
254 "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
255 context.table_dir,
256 context.file_id
257 );
258 return Vec::new();
259 }
260 }
261 };
262
263 build_inverted_entries(context, metas.as_ref())
264}
265
266fn build_inverted_entries(
267 context: &IndexEntryContext<'_>,
268 metas: &InvertedIndexMetas,
269) -> Vec<PuffinIndexMetaEntry> {
270 let mut entries = Vec::new();
271 for (name, meta) in &metas.metas {
272 let (target_type, target_json) = decode_target_info(name);
273 let inverted_value = inverted_meta_value(meta, metas);
274 let meta_json = build_meta_json(None, None, Some(inverted_value));
275 let entry = build_index_entry(
276 context,
277 PUFFIN_INDEX_TYPE_INVERTED,
278 target_type,
279 name.clone(),
280 target_json,
281 meta.inverted_index_size,
282 meta_json,
283 );
284 entries.push(entry);
285 }
286 entries
287}
288
289async fn try_read_bloom_meta(
290 reader: &SstPuffinReader,
291 region_index_id: RegionIndexId,
292 blob_type: &str,
293 target_key: &str,
294 cache: Option<&BloomFilterIndexCacheRef>,
295 tag: Tag,
296 context: &IndexEntryContext<'_>,
297) -> Option<BloomFilterMeta> {
298 let column_id = decode_column_id(target_key);
299
300 match reader.blob(blob_type).await {
302 Ok(guard) => match guard.reader().await {
303 Ok(blob_reader) => {
304 let blob_size = blob_reader
305 .metadata()
306 .await
307 .ok()
308 .map(|meta| meta.content_length);
309 let bloom_reader = BloomFilterReaderImpl::new(blob_reader);
310 let result = match (cache, column_id, blob_size) {
311 (Some(cache), Some(column_id), Some(blob_size)) => {
312 CachedBloomFilterIndexBlobReader::new(
313 region_index_id.file_id(),
314 region_index_id.version,
315 column_id,
316 tag,
317 blob_size,
318 bloom_reader,
319 cache.clone(),
320 )
321 .metadata(None)
322 .await
323 }
324 _ => bloom_reader.metadata(None).await,
325 };
326
327 match result {
328 Ok(meta) => Some(meta),
329 Err(err) => {
330 warn!(
331 err;
332 "Failed to read index metadata, table_dir: {}, file_id: {}, blob: {}",
333 context.table_dir,
334 context.file_id,
335 blob_type
336 );
337 None
338 }
339 }
340 }
341 Err(err) => {
342 warn!(
343 err;
344 "Failed to open index blob reader, table_dir: {}, file_id: {}, blob: {}",
345 context.table_dir,
346 context.file_id,
347 blob_type
348 );
349 None
350 }
351 },
352 Err(err) => {
353 warn!(
354 err;
355 "Failed to open index blob, table_dir: {}, file_id: {}, blob: {}",
356 context.table_dir,
357 context.file_id,
358 blob_type
359 );
360 None
361 }
362 }
363}
364
365fn decode_target_info(target_key: &str) -> (String, String) {
366 match IndexTarget::decode(target_key) {
367 Ok(IndexTarget::ColumnId(id)) => (
368 TARGET_TYPE_COLUMN.to_string(),
369 json!({ "column": id }).to_string(),
370 ),
371 _ => (
372 TARGET_TYPE_UNKNOWN.to_string(),
373 json!({ "error": "failed_to_decode" }).to_string(),
374 ),
375 }
376}
377
378fn decode_column_id(target_key: &str) -> Option<ColumnId> {
379 match IndexTarget::decode(target_key) {
380 Ok(IndexTarget::ColumnId(id)) => Some(id),
381 _ => None,
382 }
383}
384
385fn bloom_meta_value(meta: &BloomFilterMeta) -> Value {
386 json!({
387 "rows_per_segment": meta.rows_per_segment,
388 "segment_count": meta.segment_count,
389 "row_count": meta.row_count,
390 "bloom_filter_size": meta.bloom_filter_size,
391 })
392}
393
394fn fulltext_meta_value(blob: &BlobMetadata) -> Value {
395 let config = FulltextConfig::from_blob_metadata(blob).unwrap_or_default();
396 json!({
397 "analyzer": config.analyzer.to_str(),
398 "case_sensitive": config.case_sensitive,
399 })
400}
401
402fn inverted_meta_value(meta: &InvertedIndexMeta, metas: &InvertedIndexMetas) -> Value {
403 let bitmap_type = BitmapType::try_from(meta.bitmap_type)
404 .map(|bt| format!("{:?}", bt))
405 .unwrap_or_else(|_| meta.bitmap_type.to_string());
406 json!({
407 "bitmap_type": bitmap_type,
408 "base_offset": meta.base_offset,
409 "inverted_index_size": meta.inverted_index_size,
410 "relative_fst_offset": meta.relative_fst_offset,
411 "fst_size": meta.fst_size,
412 "relative_null_bitmap_offset": meta.relative_null_bitmap_offset,
413 "null_bitmap_size": meta.null_bitmap_size,
414 "segment_row_count": metas.segment_row_count,
415 "total_row_count": metas.total_row_count,
416 })
417}
418
419fn build_meta_json(
420 bloom: Option<Value>,
421 fulltext: Option<Value>,
422 inverted: Option<Value>,
423) -> Option<String> {
424 let mut map = Map::new();
425 if let Some(value) = bloom {
426 map.insert("bloom".to_string(), value);
427 }
428 if let Some(value) = fulltext {
429 map.insert("fulltext".to_string(), value);
430 }
431 if let Some(value) = inverted {
432 map.insert("inverted".to_string(), value);
433 }
434 if map.is_empty() {
435 None
436 } else {
437 Some(Value::Object(map).to_string())
438 }
439}
440
441enum BlobIndexTypeTargetKey<'a> {
442 BloomFilter(&'a str),
443 FulltextBloom(&'a str),
444 FulltextTantivy(&'a str),
445 Inverted,
446}
447
448impl<'a> BlobIndexTypeTargetKey<'a> {
449 fn from_blob_type(blob_type: &'a str) -> Option<Self> {
450 if let Some(target_key) = Self::target_key_from_blob(blob_type, BLOOM_BLOB_TYPE) {
451 Some(BlobIndexTypeTargetKey::BloomFilter(target_key))
452 } else if let Some(target_key) =
453 Self::target_key_from_blob(blob_type, FULLTEXT_BLOOM_BLOB_TYPE)
454 {
455 Some(BlobIndexTypeTargetKey::FulltextBloom(target_key))
456 } else if let Some(target_key) =
457 Self::target_key_from_blob(blob_type, FULLTEXT_TANTIVY_BLOB_TYPE)
458 {
459 Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key))
460 } else if blob_type == INVERTED_BLOB_TYPE {
461 Some(BlobIndexTypeTargetKey::Inverted)
462 } else {
463 None
464 }
465 }
466
467 fn target_key_from_blob(blob_type: &'a str, prefix: &str) -> Option<&'a str> {
468 blob_type
470 .strip_prefix(prefix)
471 .and_then(|suffix| suffix.strip_prefix('-'))
472 }
473}
474
475fn build_index_entry(
476 context: &IndexEntryContext<'_>,
477 index_type: &str,
478 target_type: String,
479 target_key: String,
480 target_json: String,
481 blob_size: u64,
482 meta_json: Option<String>,
483) -> PuffinIndexMetaEntry {
484 PuffinIndexMetaEntry {
485 table_dir: context.table_dir.to_string(),
486 index_file_path: context.index_file_path.to_string(),
487 region_id: context.region_id,
488 table_id: context.table_id,
489 region_number: context.region_number,
490 region_group: context.region_group,
491 region_sequence: context.region_sequence,
492 file_id: context.file_id.to_string(),
493 index_file_size: context.index_file_size,
494 index_type: index_type.to_string(),
495 target_type,
496 target_key,
497 target_json,
498 blob_size,
499 meta_json,
500 node_id: context.node_id,
501 }
502}