// mito2/sst/parquet/async_reader.rs

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::{ParquetError, Result as ParquetResult};
use parquet::file::metadata::ParquetMetaData;

use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::file::RegionFileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};

/// [`AsyncFileReader`] implementation for one row group of an SST parquet
/// file, layering an in-memory page cache and an optional write cache over
/// the backing object store.
pub struct SstAsyncFileReader {
    /// Identifies the region and SST file being read; parts of it are used
    /// to build both the page-cache and write-cache keys.
    region_file_id: RegionFileId,
    /// Path of the parquet file in the object store.
    file_path: String,
    /// Backing store used when every cache layer misses.
    object_store: ObjectStore,
    /// Cache layers consulted before the object store.
    cache_strategy: CacheStrategy,
    /// Pre-fetched parquet metadata, returned as-is by `get_metadata`.
    metadata: Arc<ParquetMetaData>,
    /// Index of the row group this reader serves; part of the page-cache key.
    row_group_idx: usize,
    /// Optional per-fetch metrics; `None` disables collection.
    fetch_metrics: Option<ParquetFetchMetrics>,
}
55
56impl SstAsyncFileReader {
57 pub fn new(
59 region_file_id: RegionFileId,
60 file_path: String,
61 object_store: ObjectStore,
62 cache_strategy: CacheStrategy,
63 metadata: Arc<ParquetMetaData>,
64 row_group_idx: usize,
65 ) -> Self {
66 Self {
67 region_file_id,
68 file_path,
69 object_store,
70 cache_strategy,
71 metadata,
72 row_group_idx,
73 fetch_metrics: None,
74 }
75 }
76
77 pub fn with_fetch_metrics(mut self, metrics: Option<ParquetFetchMetrics>) -> Self {
79 self.fetch_metrics = metrics;
80 self
81 }
82
83 async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> ParquetResult<Vec<Bytes>> {
85 let fetch_start = self
86 .fetch_metrics
87 .as_ref()
88 .map(|_| std::time::Instant::now());
89 let _timer = READ_STAGE_FETCH_PAGES.start_timer();
90
91 let page_key = PageKey::new(
92 self.region_file_id.file_id(),
93 self.row_group_idx,
94 ranges.clone(),
95 );
96
97 if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
99 if let Some(metrics) = &self.fetch_metrics {
100 let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
101 let mut metrics_data = metrics.data.lock().unwrap();
102 metrics_data.page_cache_hit += 1;
103 metrics_data.pages_to_fetch_mem += ranges.len();
104 metrics_data.page_size_to_fetch_mem += total_size;
105 metrics_data.page_size_needed += total_size;
106 if let Some(start) = fetch_start {
107 metrics_data.total_fetch_elapsed += start.elapsed();
108 }
109 }
110 return Ok(pages.compressed.clone());
111 }
112
113 let (total_range_size, unaligned_size) = compute_total_range_size(&ranges);
115
116 let key = IndexKey::new(
118 self.region_file_id.region_id(),
119 self.region_file_id.file_id(),
120 FileType::Parquet,
121 );
122 let fetch_write_cache_start = self
123 .fetch_metrics
124 .as_ref()
125 .map(|_| std::time::Instant::now());
126 let write_cache_result = self.fetch_ranges_from_write_cache(key, &ranges).await;
127
128 let pages = match write_cache_result {
129 Some(data) => {
130 if let Some(metrics) = &self.fetch_metrics {
131 let elapsed = fetch_write_cache_start
132 .map(|start| start.elapsed())
133 .unwrap_or_default();
134 let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
135 let mut metrics_data = metrics.data.lock().unwrap();
136 metrics_data.write_cache_fetch_elapsed += elapsed;
137 metrics_data.write_cache_hit += 1;
138 metrics_data.pages_to_fetch_write_cache += ranges.len();
139 metrics_data.page_size_to_fetch_write_cache += unaligned_size;
140 metrics_data.page_size_needed += range_size_needed;
141 }
142 data
143 }
144 None => {
145 let _timer = READ_STAGE_ELAPSED
147 .with_label_values(&["cache_miss_read"])
148 .start_timer();
149
150 let start = self
151 .fetch_metrics
152 .as_ref()
153 .map(|_| std::time::Instant::now());
154 let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges)
155 .await
156 .map_err(|e| ParquetError::External(Box::new(e)))?;
157
158 if let Some(metrics) = &self.fetch_metrics {
159 let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
160 let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
161 let mut metrics_data = metrics.data.lock().unwrap();
162 metrics_data.store_fetch_elapsed += elapsed;
163 metrics_data.cache_miss += 1;
164 metrics_data.pages_to_fetch_store += ranges.len();
165 metrics_data.page_size_to_fetch_store += unaligned_size;
166 metrics_data.page_size_needed += range_size_needed;
167 }
168 data
169 }
170 };
171
172 let page_value = PageValue::new(pages.clone(), total_range_size);
174 self.cache_strategy
175 .put_pages(page_key, Arc::new(page_value));
176
177 if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
178 metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
179 }
180
181 Ok(pages)
182 }
183
184 async fn fetch_ranges_from_write_cache(
187 &self,
188 key: IndexKey,
189 ranges: &[Range<u64>],
190 ) -> Option<Vec<Bytes>> {
191 if let Some(cache) = self.cache_strategy.write_cache() {
192 return cache.file_cache().read_ranges(key, ranges).await;
193 }
194 None
195 }
196}
197
198impl AsyncFileReader for SstAsyncFileReader {
199 fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
200 async move {
201 let mut result = self.fetch_bytes_with_cache(vec![range]).await?;
202 Ok(result.pop().unwrap_or_default())
203 }
204 .boxed()
205 }
206
207 fn get_byte_ranges(
208 &mut self,
209 ranges: Vec<Range<u64>>,
210 ) -> BoxFuture<'_, ParquetResult<Vec<Bytes>>> {
211 async move { self.fetch_bytes_with_cache(ranges).await }.boxed()
212 }
213
214 fn get_metadata(
215 &mut self,
216 _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>,
217 ) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
218 std::future::ready(Ok(self.metadata.clone())).boxed()
220 }
221}