mito2/sst/parquet/
async_reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Async file reader implementation for SST parquet files.
16
17use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use futures::FutureExt;
22use futures::future::BoxFuture;
23use object_store::ObjectStore;
24use parquet::arrow::async_reader::AsyncFileReader;
25use parquet::errors::{ParquetError, Result as ParquetResult};
26use parquet::file::metadata::ParquetMetaData;
27
28use crate::cache::file_cache::{FileType, IndexKey};
29use crate::cache::{CacheStrategy, PageKey, PageValue};
30use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
31use crate::sst::file::RegionFileId;
32use crate::sst::parquet::helper::fetch_byte_ranges;
33use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};
34
35/// An [AsyncFileReader] implementation for SST parquet files.
36///
37/// This reader provides async byte access to parquet data in object storage,
38/// with caching support (page cache and write cache).
pub struct SstAsyncFileReader {
    /// Region file ID for cache key.
    region_file_id: RegionFileId,
    /// Path to the parquet file in object storage.
    file_path: String,
    /// Object store for reading data.
    object_store: ObjectStore,
    /// Cache strategy for reading pages. Provides the in-memory page cache
    /// and (optionally) the on-disk write cache consulted before the store.
    cache_strategy: CacheStrategy,
    /// Cached parquet metadata, parsed up front so `get_metadata` never
    /// touches storage.
    metadata: Arc<ParquetMetaData>,
    /// Row group index for cache key.
    row_group_idx: usize,
    /// Optional metrics for tracking fetch operations; `None` disables
    /// metric collection entirely (see `with_fetch_metrics`).
    fetch_metrics: Option<ParquetFetchMetrics>,
}
55
56impl SstAsyncFileReader {
57    /// Creates a new [SstAsyncFileReader].
58    pub fn new(
59        region_file_id: RegionFileId,
60        file_path: String,
61        object_store: ObjectStore,
62        cache_strategy: CacheStrategy,
63        metadata: Arc<ParquetMetaData>,
64        row_group_idx: usize,
65    ) -> Self {
66        Self {
67            region_file_id,
68            file_path,
69            object_store,
70            cache_strategy,
71            metadata,
72            row_group_idx,
73            fetch_metrics: None,
74        }
75    }
76
77    /// Sets the fetch metrics.
78    pub fn with_fetch_metrics(mut self, metrics: Option<ParquetFetchMetrics>) -> Self {
79        self.fetch_metrics = metrics;
80        self
81    }
82
83    /// Fetches byte ranges from page cache, write cache, or object store.
84    async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> ParquetResult<Vec<Bytes>> {
85        let fetch_start = self
86            .fetch_metrics
87            .as_ref()
88            .map(|_| std::time::Instant::now());
89        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
90
91        let page_key = PageKey::new(
92            self.region_file_id.file_id(),
93            self.row_group_idx,
94            ranges.clone(),
95        );
96
97        // Check page cache first.
98        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
99            if let Some(metrics) = &self.fetch_metrics {
100                let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
101                let mut metrics_data = metrics.data.lock().unwrap();
102                metrics_data.page_cache_hit += 1;
103                metrics_data.pages_to_fetch_mem += ranges.len();
104                metrics_data.page_size_to_fetch_mem += total_size;
105                metrics_data.page_size_needed += total_size;
106                if let Some(start) = fetch_start {
107                    metrics_data.total_fetch_elapsed += start.elapsed();
108                }
109            }
110            return Ok(pages.compressed.clone());
111        }
112
113        // Calculate total range size for metrics.
114        let (total_range_size, unaligned_size) = compute_total_range_size(&ranges);
115
116        // Check write cache.
117        let key = IndexKey::new(
118            self.region_file_id.region_id(),
119            self.region_file_id.file_id(),
120            FileType::Parquet,
121        );
122        let fetch_write_cache_start = self
123            .fetch_metrics
124            .as_ref()
125            .map(|_| std::time::Instant::now());
126        let write_cache_result = self.fetch_ranges_from_write_cache(key, &ranges).await;
127
128        let pages = match write_cache_result {
129            Some(data) => {
130                if let Some(metrics) = &self.fetch_metrics {
131                    let elapsed = fetch_write_cache_start
132                        .map(|start| start.elapsed())
133                        .unwrap_or_default();
134                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
135                    let mut metrics_data = metrics.data.lock().unwrap();
136                    metrics_data.write_cache_fetch_elapsed += elapsed;
137                    metrics_data.write_cache_hit += 1;
138                    metrics_data.pages_to_fetch_write_cache += ranges.len();
139                    metrics_data.page_size_to_fetch_write_cache += unaligned_size;
140                    metrics_data.page_size_needed += range_size_needed;
141                }
142                data
143            }
144            None => {
145                // Fetch data from object store.
146                let _timer = READ_STAGE_ELAPSED
147                    .with_label_values(&["cache_miss_read"])
148                    .start_timer();
149
150                let start = self
151                    .fetch_metrics
152                    .as_ref()
153                    .map(|_| std::time::Instant::now());
154                let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges)
155                    .await
156                    .map_err(|e| ParquetError::External(Box::new(e)))?;
157
158                if let Some(metrics) = &self.fetch_metrics {
159                    let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
160                    let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
161                    let mut metrics_data = metrics.data.lock().unwrap();
162                    metrics_data.store_fetch_elapsed += elapsed;
163                    metrics_data.cache_miss += 1;
164                    metrics_data.pages_to_fetch_store += ranges.len();
165                    metrics_data.page_size_to_fetch_store += unaligned_size;
166                    metrics_data.page_size_needed += range_size_needed;
167                }
168                data
169            }
170        };
171
172        // Put pages back to the cache.
173        let page_value = PageValue::new(pages.clone(), total_range_size);
174        self.cache_strategy
175            .put_pages(page_key, Arc::new(page_value));
176
177        if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
178            metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
179        }
180
181        Ok(pages)
182    }
183
184    /// Fetches data from write cache.
185    /// Returns `None` if the data is not in the cache.
186    async fn fetch_ranges_from_write_cache(
187        &self,
188        key: IndexKey,
189        ranges: &[Range<u64>],
190    ) -> Option<Vec<Bytes>> {
191        if let Some(cache) = self.cache_strategy.write_cache() {
192            return cache.file_cache().read_ranges(key, ranges).await;
193        }
194        None
195    }
196}
197
198impl AsyncFileReader for SstAsyncFileReader {
199    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
200        async move {
201            let mut result = self.fetch_bytes_with_cache(vec![range]).await?;
202            Ok(result.pop().unwrap_or_default())
203        }
204        .boxed()
205    }
206
207    fn get_byte_ranges(
208        &mut self,
209        ranges: Vec<Range<u64>>,
210    ) -> BoxFuture<'_, ParquetResult<Vec<Bytes>>> {
211        async move { self.fetch_bytes_with_cache(ranges).await }.boxed()
212    }
213
214    fn get_metadata(
215        &mut self,
216        _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>,
217    ) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
218        // Metadata is already cached, return it immediately.
219        std::future::ready(Ok(self.metadata.clone())).boxed()
220    }
221}