index/inverted_index/format/
reader.rs1use std::collections::VecDeque;
16use std::ops::Range;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use bytes::Bytes;
22use greptime_proto::v1::index::InvertedIndexMetas;
23use snafu::ResultExt;
24
25use crate::bitmap::{Bitmap, BitmapType};
26use crate::inverted_index::FstMap;
27use crate::inverted_index::error::{DecodeBitmapSnafu, DecodeFstSnafu, Result};
28pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
29
30mod blob;
31mod footer;
32
/// Counters describing the read activity of an inverted index reader.
///
/// All fields are zero in the `Default` value. Instances can be summed with
/// `merge_from`, and `Debug` is implemented by hand to print only non-zero fields.
#[derive(Clone, Default)]
pub struct InvertedIndexReadMetrics {
    /// Byte counter; presumably the total number of bytes read (confirm against the readers
    /// that update it).
    pub total_bytes: u64,
    /// Range counter; presumably the number of byte ranges read (confirm against the readers
    /// that update it).
    pub total_ranges: usize,
    /// Accumulated duration; presumably the time spent fetching data (confirm against the
    /// readers that update it).
    pub fetch_elapsed: Duration,
    /// Cache-hit counter. NOTE(review): which cache is counted is not visible in this file.
    pub cache_hit: usize,
    /// Cache-miss counter. NOTE(review): which cache is counted is not visible in this file.
    pub cache_miss: usize,
}
47
48impl std::fmt::Debug for InvertedIndexReadMetrics {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 let Self {
51 total_bytes,
52 total_ranges,
53 fetch_elapsed,
54 cache_hit,
55 cache_miss,
56 } = self;
57
58 if *total_bytes == 0 && *cache_hit == 0 {
60 return write!(f, "{{}}");
61 }
62 write!(f, "{{")?;
63
64 if *total_bytes > 0 {
65 write!(f, "\"total_bytes\":{}", total_bytes)?;
66 }
67 if *cache_hit > 0 {
68 if *total_bytes > 0 {
69 write!(f, ", ")?;
70 }
71 write!(f, "\"cache_hit\":{}", cache_hit)?;
72 }
73
74 if *total_ranges > 0 {
75 write!(f, ", \"total_ranges\":{}", total_ranges)?;
76 }
77 if !fetch_elapsed.is_zero() {
78 write!(f, ", \"fetch_elapsed\":\"{:?}\"", fetch_elapsed)?;
79 }
80 if *cache_miss > 0 {
81 write!(f, ", \"cache_miss\":{}", cache_miss)?;
82 }
83
84 write!(f, "}}")
85 }
86}
87
88impl InvertedIndexReadMetrics {
89 pub fn merge_from(&mut self, other: &Self) {
91 self.total_bytes += other.total_bytes;
92 self.total_ranges += other.total_ranges;
93 self.fetch_elapsed += other.fetch_elapsed;
94 self.cache_hit += other.cache_hit;
95 self.cache_miss += other.cache_miss;
96 }
97}
98
99#[mockall::automock]
101#[async_trait]
102pub trait InvertedIndexReader: Send + Sync {
103 async fn range_read<'a>(
105 &self,
106 offset: u64,
107 size: u32,
108 metrics: Option<&'a mut InvertedIndexReadMetrics>,
109 ) -> Result<Vec<u8>>;
110
111 async fn read_vec<'a>(
113 &self,
114 ranges: &[Range<u64>],
115 metrics: Option<&'a mut InvertedIndexReadMetrics>,
116 ) -> Result<Vec<Bytes>>;
117
118 async fn metadata<'a>(
120 &self,
121 metrics: Option<&'a mut InvertedIndexReadMetrics>,
122 ) -> Result<Arc<InvertedIndexMetas>>;
123
124 async fn fst<'a>(
126 &self,
127 offset: u64,
128 size: u32,
129 metrics: Option<&'a mut InvertedIndexReadMetrics>,
130 ) -> Result<FstMap> {
131 let fst_data = self.range_read(offset, size, metrics).await?;
132 FstMap::new(fst_data).context(DecodeFstSnafu)
133 }
134
135 async fn fst_vec<'a>(
137 &mut self,
138 ranges: &[Range<u64>],
139 metrics: Option<&'a mut InvertedIndexReadMetrics>,
140 ) -> Result<Vec<FstMap>> {
141 self.read_vec(ranges, metrics)
142 .await?
143 .into_iter()
144 .map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu))
145 .collect::<Result<Vec<_>>>()
146 }
147
148 async fn bitmap<'a>(
150 &self,
151 offset: u64,
152 size: u32,
153 bitmap_type: BitmapType,
154 metrics: Option<&'a mut InvertedIndexReadMetrics>,
155 ) -> Result<Bitmap> {
156 self.range_read(offset, size, metrics)
157 .await
158 .and_then(|bytes| {
159 Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
160 })
161 }
162
163 async fn bitmap_deque<'a>(
165 &mut self,
166 ranges: &[(Range<u64>, BitmapType)],
167 metrics: Option<&'a mut InvertedIndexReadMetrics>,
168 ) -> Result<VecDeque<Bitmap>> {
169 let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip();
170 let bytes = self.read_vec(&ranges, metrics).await?;
171 bytes
172 .into_iter()
173 .zip(types)
174 .map(|(bytes, bitmap_type)| {
175 Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
176 })
177 .collect::<Result<VecDeque<_>>>()
178 }
179}