use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, UInt64Array};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::file::metadata::RowGroupMetaData;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{NewRecordBatchSnafu, Result};
use crate::sst::parquet::format::{FormatProjection, ReadFormat, StatValues};
use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
/// Helper to convert record batches to the flat SST format before writing.
#[allow(dead_code)]
pub(crate) struct FlatWriteFormat {
    /// Metadata of the region to write.
    metadata: RegionMetadataRef,
    /// Arrow schema of the SST to write (flat layout).
    arrow_schema: SchemaRef,
    /// If set, replaces the sequence of every row in converted batches.
    override_sequence: Option<SequenceNumber>,
}
56
57impl FlatWriteFormat {
58 #[allow(dead_code)]
60 pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
61 let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
62 FlatWriteFormat {
63 metadata,
64 arrow_schema,
65 override_sequence: None,
66 }
67 }
68
69 #[allow(dead_code)]
71 pub(crate) fn with_override_sequence(
72 mut self,
73 override_sequence: Option<SequenceNumber>,
74 ) -> Self {
75 self.override_sequence = override_sequence;
76 self
77 }
78
79 #[allow(dead_code)]
81 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
82 &self.arrow_schema
83 }
84
85 #[allow(dead_code)]
87 pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
88 debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
89
90 let Some(override_sequence) = self.override_sequence else {
91 return Ok(batch.clone());
92 };
93
94 let mut columns = batch.columns().to_vec();
95 let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
96 columns[sequence_column_index(batch.num_columns())] = sequence_array;
97
98 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
99 }
100}
101
/// Index of the sequence column in a flat-format batch with `num_columns`
/// columns.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    // The sequence column sits two positions from the end of the schema
    // (presumably followed only by the op-type column — layout implied by the
    // sibling index helpers in this file).
    const OFFSET_FROM_END: usize = 2;
    num_columns - OFFSET_FROM_END
}
106
/// Index of the time index column in a flat-format batch with `num_columns`
/// columns.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    // The time index column sits four positions from the end, just before the
    // encoded primary key and sequence columns (see the sibling helpers).
    const OFFSET_FROM_END: usize = 4;
    num_columns - OFFSET_FROM_END
}
111
/// Index of the primary key column in a flat-format batch with `num_columns`
/// columns.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    // The primary key column sits three positions from the end, just before
    // the sequence column (see `sequence_column_index`).
    const OFFSET_FROM_END: usize = 3;
    num_columns - OFFSET_FROM_END
}
116
/// Helper to read SSTs stored in the flat format.
pub struct FlatReadFormat {
    /// Metadata of the region the SST belongs to.
    metadata: RegionMetadataRef,
    /// Arrow schema of the SST file (flat layout, unprojected).
    arrow_schema: SchemaRef,
    /// Indices of the columns to read from the SST.
    projection_indices: Vec<usize>,
    /// Maps a projected column's id to its index in the projected batch.
    column_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Maps a column's id to its index in the SST file.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
    /// If set, replaces the sequence values read from the file.
    override_sequence: Option<SequenceNumber>,
}
136
137impl FlatReadFormat {
138 pub fn new(
140 metadata: RegionMetadataRef,
141 column_ids: impl Iterator<Item = ColumnId>,
142 ) -> FlatReadFormat {
143 let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
144
145 let id_to_index = sst_column_id_indices(&metadata);
147
148 let format_projection = FormatProjection::compute_format_projection(
149 &id_to_index,
150 arrow_schema.fields.len(),
151 column_ids,
152 );
153
154 FlatReadFormat {
155 metadata,
156 arrow_schema,
157 projection_indices: format_projection.projection_indices,
158 column_id_to_projected_index: format_projection.column_id_to_projected_index,
159 column_id_to_sst_index: id_to_index,
160 override_sequence: None,
161 }
162 }
163
164 #[allow(dead_code)]
166 pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
167 self.override_sequence = sequence;
168 }
169
170 pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
172 self.column_id_to_projected_index.get(&column_id).copied()
173 }
174
175 pub fn min_values(
177 &self,
178 row_groups: &[impl Borrow<RowGroupMetaData>],
179 column_id: ColumnId,
180 ) -> StatValues {
181 self.get_stat_values(row_groups, column_id, true)
182 }
183
184 pub fn max_values(
186 &self,
187 row_groups: &[impl Borrow<RowGroupMetaData>],
188 column_id: ColumnId,
189 ) -> StatValues {
190 self.get_stat_values(row_groups, column_id, false)
191 }
192
193 pub fn null_counts(
195 &self,
196 row_groups: &[impl Borrow<RowGroupMetaData>],
197 column_id: ColumnId,
198 ) -> StatValues {
199 let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
200 return StatValues::NoColumn;
202 };
203
204 let stats = ReadFormat::column_null_counts(row_groups, *index);
205 StatValues::from_stats_opt(stats)
206 }
207
208 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
213 &self.arrow_schema
214 }
215
216 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
218 &self.metadata
219 }
220
221 pub(crate) fn projection_indices(&self) -> &[usize] {
223 &self.projection_indices
224 }
225
226 #[allow(dead_code)]
228 pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
229 self.override_sequence
230 .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
231 }
232
233 #[allow(dead_code)]
237 pub(crate) fn convert_batch(
238 &self,
239 record_batch: &RecordBatch,
240 override_sequence_array: Option<&ArrayRef>,
241 ) -> Result<RecordBatch> {
242 let Some(override_array) = override_sequence_array else {
243 return Ok(record_batch.clone());
244 };
245
246 let mut columns = record_batch.columns().to_vec();
247 let sequence_column_idx = sequence_column_index(record_batch.num_columns());
248
249 let sequence_array = if override_array.len() > record_batch.num_rows() {
251 override_array.slice(0, record_batch.num_rows())
252 } else {
253 override_array.clone()
254 };
255
256 columns[sequence_column_idx] = sequence_array;
257
258 RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu)
259 }
260
261 fn get_stat_values(
262 &self,
263 row_groups: &[impl Borrow<RowGroupMetaData>],
264 column_id: ColumnId,
265 is_min: bool,
266 ) -> StatValues {
267 let Some(column) = self.metadata.column_by_id(column_id) else {
268 return StatValues::NoColumn;
270 };
271 let index = self.column_id_to_sst_index.get(&column_id).unwrap();
273
274 let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
275 StatValues::from_stats_opt(stats)
276 }
277}
278
279fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
283 let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
284 let mut column_index = 0;
285 for pk_id in &metadata.primary_key {
287 id_to_index.insert(*pk_id, column_index);
288 column_index += 1;
289 }
290 for column in &metadata.column_metadatas {
292 if column.semantic_type == SemanticType::Field {
293 id_to_index.insert(column.column_id, column_index);
294 column_index += 1;
295 }
296 }
297 id_to_index.insert(metadata.time_index_column().column_id, column_index);
299
300 id_to_index
301}
302
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a [FlatReadFormat] that projects every column in `metadata`.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        // Collect the ids first so the metadata handle can be moved into `new`.
        let all_ids: Vec<ColumnId> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        FlatReadFormat::new(metadata, all_ids.into_iter())
    }
}