mito2/sst/parquet/
push_decoder.rs1use std::ops::Range;
18use std::sync::Arc;
19
20use bytes::Bytes;
21use datatypes::arrow::record_batch::RecordBatch;
22use futures::StreamExt;
23use futures::stream::BoxStream;
24use object_store::ObjectStore;
25use parquet::DecodeResult;
26use parquet::arrow::ProjectionMask;
27use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection};
28use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
29use snafu::ResultExt;
30
31use crate::cache::file_cache::{FileType, IndexKey};
32use crate::cache::{CacheStrategy, PageKey, PageValue};
33use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result};
34use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
35use crate::sst::file::RegionFileId;
36use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
37use crate::sst::parquet::helper::fetch_byte_ranges;
38use crate::sst::parquet::row_group::{ParquetFetchMetrics, compute_total_range_size};
39
40pub(crate) struct SstParquetRangeFetcher {
46 region_file_id: RegionFileId,
48 file_path: String,
50 object_store: ObjectStore,
52 cache_strategy: CacheStrategy,
54 row_group_idx: usize,
56 fetch_metrics: Option<ParquetFetchMetrics>,
58}
59
60impl SstParquetRangeFetcher {
61 pub(crate) fn new(
63 region_file_id: RegionFileId,
64 file_path: String,
65 object_store: ObjectStore,
66 cache_strategy: CacheStrategy,
67 row_group_idx: usize,
68 fetch_metrics: Option<ParquetFetchMetrics>,
69 ) -> Self {
70 Self {
71 region_file_id,
72 file_path,
73 object_store,
74 cache_strategy,
75 row_group_idx,
76 fetch_metrics,
77 }
78 }
79
80 async fn fetch_bytes_with_cache(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
82 let fetch_start = self
83 .fetch_metrics
84 .as_ref()
85 .map(|_| std::time::Instant::now());
86 let _timer = READ_STAGE_FETCH_PAGES.start_timer();
87
88 let page_key = PageKey::new(
89 self.region_file_id.file_id(),
90 self.row_group_idx,
91 ranges.clone(),
92 );
93
94 if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
96 if let Some(metrics) = &self.fetch_metrics {
97 let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
98 let mut metrics_data = metrics.data.lock().unwrap();
99 metrics_data.page_cache_hit += 1;
100 metrics_data.pages_to_fetch_mem += ranges.len();
101 metrics_data.page_size_to_fetch_mem += total_size;
102 metrics_data.page_size_needed += total_size;
103 if let Some(start) = fetch_start {
104 metrics_data.total_fetch_elapsed += start.elapsed();
105 }
106 }
107 return Ok(pages.compressed.clone());
108 }
109
110 let (total_range_size, unaligned_size) = compute_total_range_size(&ranges);
112
113 let key = IndexKey::new(
115 self.region_file_id.region_id(),
116 self.region_file_id.file_id(),
117 FileType::Parquet,
118 );
119 let fetch_write_cache_start = self
120 .fetch_metrics
121 .as_ref()
122 .map(|_| std::time::Instant::now());
123 let write_cache_result = match self.cache_strategy.write_cache() {
124 Some(cache) => cache.file_cache().read_ranges(key, &ranges).await,
125 None => None,
126 };
127
128 let pages = match write_cache_result {
129 Some(data) => {
130 if let Some(metrics) = &self.fetch_metrics {
131 let elapsed = fetch_write_cache_start
132 .map(|start| start.elapsed())
133 .unwrap_or_default();
134 let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
135 let mut metrics_data = metrics.data.lock().unwrap();
136 metrics_data.write_cache_fetch_elapsed += elapsed;
137 metrics_data.write_cache_hit += 1;
138 metrics_data.pages_to_fetch_write_cache += ranges.len();
139 metrics_data.page_size_to_fetch_write_cache += unaligned_size;
140 metrics_data.page_size_needed += range_size_needed;
141 }
142 data
143 }
144 None => {
145 let _timer = READ_STAGE_ELAPSED
147 .with_label_values(&["cache_miss_read"])
148 .start_timer();
149
150 let start = self
151 .fetch_metrics
152 .as_ref()
153 .map(|_| std::time::Instant::now());
154 let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges)
155 .await
156 .context(OpenDalSnafu)?;
157
158 if let Some(metrics) = &self.fetch_metrics {
159 let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
160 let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
161 let mut metrics_data = metrics.data.lock().unwrap();
162 metrics_data.store_fetch_elapsed += elapsed;
163 metrics_data.cache_miss += 1;
164 metrics_data.pages_to_fetch_store += ranges.len();
165 metrics_data.page_size_to_fetch_store += unaligned_size;
166 metrics_data.page_size_needed += range_size_needed;
167 }
168 data
169 }
170 };
171
172 let page_value = PageValue::new(pages.clone(), total_range_size);
174 self.cache_strategy
175 .put_pages(page_key, Arc::new(page_value));
176
177 if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
178 metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
179 }
180
181 Ok(pages)
182 }
183}
184
185pub(crate) fn build_sst_parquet_record_batch_stream(
187 arrow_metadata: ArrowReaderMetadata,
188 row_group_idx: usize,
189 row_selection: Option<RowSelection>,
190 projection: ProjectionMask,
191 fetcher: SstParquetRangeFetcher,
192 file_path: String,
193) -> Result<BoxStream<'static, Result<RecordBatch>>> {
194 let mut builder = ParquetPushDecoderBuilder::new_with_metadata(arrow_metadata)
195 .with_row_groups(vec![row_group_idx])
196 .with_projection(projection)
197 .with_batch_size(DEFAULT_READ_BATCH_SIZE);
198
199 if let Some(selection) = row_selection {
200 builder = builder.with_row_selection(selection);
201 }
202
203 let mut decoder = builder
204 .build()
205 .context(ReadParquetSnafu { path: &file_path })?;
206
207 Ok(async_stream::try_stream! {
208 loop {
209 match decoder.try_decode().context(ReadParquetSnafu { path: &file_path })? {
210 DecodeResult::NeedsData(ranges) => {
211 let data = fetcher.fetch_bytes_with_cache(ranges.clone()).await?;
212 decoder
213 .push_ranges(ranges, data)
214 .context(ReadParquetSnafu { path: &file_path })?;
215 }
216 DecodeResult::Data(batch) => yield batch,
217 DecodeResult::Finished => break,
218 }
219 }
220 }
221 .boxed())
222}