// mito2/sst/parquet/row_group.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet row group reading utilities.

17use std::ops::Range;
18use std::sync::Arc;
19
20use crate::sst::parquet::helper::MERGE_GAP;
21
/// Raw counters and timings collected while fetching Parquet pages and row groups.
///
/// This is the plain-data payload wrapped by [`ParquetFetchMetrics`]; it carries
/// no synchronization of its own.
#[derive(Default, Debug, Clone)]
pub struct ParquetFetchMetricsData {
    /// How many times the page cache was hit.
    pub page_cache_hit: usize,
    /// How many times the write cache was hit.
    pub write_cache_hit: usize,
    /// How many cache lookups missed.
    pub cache_miss: usize,
    /// Count of pages to fetch from the memory cache.
    pub pages_to_fetch_mem: usize,
    /// Combined byte size of the pages to fetch from the memory cache.
    pub page_size_to_fetch_mem: u64,
    /// Count of pages to fetch from the write cache.
    pub pages_to_fetch_write_cache: usize,
    /// Combined byte size of the pages to fetch from the write cache.
    pub page_size_to_fetch_write_cache: u64,
    /// Count of pages to fetch from the object store.
    pub pages_to_fetch_store: usize,
    /// Combined byte size of the pages to fetch from the object store.
    pub page_size_to_fetch_store: u64,
    /// Combined byte size of the pages actually returned.
    pub page_size_needed: u64,
    /// Time spent fetching from the write cache.
    pub write_cache_fetch_elapsed: std::time::Duration,
    /// Time spent fetching from the object store.
    pub store_fetch_elapsed: std::time::Duration,
    /// Total time spent fetching row groups.
    pub total_fetch_elapsed: std::time::Duration,
    /// Time spent executing prefilters.
    pub prefilter_cost: std::time::Duration,
    /// Number of rows removed by prefiltering.
    pub prefilter_filtered_rows: usize,
}

impl ParquetFetchMetricsData {
    /// Returns `true` when neither fetch time nor prefilter time has been recorded.
    ///
    /// Only the duration fields `total_fetch_elapsed` and `prefilter_cost` are
    /// consulted; the counters and the other timers are not inspected.
    fn is_empty(&self) -> bool {
        [self.total_fetch_elapsed, self.prefilter_cost]
            .iter()
            .all(std::time::Duration::is_zero)
    }
}

