1use std::convert::TryFrom;
16
17use common_base::range_read::RangeReader;
18use common_telemetry::warn;
19use greptime_proto::v1::index::{BloomFilterMeta, InvertedIndexMeta, InvertedIndexMetas};
20use index::bitmap::BitmapType;
21use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
22use index::fulltext_index::Config as FulltextConfig;
23use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
24use index::target::IndexTarget;
25use puffin::blob_metadata::BlobMetadata;
26use puffin::puffin_manager::{PuffinManager, PuffinReader};
27use serde_json::{Map, Value, json};
28use store_api::sst_entry::PuffinIndexMetaEntry;
29use store_api::storage::{ColumnId, RegionGroup, RegionId, RegionNumber, RegionSeq, TableId};
30
31use crate::cache::index::bloom_filter_index::{
32 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
33};
34use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
35use crate::sst::file::RegionIndexId;
36use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE as BLOOM_BLOB_TYPE;
37use crate::sst::index::fulltext_index::{
38 INDEX_BLOB_TYPE_BLOOM as FULLTEXT_BLOOM_BLOB_TYPE,
39 INDEX_BLOB_TYPE_TANTIVY as FULLTEXT_TANTIVY_BLOB_TYPE,
40};
41use crate::sst::index::inverted_index::INDEX_BLOB_TYPE as INVERTED_BLOB_TYPE;
42use crate::sst::index::puffin_manager::{SstPuffinManager, SstPuffinReader};
43
/// `index_type` label for bloom filter index blobs.
const INDEX_TYPE_BLOOM: &str = "bloom_filter";
/// `index_type` label for bloom-backed fulltext index blobs.
const INDEX_TYPE_FULLTEXT_BLOOM: &str = "fulltext_bloom";
/// `index_type` label for tantivy-backed fulltext index blobs.
const INDEX_TYPE_FULLTEXT_TANTIVY: &str = "fulltext_tantivy";
/// `index_type` label for entries read from the inverted index blob.
const INDEX_TYPE_INVERTED: &str = "inverted";

