1use std::ops::Range;
18use std::sync::Arc;
19
20use crate::sst::parquet::helper::MERGE_GAP;
21
/// Raw counters and timings gathered while fetching parquet pages.
///
/// `ParquetFetchMetrics::merge_from` sums every field of this struct field-by-field.
#[derive(Clone, Debug, Default)]
pub struct ParquetFetchMetricsData {
    /// Page cache hit count.
    pub page_cache_hit: usize,
    /// Write cache hit count.
    pub write_cache_hit: usize,
    /// Cache miss count.
    pub cache_miss: usize,
    /// Number of pages to fetch from memory.
    pub pages_to_fetch_mem: usize,
    /// Size of the pages to fetch from memory (presumably bytes — confirm with callers).
    pub page_size_to_fetch_mem: u64,
    /// Number of pages to fetch from the write cache.
    pub pages_to_fetch_write_cache: usize,
    /// Size of the pages to fetch from the write cache (presumably bytes).
    pub page_size_to_fetch_write_cache: u64,
    /// Number of pages to fetch from the store.
    pub pages_to_fetch_store: usize,
    /// Size of the pages to fetch from the store (presumably bytes).
    pub page_size_to_fetch_store: u64,
    /// Page size actually needed (presumably bytes).
    pub page_size_needed: u64,
    /// Time spent fetching from the write cache.
    pub write_cache_fetch_elapsed: std::time::Duration,
    /// Time spent fetching from the store.
    pub store_fetch_elapsed: std::time::Duration,
    /// Total time spent fetching.
    pub total_fetch_elapsed: std::time::Duration,
    /// Time spent prefiltering.
    pub prefilter_cost: std::time::Duration,
    /// Number of rows filtered by the prefilter.
    pub prefilter_filtered_rows: usize,
}