64/// Metrics for tracking page/row group fetch operations.
65#[derive(Default, Clone)]
66pub struct ParquetFetchMetrics {
67    pub data: Arc<std::sync::Mutex<ParquetFetchMetricsData>>,
68}
69
70impl std::fmt::Debug for ParquetFetchMetrics {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        let data = self.data.lock().unwrap();
73        if data.is_empty() {
74            return write!(f, "{{}}");
75        }
76
77        let ParquetFetchMetricsData {
78            page_cache_hit,
79            write_cache_hit,
80            cache_miss,
81            pages_to_fetch_mem,
82            page_size_to_fetch_mem,
83            pages_to_fetch_write_cache,
84            page_size_to_fetch_write_cache,
85            pages_to_fetch_store,
86            page_size_to_fetch_store,
87            page_size_needed,
88            write_cache_fetch_elapsed,
89            store_fetch_elapsed,
90            total_fetch_elapsed,
91            prefilter_cost,
92            prefilter_filtered_rows,
93        } = *data;
94
95        write!(f, "{{")?;
96
97        write!(f, "\"total_fetch_elapsed\":\"{:?}\"", total_fetch_elapsed)?;
98
99        if page_cache_hit > 0 {
100            write!(f, ", \"page_cache_hit\":{}", page_cache_hit)?;
101        }
102        if write_cache_hit > 0 {
103            write!(f, ", \"write_cache_hit\":{}", write_cache_hit)?;
104        }
105        if cache_miss > 0 {
106            write!(f, ", \"cache_miss\":{}", cache_miss)?;
107        }
108        if pages_to_fetch_mem > 0 {
109            write!(f, ", \"pages_to_fetch_mem\":{}", pages_to_fetch_mem)?;
110        }
111        if page_size_to_fetch_mem > 0 {
112            write!(f, ", \"page_size_to_fetch_mem\":{}", page_size_to_fetch_mem)?;
113        }
114        if pages_to_fetch_write_cache > 0 {
115            write!(
116                f,
117                ", \"pages_to_fetch_write_cache\":{}",
118                pages_to_fetch_write_cache
119            )?;
120        }
121        if page_size_to_fetch_write_cache > 0 {
122            write!(
123                f,
124                ", \"page_size_to_fetch_write_cache\":{}",
125                page_size_to_fetch_write_cache
126            )?;
127        }
128        if pages_to_fetch_store > 0 {
129            write!(f, ", \"pages_to_fetch_store\":{}", pages_to_fetch_store)?;
130        }
131        if page_size_to_fetch_store > 0 {
132            write!(
133                f,
134                ", \"page_size_to_fetch_store\":{}",
135                page_size_to_fetch_store
136            )?;
137        }
138        if page_size_needed > 0 {
139            write!(f, ", \"page_size_needed\":{}", page_size_needed)?;
140        }
141        if !write_cache_fetch_elapsed.is_zero() {
142            write!(
143                f,
144                ", \"write_cache_fetch_elapsed\":\"{:?}\"",
145                write_cache_fetch_elapsed
146            )?;
147        }
148        if !store_fetch_elapsed.is_zero() {
149            write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
150        }
151        if !prefilter_cost.is_zero() {
152            write!(f, ", \"prefilter_cost\":\"{:?}\"", prefilter_cost)?;
153        }
154        if prefilter_filtered_rows > 0 {
155            write!(
156                f,
157                ", \"prefilter_filtered_rows\":{}",
158                prefilter_filtered_rows
159            )?;
160        }
161
162        write!(f, "}}")
163    }
164}
165
166impl ParquetFetchMetrics {
167    /// Returns true if the metrics are empty (contain no meaningful data).
168    pub fn is_empty(&self) -> bool {
169        self.data.lock().unwrap().is_empty()
170    }
171
172    /// Merges metrics from another [ParquetFetchMetrics].
173    pub fn merge_from(&self, other: &ParquetFetchMetrics) {
174        let ParquetFetchMetricsData {
175            page_cache_hit,
176            write_cache_hit,
177            cache_miss,
178            pages_to_fetch_mem,
179            page_size_to_fetch_mem,
180            pages_to_fetch_write_cache,
181            page_size_to_fetch_write_cache,
182            pages_to_fetch_store,
183            page_size_to_fetch_store,
184            page_size_needed,
185            write_cache_fetch_elapsed,
186            store_fetch_elapsed,
187            total_fetch_elapsed,
188            prefilter_cost,
189            prefilter_filtered_rows,
190        } = *other.data.lock().unwrap();
191
192        let mut data = self.data.lock().unwrap();
193        data.page_cache_hit += page_cache_hit;
194        data.write_cache_hit += write_cache_hit;
195        data.cache_miss += cache_miss;
196        data.pages_to_fetch_mem += pages_to_fetch_mem;
197        data.page_size_to_fetch_mem += page_size_to_fetch_mem;
198        data.pages_to_fetch_write_cache += pages_to_fetch_write_cache;
199        data.page_size_to_fetch_write_cache += page_size_to_fetch_write_cache;
200        data.pages_to_fetch_store += pages_to_fetch_store;
201        data.page_size_to_fetch_store += page_size_to_fetch_store;
202        data.page_size_needed += page_size_needed;
203        data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
204        data.store_fetch_elapsed += store_fetch_elapsed;
205        data.total_fetch_elapsed += total_fetch_elapsed;
206        data.prefilter_cost += prefilter_cost;
207        data.prefilter_filtered_rows += prefilter_filtered_rows;
208    }
209}
210
211/// Computes the max possible buffer size to read the given `ranges`.
212/// Returns (aligned_size, unaligned_size) where:
213/// - aligned_size: total size aligned to pooled buffer size
214/// - unaligned_size: actual total size without alignment
215// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
216pub(crate) fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
217    if ranges.is_empty() {
218        return (0, 0);
219    }
220
221    let gap = MERGE_GAP as u64;
222    let mut sorted_ranges = ranges.to_vec();
223    sorted_ranges.sort_unstable_by_key(|a| a.start);
224
225    let mut total_size_aligned = 0;
226    let mut total_size_unaligned = 0;
227    let mut cur = sorted_ranges[0].clone();
228
229    for range in sorted_ranges.into_iter().skip(1) {
230        if range.start <= cur.end + gap {
231            // There is an overlap or the gap is small enough to merge
232            cur.end = cur.end.max(range.end);
233        } else {
234            // No overlap and the gap is too large, add current range to total and start a new one
235            let range_size = cur.end - cur.start;
236            total_size_aligned += align_to_pooled_buf_size(range_size);
237            total_size_unaligned += range_size;
238            cur = range;
239        }
240    }
241
242    // Add the last range
243    let range_size = cur.end - cur.start;
244    total_size_aligned += align_to_pooled_buf_size(range_size);
245    total_size_unaligned += range_size;
246
247    (total_size_aligned, total_size_unaligned)
248}
249
/// Rounds `size` up to the next multiple of the pooled buffer size (2 MiB).
///
/// Zero stays zero. Any other value is padded up to a whole number of pooled
/// buffers, and exact multiples are returned unchanged.
// See:
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/backend.rs#L178
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/reader.rs#L36-L46
fn align_to_pooled_buf_size(size: u64) -> u64 {
    // Size in bytes of one pooled read buffer.
    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
    size.next_multiple_of(POOLED_BUF_SIZE)
}