Skip to main content

mito2/sst/parquet/
push_decoder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Push decoder stream implementation for SST parquet files.

17use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use datatypes::arrow::record_batch::RecordBatch;
22use futures::StreamExt;
23use futures::stream::BoxStream;
24use object_store::ObjectStore;
25use parquet::DecodeResult;
26use parquet::arrow::ProjectionMask;
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection};
28use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
29use snafu::ResultExt;
30
31use crate::cache::file_cache::{FileType, IndexKey};
32use crate::cache::{CacheStrategy, PageKey, PageValue};
33use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result};
34use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
35use crate::sst::file::RegionFileId;
36use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
37use crate::sst::parquet::helper::fetch_byte_ranges;
38use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};
39
40/// Fetches parquet byte ranges through Greptime's cache hierarchy.
41///
42/// The push decoder decides which ranges are required for decoding, while this
43/// fetcher keeps cache lookup, local write-cache reads, and remote I/O explicit
44/// in Greptime code.
45pub(crate) struct SstParquetRangeFetcher {
46    /// Region file ID for cache key.
47    region_file_id: RegionFileId,
48    /// Path to the parquet file in object storage.
49    file_path: String,
50    /// Object store for reading data.
51    object_store: ObjectStore,
52    /// Cache strategy for reading pages.
53    cache_strategy: CacheStrategy,
54    /// Row group index for cache key.
55    row_group_idx: usize,
56    /// Optional metrics for tracking fetch operations.
57    fetch_metrics: Option<ParquetFetchMetrics>,
58}
59
60impl SstParquetRangeFetcher {
61    /// Creates a new [SstParquetRangeFetcher].
62    pub(crate) fn new(
63        region_file_id: RegionFileId,
64        file_path: String,
65        object_store: ObjectStore,
66        cache_strategy: CacheStrategy,
67        row_group_idx: usize,
68        fetch_metrics: Option<ParquetFetchMetrics>,
69    ) -> Self {
70        Self {
71            region_file_id,
72            file_path,
73            object_store,
74            cache_strategy,
75            row_group_idx,
76            fetch_metrics,
77        }
78    }
79
80    /// Fetches byte ranges from page cache, write cache, or object store.
81    async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
82        let fetch_start = self
83            .fetch_metrics
84            .as_ref()
85            .map(|_| std::time::Instant::now());
86        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
87
88        let page_key = PageKey::new(
89            self.region_file_id.file_id(),
90            self.row_group_idx,
91            ranges.clone(),
92        );
93
94        // Check page cache first.
95        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
96            if let Some(metrics) = &self.fetch_metrics {
97                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
98                let mut metrics_data = metrics.data.lock().unwrap();
99                metrics_data.page_cache_hit += 1;
100                metrics_data.pages_to_fetch_mem += ranges.len();
101                metrics_data.page_size_to_fetch_mem += total_size;
102                metrics_data.page_size_needed += total_size;
103                if let Some(start) = fetch_start {
104                    metrics_data.total_fetch_elapsed += start.elapsed();
105                }
106            }
107            return Ok(pages.compressed.clone());
108        }
109
110        // Calculate total range size for metrics.
111        let (total_range_size, unaligned_size) = compute_total_range_size(&ranges);
112
113        // Check write cache.
114        let key = IndexKey::new(
115            self.region_file_id.region_id(),
116            self.region_file_id.file_id(),
117            FileType::Parquet,
118        );
119        let fetch_write_cache_start = self
120            .fetch_metrics
121            .as_ref()
122            .map(|_| std::time::Instant::now());
123        let write_cache_result = match self.cache_strategy.write_cache() {
124            Some(cache) => cache.file_cache().read_ranges(key, &ranges).await,
125            None => None,
126        };
127
128        let pages = match write_cache_result {
129            Some(data) => {
130                if let Some(metrics) = &self.fetch_metrics {
131                    let elapsed = fetch_write_cache_start
132                        .map(|start| start.elapsed())
133                        .unwrap_or_default();
134                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
135                    let mut metrics_data = metrics.data.lock().unwrap();
136                    metrics_data.write_cache_fetch_elapsed += elapsed;
137                    metrics_data.write_cache_hit += 1;
138                    metrics_data.pages_to_fetch_write_cache += ranges.len();
139                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
140                    metrics_data.page_size_needed += range_size_needed;
141                }
142                data
143            }
144            None => {
145                // Fetch data from object store.
146                let _timer = READ_STAGE_ELAPSED
147                    .with_label_values(&["cache_miss_read"])
148                    .start_timer();
149
150                let start = self
151                    .fetch_metrics
152                    .as_ref()
153                    .map(|_| std::time::Instant::now());
154                let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges)
155                    .await
156                    .context(OpenDalSnafu)?;
157
158                if let Some(metrics) = &self.fetch_metrics {
159                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
160                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
161                    let mut metrics_data = metrics.data.lock().unwrap();
162                    metrics_data.store_fetch_elapsed += elapsed;
163                    metrics_data.cache_miss += 1;
164                    metrics_data.pages_to_fetch_store += ranges.len();
165                    metrics_data.page_size_to_fetch_store += unaligned_size;
166                    metrics_data.page_size_needed += range_size_needed;
167                }
168                data
169            }
170        };
171
172        // Put pages back to the cache.
173        let page_value = PageValue::new(pages.clone(), total_range_size);
174        self.cache_strategy
175            .put_pages(page_key, Arc::new(page_value));
176
177        if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
178            metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
179        }
180
181        Ok(pages)
182    }
183}
184
185/// Builds a parquet record batch stream driven directly by [ParquetPushDecoderBuilder].
186pub(crate) fn build_sst_parquet_record_batch_stream(
187    arrow_metadata: ArrowReaderMetadata,
188    row_group_idx: usize,
189    row_selection: Option<RowSelection>,
190    projection: ProjectionMask,
191    fetcher: SstParquetRangeFetcher,
192    file_path: String,
193) -> Result<BoxStream<'static, Result<RecordBatch>>> {
194    let mut builder = ParquetPushDecoderBuilder::new_with_metadata(arrow_metadata)
195        .with_row_groups(vec![row_group_idx])
196        .with_projection(projection)
197        .with_batch_size(DEFAULT_READ_BATCH_SIZE);
198
199    if let Some(selection) = row_selection {
200        builder = builder.with_row_selection(selection);
201    }
202
203    let mut decoder = builder
204        .build()
205        .context(ReadParquetSnafu { path: &file_path })?;
206
207    Ok(async_stream::try_stream! {
208        loop {
209            match decoder.try_decode().context(ReadParquetSnafu { path: &file_path })? {
210                DecodeResult::NeedsData(ranges) => {
211                    let data = fetcher.fetch_bytes_with_cache(ranges.clone()).await?;
212                    decoder
213                        .push_ranges(ranges, data)
214                        .context(ReadParquetSnafu { path: &file_path })?;
215                }
216                DecodeResult::Data(batch) => yield batch,
217                DecodeResult::Finished => break,
218            }
219        }
220    }
221    .boxed())
222}