Skip to main content

mito2/sst/parquet/
push_decoder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Push decoder stream implementation for SST parquet files.
16
17use std::ops::Range;
18
19use bytes::{Bytes, BytesMut};
20use datatypes::arrow::record_batch::RecordBatch;
21use futures::StreamExt;
22use futures::stream::BoxStream;
23use object_store::ObjectStore;
24use parquet::DecodeResult;
25use parquet::arrow::ProjectionMask;
26use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection};
27use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
28use snafu::{ResultExt, ensure};
29
30use crate::cache::file_cache::{FileType, IndexKey};
31use crate::cache::{CacheStrategy, PageRangePart};
32use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
33use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
34use crate::sst::file::RegionFileId;
35use crate::sst::parquet::helper::fetch_byte_ranges;
36use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};
37
38/// Fetches parquet byte ranges through Greptime's cache hierarchy.
39///
40/// The push decoder decides which ranges are required for decoding, while this
41/// fetcher keeps cache lookup, local write-cache reads, and remote I/O explicit
42/// in Greptime code.
43pub struct SstParquetRangeFetcher {
44    /// Region file ID for cache key.
45    region_file_id: RegionFileId,
46    /// Path to the parquet file in object storage.
47    file_path: String,
48    /// Object store for reading data.
49    object_store: ObjectStore,
50    /// Cache strategy for reading pages.
51    cache_strategy: CacheStrategy,
52    /// Row group index for cache key.
53    row_group_idx: usize,
54    /// Optional metrics for tracking fetch operations.
55    fetch_metrics: Option<ParquetFetchMetrics>,
56}
57
58impl SstParquetRangeFetcher {
59    /// Creates a new [SstParquetRangeFetcher].
60    pub fn new(
61        region_file_id: RegionFileId,
62        file_path: String,
63        object_store: ObjectStore,
64        cache_strategy: CacheStrategy,
65        row_group_idx: usize,
66        fetch_metrics: Option<ParquetFetchMetrics>,
67    ) -> Self {
68        Self {
69            region_file_id,
70            file_path,
71            object_store,
72            cache_strategy,
73            row_group_idx,
74            fetch_metrics,
75        }
76    }
77
78    /// Fetches byte ranges from page cache, write cache, or object store.
79    async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
80        let fetch_start = self
81            .fetch_metrics
82            .as_ref()
83            .map(|_| std::time::Instant::now());
84        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
85
86        let mut page_lookup = self.cache_strategy.get_page_ranges(
87            self.region_file_id.file_id(),
88            self.row_group_idx,
89            &ranges,
90        );
91        if let Some(lookup) = &page_lookup
92            && lookup.cached_bytes > 0
93            && let Some(metrics) = &self.fetch_metrics
94        {
95            let mut metrics_data = metrics.data.lock().unwrap();
96            metrics_data.page_cache_hit += 1;
97            metrics_data.pages_to_fetch_mem += lookup.cached_range_count;
98            metrics_data.page_size_to_fetch_mem += lookup.cached_bytes;
99            metrics_data.page_size_needed += lookup.cached_bytes;
100        }
101
102        // Fast path: all requested ranges can be assembled from cached fragments.
103        if page_lookup
104            .as_ref()
105            .map(|lookup| lookup.is_fully_cached())
106            .unwrap_or(false)
107        {
108            let lookup = page_lookup.take().unwrap();
109            if let Some(metrics) = &self.fetch_metrics
110                && let Some(start) = fetch_start
111            {
112                metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
113            }
114            return assemble_ranges(&ranges, lookup.cached_parts, &[]);
115        }
116
117        let missing_ranges = page_lookup
118            .as_ref()
119            .map(|lookup| lookup.missing_ranges.clone())
120            .unwrap_or_else(|| ranges.clone());
121
122        // Calculate total range size for metrics.
123        let (_, unaligned_size) = compute_total_range_size(&missing_ranges);
124
125        // Check write cache.
126        let key = IndexKey::new(
127            self.region_file_id.region_id(),
128            self.region_file_id.file_id(),
129            FileType::Parquet,
130        );
131        let fetch_write_cache_start = self
132            .fetch_metrics
133            .as_ref()
134            .map(|_| std::time::Instant::now());
135        let write_cache_result = match self.cache_strategy.write_cache() {
136            Some(cache) => cache.file_cache().read_ranges(key, &missing_ranges).await,
137            None => None,
138        };
139
140        let fetched_pages = match write_cache_result {
141            Some(data) => {
142                if let Some(metrics) = &self.fetch_metrics {
143                    let elapsed = fetch_write_cache_start
144                        .map(|start| start.elapsed())
145                        .unwrap_or_default();
146                    let range_size_needed: u64 =
147                        missing_ranges.iter().map(|r| r.end - r.start).sum();
148                    let mut metrics_data = metrics.data.lock().unwrap();
149                    metrics_data.write_cache_fetch_elapsed += elapsed;
150                    metrics_data.write_cache_hit += 1;
151                    metrics_data.pages_to_fetch_write_cache += missing_ranges.len();
152                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
153                    metrics_data.page_size_needed += range_size_needed;
154                }
155                data
156            }
157            None => {
158                // Fetch data from object store.
159                let _timer = READ_STAGE_ELAPSED
160                    .with_label_values(&["cache_miss_read"])
161                    .start_timer();
162
163                let start = self
164                    .fetch_metrics
165                    .as_ref()
166                    .map(|_| std::time::Instant::now());
167                let data =
168                    fetch_byte_ranges(&self.file_path, self.object_store.clone(), &missing_ranges)
169                        .await
170                        .context(OpenDalSnafu)?;
171
172                if let Some(metrics) = &self.fetch_metrics {
173                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
174                    let range_size_needed: u64 =
175                        missing_ranges.iter().map(|r| r.end - r.start).sum();
176                    let mut metrics_data = metrics.data.lock().unwrap();
177                    metrics_data.store_fetch_elapsed += elapsed;
178                    metrics_data.cache_miss += 1;
179                    metrics_data.pages_to_fetch_store += missing_ranges.len();
180                    metrics_data.page_size_to_fetch_store += unaligned_size;
181                    metrics_data.page_size_needed += range_size_needed;
182                }
183                data
184            }
185        };
186        ensure!(
187            fetched_pages.len() == missing_ranges.len(),
188            UnexpectedSnafu {
189                reason: format!(
190                    "Invalid parquet range fetch: {} missing ranges but {} fetched byte ranges",
191                    missing_ranges.len(),
192                    fetched_pages.len()
193                ),
194            }
195        );
196
197        self.cache_strategy.put_page_ranges(
198            self.region_file_id.file_id(),
199            self.row_group_idx,
200            &missing_ranges,
201            &fetched_pages,
202        );
203
204        if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
205            metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
206        }
207
208        if let Some(lookup) = page_lookup {
209            let fetched_parts = missing_ranges
210                .into_iter()
211                .zip(fetched_pages)
212                .map(|(range, bytes)| PageRangePart { range, bytes })
213                .collect::<Vec<_>>();
214            return assemble_ranges(&ranges, lookup.cached_parts, &fetched_parts);
215        }
216
217        Ok(fetched_pages)
218    }
219}
220
221fn assemble_ranges(
222    ranges: &[Range<u64>],
223    cached_parts: Vec<Vec<PageRangePart>>,
224    fetched_parts: &[PageRangePart],
225) -> Result<Vec<Bytes>> {
226    ensure!(
227        ranges.len() == cached_parts.len(),
228        UnexpectedSnafu {
229            reason: format!(
230                "Invalid parquet range assembly: {} requested ranges but {} cached part groups",
231                ranges.len(),
232                cached_parts.len()
233            ),
234        }
235    );
236
237    ranges
238        .iter()
239        .zip(cached_parts)
240        .map(|(range, mut parts)| {
241            parts.extend(
242                fetched_parts
243                    .iter()
244                    .filter_map(|part| overlapping_part(range, part)),
245            );
246            assemble_range(range, parts)
247        })
248        .collect()
249}
250
251fn overlapping_part(range: &Range<u64>, part: &PageRangePart) -> Option<PageRangePart> {
252    let start = range.start.max(part.range.start);
253    let end = range.end.min(part.range.end);
254    if start >= end {
255        return None;
256    }
257
258    let slice_start = (start - part.range.start) as usize;
259    let slice_end = (end - part.range.start) as usize;
260    Some(PageRangePart {
261        range: start..end,
262        bytes: part.bytes.slice(slice_start..slice_end),
263    })
264}
265
266fn assemble_range(range: &Range<u64>, mut parts: Vec<PageRangePart>) -> Result<Bytes> {
267    if range.start >= range.end {
268        return Ok(Bytes::new());
269    }
270
271    parts.sort_unstable_by_key(|part| part.range.start);
272    if parts.len() == 1 && parts[0].range == *range {
273        return Ok(parts.pop().unwrap().bytes);
274    }
275
276    let mut cursor = range.start;
277    let mut output = BytesMut::with_capacity((range.end - range.start) as usize);
278    for part in parts {
279        ensure!(
280            part.range.start <= cursor,
281            UnexpectedSnafu {
282                reason: format!(
283                    "Missing cached parquet bytes for range {}..{}, next part starts at {}",
284                    range.start, range.end, part.range.start
285                ),
286            }
287        );
288        if part.range.end <= cursor {
289            continue;
290        }
291
292        let slice_start = (cursor - part.range.start) as usize;
293        let slice_end = (part.range.end.min(range.end) - part.range.start) as usize;
294        output.extend_from_slice(&part.bytes.slice(slice_start..slice_end));
295        cursor = part.range.end.min(range.end);
296        if cursor >= range.end {
297            break;
298        }
299    }
300
301    ensure!(
302        cursor == range.end,
303        UnexpectedSnafu {
304            reason: format!(
305                "Missing cached parquet bytes for range {}..{}, assembled through {}",
306                range.start, range.end, cursor
307            ),
308        }
309    );
310
311    Ok(output.freeze())
312}
313
314/// Builds a parquet record batch stream driven directly by [ParquetPushDecoderBuilder].
315pub fn build_sst_parquet_record_batch_stream(
316    arrow_metadata: ArrowReaderMetadata,
317    row_group_idx: usize,
318    row_selection: Option<RowSelection>,
319    projection: ProjectionMask,
320    fetcher: SstParquetRangeFetcher,
321    file_path: String,
322    batch_size: usize,
323) -> Result<BoxStream<'static, Result<RecordBatch>>> {
324    let mut builder = ParquetPushDecoderBuilder::new_with_metadata(arrow_metadata)
325        .with_row_groups(vec![row_group_idx])
326        .with_projection(projection)
327        .with_batch_size(batch_size);
328
329    if let Some(selection) = row_selection {
330        builder = builder.with_row_selection(selection);
331    }
332
333    let mut decoder = builder
334        .build()
335        .context(ReadParquetSnafu { path: &file_path })?;
336
337    Ok(async_stream::try_stream! {
338        loop {
339            match decoder.try_decode().context(ReadParquetSnafu { path: &file_path })? {
340                DecodeResult::NeedsData(ranges) => {
341                    let data = fetcher.fetch_bytes_with_cache(ranges.clone()).await?;
342                    decoder
343                        .push_ranges(ranges, data)
344                        .context(ReadParquetSnafu { path: &file_path })?;
345                }
346                DecodeResult::Data(batch) => yield batch,
347                DecodeResult::Finished => break,
348            }
349        }
350    }
351    .boxed())
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a [PageRangePart].
    fn part(range: Range<u64>, bytes: Bytes) -> PageRangePart {
        PageRangePart { range, bytes }
    }

    /// A cached head and a fetched tail are stitched into one buffer.
    #[test]
    fn test_assemble_range_from_cached_subrange_and_fetched_tail() {
        let cached = vec![vec![part(400..500, Bytes::from(vec![1u8; 100]))]];
        let fetched = [part(500..600, Bytes::from(vec![2u8; 100]))];

        let output = assemble_ranges(&[400..600], cached, &fetched).unwrap();

        assert_eq!(1, output.len());
        let mut expected = vec![1u8; 100];
        expected.extend(vec![2u8; 100]);
        assert_eq!(expected.as_slice(), &output[0][..]);
    }

    /// A single part that matches the request exactly is handed back as-is.
    #[test]
    fn test_assemble_range_returns_single_covering_part_without_copy() {
        let bytes = Bytes::from_static(b"abcdef");
        let cached = vec![vec![part(10..16, bytes.clone())]];

        let output = assemble_ranges(&[10..16], cached, &[]).unwrap();

        assert_eq!(bytes, output[0]);
    }

    /// A part larger than the request is trimmed to the requested bounds.
    #[test]
    fn test_assemble_range_clamps_overlapping_part_to_requested_end() {
        let parts = vec![part(0..10, Bytes::from_static(b"0123456789"))];

        let output = assemble_range(&(2..5), parts).unwrap();

        assert_eq!(Bytes::from_static(b"234"), output);
    }
}