/// `target_type` label used when the target key decodes to a column id.
const TARGET_TYPE_COLUMN: &str = "column";
/// `target_type` label used when the target key cannot be decoded.
const TARGET_TYPE_UNKNOWN: &str = "unknown";
52
53pub(crate) struct IndexEntryContext<'a> {
54 pub(crate) table_dir: &'a str,
55 pub(crate) index_file_path: &'a str,
56 pub(crate) region_id: RegionId,
57 pub(crate) table_id: TableId,
58 pub(crate) region_number: RegionNumber,
59 pub(crate) region_group: RegionGroup,
60 pub(crate) region_sequence: RegionSeq,
61 pub(crate) file_id: &'a str,
62 pub(crate) index_file_size: Option<u64>,
63 pub(crate) node_id: Option<u64>,
64}
65
66pub(crate) async fn collect_index_entries_from_puffin(
68 manager: SstPuffinManager,
69 region_index_id: RegionIndexId,
70 context: IndexEntryContext<'_>,
71 bloom_filter_cache: Option<BloomFilterIndexCacheRef>,
72 inverted_index_cache: Option<InvertedIndexCacheRef>,
73) -> Vec<PuffinIndexMetaEntry> {
74 let mut entries = Vec::new();
75
76 let reader = match manager.reader(®ion_index_id).await {
77 Ok(reader) => reader,
78 Err(err) => {
79 warn!(
80 err;
81 "Failed to open puffin index file, table_dir: {}, file_id: {}",
82 context.table_dir,
83 context.file_id
84 );
85 return entries;
86 }
87 };
88
89 let file_metadata = match reader.metadata().await {
90 Ok(metadata) => metadata,
91 Err(err) => {
92 warn!(
93 err;
94 "Failed to read puffin file metadata, table_dir: {}, file_id: {}",
95 context.table_dir,
96 context.file_id
97 );
98 return entries;
99 }
100 };
101
102 for blob in &file_metadata.blobs {
103 match BlobIndexTypeTargetKey::from_blob_type(&blob.blob_type) {
104 Some(BlobIndexTypeTargetKey::BloomFilter(target_key)) => {
105 let bloom_meta = try_read_bloom_meta(
106 &reader,
107 region_index_id,
108 blob.blob_type.as_str(),
109 target_key,
110 bloom_filter_cache.as_ref(),
111 Tag::Skipping,
112 &context,
113 )
114 .await;
115
116 let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
117 let (target_type, target_json) = decode_target_info(target_key);
118 let meta_json = build_meta_json(bloom_value, None, None);
119 let entry = build_index_entry(
120 &context,
121 INDEX_TYPE_BLOOM,
122 target_type,
123 target_key.to_string(),
124 target_json,
125 blob.length as u64,
126 meta_json,
127 );
128 entries.push(entry);
129 }
130 Some(BlobIndexTypeTargetKey::FulltextBloom(target_key)) => {
131 let bloom_meta = try_read_bloom_meta(
132 &reader,
133 region_index_id,
134 blob.blob_type.as_str(),
135 target_key,
136 bloom_filter_cache.as_ref(),
137 Tag::Fulltext,
138 &context,
139 )
140 .await;
141
142 let bloom_value = bloom_meta.as_ref().map(bloom_meta_value);
143 let fulltext_value = Some(fulltext_meta_value(blob));
144 let (target_type, target_json) = decode_target_info(target_key);
145 let meta_json = build_meta_json(bloom_value, fulltext_value, None);
146 let entry = build_index_entry(
147 &context,
148 INDEX_TYPE_FULLTEXT_BLOOM,
149 target_type,
150 target_key.to_string(),
151 target_json,
152 blob.length as u64,
153 meta_json,
154 );
155 entries.push(entry);
156 }
157 Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key)) => {
158 let fulltext_value = Some(fulltext_meta_value(blob));
159 let (target_type, target_json) = decode_target_info(target_key);
160 let meta_json = build_meta_json(None, fulltext_value, None);
161 let entry = build_index_entry(
162 &context,
163 INDEX_TYPE_FULLTEXT_TANTIVY,
164 target_type,
165 target_key.to_string(),
166 target_json,
167 blob.length as u64,
168 meta_json,
169 );
170 entries.push(entry);
171 }
172 Some(BlobIndexTypeTargetKey::Inverted) => {
173 let mut inverted_entries = collect_inverted_entries(
174 &reader,
175 region_index_id,
176 inverted_index_cache.as_ref(),
177 &context,
178 )
179 .await;
180 entries.append(&mut inverted_entries);
181 }
182 None => {}
183 }
184 }
185
186 entries
187}
188
189async fn collect_inverted_entries(
190 reader: &SstPuffinReader,
191 region_index_id: RegionIndexId,
192 cache: Option<&InvertedIndexCacheRef>,
193 context: &IndexEntryContext<'_>,
194) -> Vec<PuffinIndexMetaEntry> {
195 let file_id = region_index_id.file_id();
197
198 let guard = match reader.blob(INVERTED_BLOB_TYPE).await {
199 Ok(guard) => guard,
200 Err(err) => {
201 warn!(
202 err;
203 "Failed to open inverted index blob, table_dir: {}, file_id: {}",
204 context.table_dir,
205 context.file_id
206 );
207 return Vec::new();
208 }
209 };
210
211 let blob_reader = match guard.reader().await {
212 Ok(reader) => reader,
213 Err(err) => {
214 warn!(
215 err;
216 "Failed to build inverted index blob reader, table_dir: {}, file_id: {}",
217 context.table_dir,
218 context.file_id
219 );
220 return Vec::new();
221 }
222 };
223
224 let blob_size = blob_reader
225 .metadata()
226 .await
227 .ok()
228 .map(|meta| meta.content_length);
229 let metas = if let (Some(cache), Some(blob_size)) = (cache, blob_size) {
230 let reader = CachedInvertedIndexBlobReader::new(
231 file_id,
232 region_index_id.version,
233 blob_size,
234 InvertedIndexBlobReader::new(blob_reader),
235 cache.clone(),
236 );
237 match reader.metadata(None).await {
238 Ok(metas) => metas,
239 Err(err) => {
240 warn!(
241 err;
242 "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
243 context.table_dir,
244 context.file_id
245 );
246 return Vec::new();
247 }
248 }
249 } else {
250 let reader = InvertedIndexBlobReader::new(blob_reader);
251 match reader.metadata(None).await {
252 Ok(metas) => metas,
253 Err(err) => {
254 warn!(
255 err;
256 "Failed to read inverted index metadata, table_dir: {}, file_id: {}",
257 context.table_dir,
258 context.file_id
259 );
260 return Vec::new();
261 }
262 }
263 };
264
265 build_inverted_entries(context, metas.as_ref())
266}
267
268fn build_inverted_entries(
269 context: &IndexEntryContext<'_>,
270 metas: &InvertedIndexMetas,
271) -> Vec<PuffinIndexMetaEntry> {
272 let mut entries = Vec::new();
273 for (name, meta) in &metas.metas {
274 let (target_type, target_json) = decode_target_info(name);
275 let inverted_value = inverted_meta_value(meta, metas);
276 let meta_json = build_meta_json(None, None, Some(inverted_value));
277 let entry = build_index_entry(
278 context,
279 INDEX_TYPE_INVERTED,
280 target_type,
281 name.clone(),
282 target_json,
283 meta.inverted_index_size,
284 meta_json,
285 );
286 entries.push(entry);
287 }
288 entries
289}
290
291async fn try_read_bloom_meta(
292 reader: &SstPuffinReader,
293 region_index_id: RegionIndexId,
294 blob_type: &str,
295 target_key: &str,
296 cache: Option<&BloomFilterIndexCacheRef>,
297 tag: Tag,
298 context: &IndexEntryContext<'_>,
299) -> Option<BloomFilterMeta> {
300 let column_id = decode_column_id(target_key);
301
302 match reader.blob(blob_type).await {
304 Ok(guard) => match guard.reader().await {
305 Ok(blob_reader) => {
306 let blob_size = blob_reader
307 .metadata()
308 .await
309 .ok()
310 .map(|meta| meta.content_length);
311 let bloom_reader = BloomFilterReaderImpl::new(blob_reader);
312 let result = match (cache, column_id, blob_size) {
313 (Some(cache), Some(column_id), Some(blob_size)) => {
314 CachedBloomFilterIndexBlobReader::new(
315 region_index_id.file_id(),
316 region_index_id.version,
317 column_id,
318 tag,
319 blob_size,
320 bloom_reader,
321 cache.clone(),
322 )
323 .metadata(None)
324 .await
325 }
326 _ => bloom_reader.metadata(None).await,
327 };
328
329 match result {
330 Ok(meta) => Some(meta),
331 Err(err) => {
332 warn!(
333 err;
334 "Failed to read index metadata, table_dir: {}, file_id: {}, blob: {}",
335 context.table_dir,
336 context.file_id,
337 blob_type
338 );
339 None
340 }
341 }
342 }
343 Err(err) => {
344 warn!(
345 err;
346 "Failed to open index blob reader, table_dir: {}, file_id: {}, blob: {}",
347 context.table_dir,
348 context.file_id,
349 blob_type
350 );
351 None
352 }
353 },
354 Err(err) => {
355 warn!(
356 err;
357 "Failed to open index blob, table_dir: {}, file_id: {}, blob: {}",
358 context.table_dir,
359 context.file_id,
360 blob_type
361 );
362 None
363 }
364 }
365}
366
367fn decode_target_info(target_key: &str) -> (String, String) {
368 match IndexTarget::decode(target_key) {
369 Ok(IndexTarget::ColumnId(id)) => (
370 TARGET_TYPE_COLUMN.to_string(),
371 json!({ "column": id }).to_string(),
372 ),
373 _ => (
374 TARGET_TYPE_UNKNOWN.to_string(),
375 json!({ "error": "failed_to_decode" }).to_string(),
376 ),
377 }
378}
379
380fn decode_column_id(target_key: &str) -> Option<ColumnId> {
381 match IndexTarget::decode(target_key) {
382 Ok(IndexTarget::ColumnId(id)) => Some(id),
383 _ => None,
384 }
385}
386
387fn bloom_meta_value(meta: &BloomFilterMeta) -> Value {
388 json!({
389 "rows_per_segment": meta.rows_per_segment,
390 "segment_count": meta.segment_count,
391 "row_count": meta.row_count,
392 "bloom_filter_size": meta.bloom_filter_size,
393 })
394}
395
396fn fulltext_meta_value(blob: &BlobMetadata) -> Value {
397 let config = FulltextConfig::from_blob_metadata(blob).unwrap_or_default();
398 json!({
399 "analyzer": config.analyzer.to_str(),
400 "case_sensitive": config.case_sensitive,
401 })
402}
403
404fn inverted_meta_value(meta: &InvertedIndexMeta, metas: &InvertedIndexMetas) -> Value {
405 let bitmap_type = BitmapType::try_from(meta.bitmap_type)
406 .map(|bt| format!("{:?}", bt))
407 .unwrap_or_else(|_| meta.bitmap_type.to_string());
408 json!({
409 "bitmap_type": bitmap_type,
410 "base_offset": meta.base_offset,
411 "inverted_index_size": meta.inverted_index_size,
412 "relative_fst_offset": meta.relative_fst_offset,
413 "fst_size": meta.fst_size,
414 "relative_null_bitmap_offset": meta.relative_null_bitmap_offset,
415 "null_bitmap_size": meta.null_bitmap_size,
416 "segment_row_count": metas.segment_row_count,
417 "total_row_count": metas.total_row_count,
418 })
419}
420
421fn build_meta_json(
422 bloom: Option<Value>,
423 fulltext: Option<Value>,
424 inverted: Option<Value>,
425) -> Option<String> {
426 let mut map = Map::new();
427 if let Some(value) = bloom {
428 map.insert("bloom".to_string(), value);
429 }
430 if let Some(value) = fulltext {
431 map.insert("fulltext".to_string(), value);
432 }
433 if let Some(value) = inverted {
434 map.insert("inverted".to_string(), value);
435 }
436 if map.is_empty() {
437 None
438 } else {
439 Some(Value::Object(map).to_string())
440 }
441}
442
443enum BlobIndexTypeTargetKey<'a> {
444 BloomFilter(&'a str),
445 FulltextBloom(&'a str),
446 FulltextTantivy(&'a str),
447 Inverted,
448}
449
450impl<'a> BlobIndexTypeTargetKey<'a> {
451 fn from_blob_type(blob_type: &'a str) -> Option<Self> {
452 if let Some(target_key) = Self::target_key_from_blob(blob_type, BLOOM_BLOB_TYPE) {
453 Some(BlobIndexTypeTargetKey::BloomFilter(target_key))
454 } else if let Some(target_key) =
455 Self::target_key_from_blob(blob_type, FULLTEXT_BLOOM_BLOB_TYPE)
456 {
457 Some(BlobIndexTypeTargetKey::FulltextBloom(target_key))
458 } else if let Some(target_key) =
459 Self::target_key_from_blob(blob_type, FULLTEXT_TANTIVY_BLOB_TYPE)
460 {
461 Some(BlobIndexTypeTargetKey::FulltextTantivy(target_key))
462 } else if blob_type == INVERTED_BLOB_TYPE {
463 Some(BlobIndexTypeTargetKey::Inverted)
464 } else {
465 None
466 }
467 }
468
469 fn target_key_from_blob(blob_type: &'a str, prefix: &str) -> Option<&'a str> {
470 blob_type
472 .strip_prefix(prefix)
473 .and_then(|suffix| suffix.strip_prefix('-'))
474 }
475}
476
477fn build_index_entry(
478 context: &IndexEntryContext<'_>,
479 index_type: &str,
480 target_type: String,
481 target_key: String,
482 target_json: String,
483 blob_size: u64,
484 meta_json: Option<String>,
485) -> PuffinIndexMetaEntry {
486 PuffinIndexMetaEntry {
487 table_dir: context.table_dir.to_string(),
488 index_file_path: context.index_file_path.to_string(),
489 region_id: context.region_id,
490 table_id: context.table_id,
491 region_number: context.region_number,
492 region_group: context.region_group,
493 region_sequence: context.region_sequence,
494 file_id: context.file_id.to_string(),
495 index_file_size: context.index_file_size,
496 index_type: index_type.to_string(),
497 target_type,
498 target_key,
499 target_json,
500 blob_size,
501 meta_json,
502 node_id: context.node_id,
503 }
504}