1use std::ops::Range;
18
19use bytes::{Bytes, BytesMut};
20use datatypes::arrow::record_batch::RecordBatch;
21use futures::StreamExt;
22use futures::stream::BoxStream;
23use object_store::ObjectStore;
24use parquet::DecodeResult;
25use parquet::arrow::ProjectionMask;
26use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection};
27use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
28use snafu::{ResultExt, ensure};
29
30use crate::cache::file_cache::{FileType, IndexKey};
31use crate::cache::{CacheStrategy, PageRangePart};
32use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
33use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
34use crate::sst::file::RegionFileId;
35use crate::sst::parquet::helper::fetch_byte_ranges;
36use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};
37
38pub struct SstParquetRangeFetcher {
44 region_file_id: RegionFileId,
46 file_path: String,
48 object_store: ObjectStore,
50 cache_strategy: CacheStrategy,
52 row_group_idx: usize,
54 fetch_metrics: Option<ParquetFetchMetrics>,
56}
57
58impl SstParquetRangeFetcher {
59 pub fn new(
61 region_file_id: RegionFileId,
62 file_path: String,
63 object_store: ObjectStore,
64 cache_strategy: CacheStrategy,
65 row_group_idx: usize,
66 fetch_metrics: Option<ParquetFetchMetrics>,
67 ) -> Self {
68 Self {
69 region_file_id,
70 file_path,
71 object_store,
72 cache_strategy,
73 row_group_idx,
74 fetch_metrics,
75 }
76 }
77
78 async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
80 let fetch_start = self
81 .fetch_metrics
82 .as_ref()
83 .map(|_| std::time::Instant::now());
84 let _timer = READ_STAGE_FETCH_PAGES.start_timer();
85
86 let mut page_lookup = self.cache_strategy.get_page_ranges(
87 self.region_file_id.file_id(),
88 self.row_group_idx,
89 &ranges,
90 );
91 if let Some(lookup) = &page_lookup
92 && lookup.cached_bytes > 0
93 && let Some(metrics) = &self.fetch_metrics
94 {
95 let mut metrics_data = metrics.data.lock().unwrap();
96 metrics_data.page_cache_hit += 1;
97 metrics_data.pages_to_fetch_mem += lookup.cached_range_count;
98 metrics_data.page_size_to_fetch_mem += lookup.cached_bytes;
99 metrics_data.page_size_needed += lookup.cached_bytes;
100 }
101
102 if page_lookup
104 .as_ref()
105 .map(|lookup| lookup.is_fully_cached())
106 .unwrap_or(false)
107 {
108 let lookup = page_lookup.take().unwrap();
109 if let Some(metrics) = &self.fetch_metrics
110 && let Some(start) = fetch_start
111 {
112 metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
113 }
114 return assemble_ranges(&ranges, lookup.cached_parts, &[]);
115 }
116
117 let missing_ranges = page_lookup
118 .as_ref()
119 .map(|lookup| lookup.missing_ranges.clone())
120 .unwrap_or_else(|| ranges.clone());
121
122 let (_, unaligned_size) = compute_total_range_size(&missing_ranges);
124
125 let key = IndexKey::new(
127 self.region_file_id.region_id(),
128 self.region_file_id.file_id(),
129 FileType::Parquet,
130 );
131 let fetch_write_cache_start = self
132 .fetch_metrics
133 .as_ref()
134 .map(|_| std::time::Instant::now());
135 let write_cache_result = match self.cache_strategy.write_cache() {
136 Some(cache) => cache.file_cache().read_ranges(key, &missing_ranges).await,
137 None => None,
138 };
139
140 let fetched_pages = match write_cache_result {
141 Some(data) => {
142 if let Some(metrics) = &self.fetch_metrics {
143 let elapsed = fetch_write_cache_start
144 .map(|start| start.elapsed())
145 .unwrap_or_default();
146 let range_size_needed: u64 =
147 missing_ranges.iter().map(|r| r.end - r.start).sum();
148 let mut metrics_data = metrics.data.lock().unwrap();
149 metrics_data.write_cache_fetch_elapsed += elapsed;
150 metrics_data.write_cache_hit += 1;
151 metrics_data.pages_to_fetch_write_cache += missing_ranges.len();
152 metrics_data.page_size_to_fetch_write_cache += unaligned_size;
153 metrics_data.page_size_needed += range_size_needed;
154 }
155 data
156 }
157 None => {
158 let _timer = READ_STAGE_ELAPSED
160 .with_label_values(&["cache_miss_read"])
161 .start_timer();
162
163 let start = self
164 .fetch_metrics
165 .as_ref()
166 .map(|_| std::time::Instant::now());
167 let data =
168 fetch_byte_ranges(&self.file_path, self.object_store.clone(), &missing_ranges)
169 .await
170 .context(OpenDalSnafu)?;
171
172 if let Some(metrics) = &self.fetch_metrics {
173 let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
174 let range_size_needed: u64 =
175 missing_ranges.iter().map(|r| r.end - r.start).sum();
176 let mut metrics_data = metrics.data.lock().unwrap();
177 metrics_data.store_fetch_elapsed += elapsed;
178 metrics_data.cache_miss += 1;
179 metrics_data.pages_to_fetch_store += missing_ranges.len();
180 metrics_data.page_size_to_fetch_store += unaligned_size;
181 metrics_data.page_size_needed += range_size_needed;
182 }
183 data
184 }
185 };
186 ensure!(
187 fetched_pages.len() == missing_ranges.len(),
188 UnexpectedSnafu {
189 reason: format!(
190 "Invalid parquet range fetch: {} missing ranges but {} fetched byte ranges",
191 missing_ranges.len(),
192 fetched_pages.len()
193 ),
194 }
195 );
196
197 self.cache_strategy.put_page_ranges(
198 self.region_file_id.file_id(),
199 self.row_group_idx,
200 &missing_ranges,
201 &fetched_pages,
202 );
203
204 if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
205 metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
206 }
207
208 if let Some(lookup) = page_lookup {
209 let fetched_parts = missing_ranges
210 .into_iter()
211 .zip(fetched_pages)
212 .map(|(range, bytes)| PageRangePart { range, bytes })
213 .collect::<Vec<_>>();
214 return assemble_ranges(&ranges, lookup.cached_parts, &fetched_parts);
215 }
216
217 Ok(fetched_pages)
218 }
219}
220
221fn assemble_ranges(
222 ranges: &[Range<u64>],
223 cached_parts: Vec<Vec<PageRangePart>>,
224 fetched_parts: &[PageRangePart],
225) -> Result<Vec<Bytes>> {
226 ensure!(
227 ranges.len() == cached_parts.len(),
228 UnexpectedSnafu {
229 reason: format!(
230 "Invalid parquet range assembly: {} requested ranges but {} cached part groups",
231 ranges.len(),
232 cached_parts.len()
233 ),
234 }
235 );
236
237 ranges
238 .iter()
239 .zip(cached_parts)
240 .map(|(range, mut parts)| {
241 parts.extend(
242 fetched_parts
243 .iter()
244 .filter_map(|part| overlapping_part(range, part)),
245 );
246 assemble_range(range, parts)
247 })
248 .collect()
249}
250
251fn overlapping_part(range: &Range<u64>, part: &PageRangePart) -> Option<PageRangePart> {
252 let start = range.start.max(part.range.start);
253 let end = range.end.min(part.range.end);
254 if start >= end {
255 return None;
256 }
257
258 let slice_start = (start - part.range.start) as usize;
259 let slice_end = (end - part.range.start) as usize;
260 Some(PageRangePart {
261 range: start..end,
262 bytes: part.bytes.slice(slice_start..slice_end),
263 })
264}
265
266fn assemble_range(range: &Range<u64>, mut parts: Vec<PageRangePart>) -> Result<Bytes> {
267 if range.start >= range.end {
268 return Ok(Bytes::new());
269 }
270
271 parts.sort_unstable_by_key(|part| part.range.start);
272 if parts.len() == 1 && parts[0].range == *range {
273 return Ok(parts.pop().unwrap().bytes);
274 }
275
276 let mut cursor = range.start;
277 let mut output = BytesMut::with_capacity((range.end - range.start) as usize);
278 for part in parts {
279 ensure!(
280 part.range.start <= cursor,
281 UnexpectedSnafu {
282 reason: format!(
283 "Missing cached parquet bytes for range {}..{}, next part starts at {}",
284 range.start, range.end, part.range.start
285 ),
286 }
287 );
288 if part.range.end <= cursor {
289 continue;
290 }
291
292 let slice_start = (cursor - part.range.start) as usize;
293 let slice_end = (part.range.end.min(range.end) - part.range.start) as usize;
294 output.extend_from_slice(&part.bytes.slice(slice_start..slice_end));
295 cursor = part.range.end.min(range.end);
296 if cursor >= range.end {
297 break;
298 }
299 }
300
301 ensure!(
302 cursor == range.end,
303 UnexpectedSnafu {
304 reason: format!(
305 "Missing cached parquet bytes for range {}..{}, assembled through {}",
306 range.start, range.end, cursor
307 ),
308 }
309 );
310
311 Ok(output.freeze())
312}
313
314pub fn build_sst_parquet_record_batch_stream(
316 arrow_metadata: ArrowReaderMetadata,
317 row_group_idx: usize,
318 row_selection: Option<RowSelection>,
319 projection: ProjectionMask,
320 fetcher: SstParquetRangeFetcher,
321 file_path: String,
322 batch_size: usize,
323) -> Result<BoxStream<'static, Result<RecordBatch>>> {
324 let mut builder = ParquetPushDecoderBuilder::new_with_metadata(arrow_metadata)
325 .with_row_groups(vec![row_group_idx])
326 .with_projection(projection)
327 .with_batch_size(batch_size);
328
329 if let Some(selection) = row_selection {
330 builder = builder.with_row_selection(selection);
331 }
332
333 let mut decoder = builder
334 .build()
335 .context(ReadParquetSnafu { path: &file_path })?;
336
337 Ok(async_stream::try_stream! {
338 loop {
339 match decoder.try_decode().context(ReadParquetSnafu { path: &file_path })? {
340 DecodeResult::NeedsData(ranges) => {
341 let data = fetcher.fetch_bytes_with_cache(ranges.clone()).await?;
342 decoder
343 .push_ranges(ranges, data)
344 .context(ReadParquetSnafu { path: &file_path })?;
345 }
346 DecodeResult::Data(batch) => yield batch,
347 DecodeResult::Finished => break,
348 }
349 }
350 }
351 .boxed())
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a part covering `range` whose payload is `bytes`.
    fn part(range: Range<u64>, bytes: impl Into<Bytes>) -> PageRangePart {
        PageRangePart {
            range,
            bytes: bytes.into(),
        }
    }

    /// A cached head and a fetched tail are stitched into one buffer.
    #[test]
    fn test_assemble_range_from_cached_subrange_and_fetched_tail() {
        let cached = vec![vec![part(400..500, vec![1; 100])]];
        let fetched = vec![part(500..600, vec![2; 100])];
        let requested = 400..600;

        let output =
            assemble_ranges(std::slice::from_ref(&requested), cached, &fetched).unwrap();

        assert_eq!(1, output.len());
        let (head, tail) = output[0].split_at(100);
        assert_eq!([1u8; 100].as_slice(), head);
        assert_eq!([2u8; 100].as_slice(), tail);
    }

    /// A single part matching the requested range is returned unchanged.
    #[test]
    fn test_assemble_range_returns_single_covering_part_without_copy() {
        let payload = Bytes::from_static(b"abcdef");
        let cached = vec![vec![part(10..16, payload.clone())]];
        let requested = 10..16;

        let output = assemble_ranges(std::slice::from_ref(&requested), cached, &[]).unwrap();

        assert_eq!(payload, output[0]);
    }

    /// A part larger than the request is clipped on both sides.
    #[test]
    fn test_assemble_range_clamps_overlapping_part_to_requested_end() {
        let parts = vec![part(0..10, Bytes::from_static(b"0123456789"))];

        let output = assemble_range(&(2..5), parts).unwrap();

        assert_eq!(Bytes::from_static(b"234"), output);
    }
}