impl ParquetFetchMetricsData {
    /// Returns `true` when neither fetch time nor prefilter time has been recorded.
    ///
    /// Only the two durations `total_fetch_elapsed` and `prefilter_cost` are
    /// consulted; the counters are ignored.
    fn is_empty(&self) -> bool {
        [self.total_fetch_elapsed, self.prefilter_cost]
            .iter()
            .all(|elapsed| elapsed.is_zero())
    }
}
63
64#[derive(Default, Clone)]
66pub struct ParquetFetchMetrics {
67 pub data: Arc<std::sync::Mutex<ParquetFetchMetricsData>>,
68}
69
70impl std::fmt::Debug for ParquetFetchMetrics {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 let data = self.data.lock().unwrap();
73 if data.is_empty() {
74 return write!(f, "{{}}");
75 }
76
77 let ParquetFetchMetricsData {
78 page_cache_hit,
79 write_cache_hit,
80 cache_miss,
81 pages_to_fetch_mem,
82 page_size_to_fetch_mem,
83 pages_to_fetch_write_cache,
84 page_size_to_fetch_write_cache,
85 pages_to_fetch_store,
86 page_size_to_fetch_store,
87 page_size_needed,
88 write_cache_fetch_elapsed,
89 store_fetch_elapsed,
90 total_fetch_elapsed,
91 prefilter_cost,
92 prefilter_filtered_rows,
93 } = *data;
94
95 write!(f, "{{")?;
96
97 write!(f, "\"total_fetch_elapsed\":\"{:?}\"", total_fetch_elapsed)?;
98
99 if page_cache_hit > 0 {
100 write!(f, ", \"page_cache_hit\":{}", page_cache_hit)?;
101 }
102 if write_cache_hit > 0 {
103 write!(f, ", \"write_cache_hit\":{}", write_cache_hit)?;
104 }
105 if cache_miss > 0 {
106 write!(f, ", \"cache_miss\":{}", cache_miss)?;
107 }
108 if pages_to_fetch_mem > 0 {
109 write!(f, ", \"pages_to_fetch_mem\":{}", pages_to_fetch_mem)?;
110 }
111 if page_size_to_fetch_mem > 0 {
112 write!(f, ", \"page_size_to_fetch_mem\":{}", page_size_to_fetch_mem)?;
113 }
114 if pages_to_fetch_write_cache > 0 {
115 write!(
116 f,
117 ", \"pages_to_fetch_write_cache\":{}",
118 pages_to_fetch_write_cache
119 )?;
120 }
121 if page_size_to_fetch_write_cache > 0 {
122 write!(
123 f,
124 ", \"page_size_to_fetch_write_cache\":{}",
125 page_size_to_fetch_write_cache
126 )?;
127 }
128 if pages_to_fetch_store > 0 {
129 write!(f, ", \"pages_to_fetch_store\":{}", pages_to_fetch_store)?;
130 }
131 if page_size_to_fetch_store > 0 {
132 write!(
133 f,
134 ", \"page_size_to_fetch_store\":{}",
135 page_size_to_fetch_store
136 )?;
137 }
138 if page_size_needed > 0 {
139 write!(f, ", \"page_size_needed\":{}", page_size_needed)?;
140 }
141 if !write_cache_fetch_elapsed.is_zero() {
142 write!(
143 f,
144 ", \"write_cache_fetch_elapsed\":\"{:?}\"",
145 write_cache_fetch_elapsed
146 )?;
147 }
148 if !store_fetch_elapsed.is_zero() {
149 write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
150 }
151 if !prefilter_cost.is_zero() {
152 write!(f, ", \"prefilter_cost\":\"{:?}\"", prefilter_cost)?;
153 }
154 if prefilter_filtered_rows > 0 {
155 write!(
156 f,
157 ", \"prefilter_filtered_rows\":{}",
158 prefilter_filtered_rows
159 )?;
160 }
161
162 write!(f, "}}")
163 }
164}
165
166impl ParquetFetchMetrics {
167 pub fn is_empty(&self) -> bool {
169 self.data.lock().unwrap().is_empty()
170 }
171
172 pub fn merge_from(&self, other: &ParquetFetchMetrics) {
174 let ParquetFetchMetricsData {
175 page_cache_hit,
176 write_cache_hit,
177 cache_miss,
178 pages_to_fetch_mem,
179 page_size_to_fetch_mem,
180 pages_to_fetch_write_cache,
181 page_size_to_fetch_write_cache,
182 pages_to_fetch_store,
183 page_size_to_fetch_store,
184 page_size_needed,
185 write_cache_fetch_elapsed,
186 store_fetch_elapsed,
187 total_fetch_elapsed,
188 prefilter_cost,
189 prefilter_filtered_rows,
190 } = *other.data.lock().unwrap();
191
192 let mut data = self.data.lock().unwrap();
193 data.page_cache_hit += page_cache_hit;
194 data.write_cache_hit += write_cache_hit;
195 data.cache_miss += cache_miss;
196 data.pages_to_fetch_mem += pages_to_fetch_mem;
197 data.page_size_to_fetch_mem += page_size_to_fetch_mem;
198 data.pages_to_fetch_write_cache += pages_to_fetch_write_cache;
199 data.page_size_to_fetch_write_cache += page_size_to_fetch_write_cache;
200 data.pages_to_fetch_store += pages_to_fetch_store;
201 data.page_size_to_fetch_store += page_size_to_fetch_store;
202 data.page_size_needed += page_size_needed;
203 data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
204 data.store_fetch_elapsed += store_fetch_elapsed;
205 data.total_fetch_elapsed += total_fetch_elapsed;
206 data.prefilter_cost += prefilter_cost;
207 data.prefilter_filtered_rows += prefilter_filtered_rows;
208 }
209}
210
211pub(crate) fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
217 if ranges.is_empty() {
218 return (0, 0);
219 }
220
221 let gap = MERGE_GAP as u64;
222 let mut sorted_ranges = ranges.to_vec();
223 sorted_ranges.sort_unstable_by_key(|a| a.start);
224
225 let mut total_size_aligned = 0;
226 let mut total_size_unaligned = 0;
227 let mut cur = sorted_ranges[0].clone();
228
229 for range in sorted_ranges.into_iter().skip(1) {
230 if range.start <= cur.end + gap {
231 cur.end = cur.end.max(range.end);
233 } else {
234 let range_size = cur.end - cur.start;
236 total_size_aligned += align_to_pooled_buf_size(range_size);
237 total_size_unaligned += range_size;
238 cur = range;
239 }
240 }
241
242 let range_size = cur.end - cur.start;
244 total_size_aligned += align_to_pooled_buf_size(range_size);
245 total_size_unaligned += range_size;
246
247 (total_size_aligned, total_size_unaligned)
248}
249
/// Rounds `size` up to the nearest multiple of the 2 MiB pooled buffer size.
///
/// Zero stays zero, and exact multiples are returned unchanged. The 2 MiB figure
/// presumably mirrors the size of the buffers a pool hands out; confirm against
/// the fetch path.
///
/// Overflow behaves like ordinary integer arithmetic: it panics in debug builds
/// and wraps in release builds.
fn align_to_pooled_buf_size(size: u64) -> u64 {
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.next_multiple_of(POOLED_BUF_SIZE)
}