1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use mito_codec::key_values::KeyValue;
22use mito_codec::row_converter::PrimaryKeyFilter;
23use snafu::ResultExt;
24use store_api::metadata::RegionMetadataRef;
25
26use crate::error::{DecodeSnafu, Result};
27use crate::memtable::partition_tree::data::{
28 DATA_INIT_CAP, DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts,
29};
30use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder};
31use crate::memtable::partition_tree::shard::Shard;
32use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
33use crate::memtable::stats::WriteMetrics;
34use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
35
/// Builder that accumulates primary keys and row data for a single shard
/// until it is frozen into an immutable [`Shard`] by [`ShardBuilder::finish`].
pub struct ShardBuilder {
    /// Id of the shard currently being built; incremented after each successful `finish()`.
    current_shard_id: ShardId,
    /// Dictionary builder that interns primary keys and assigns pk indices.
    dict_builder: KeyDictBuilder,
    /// Mutable buffer holding the rows written so far.
    data_buffer: DataBuffer,
    /// Row-count threshold at which the builder reports it should be frozen.
    data_freeze_threshold: usize,
    /// Whether dedup is enabled (forwarded to the data buffer and the built shard).
    dedup: bool,
}
49
50impl ShardBuilder {
51 pub fn new(
53 metadata: RegionMetadataRef,
54 config: &PartitionTreeConfig,
55 shard_id: ShardId,
56 ) -> ShardBuilder {
57 ShardBuilder {
58 current_shard_id: shard_id,
59 dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard),
60 data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, config.dedup),
61 data_freeze_threshold: config.data_freeze_threshold,
62 dedup: config.dedup,
63 }
64 }
65
66 pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
68 assert_eq!(self.current_shard_id, pk_id.shard_id);
69 self.data_buffer.write_row(pk_id.pk_index, key_value);
70 }
71
72 pub fn write_with_key(
74 &mut self,
75 full_primary_key: &[u8],
76 sparse_key: Option<&[u8]>,
77 key_value: &KeyValue,
78 metrics: &mut WriteMetrics,
79 ) -> PkId {
80 let pk_index = self
82 .dict_builder
83 .insert_key(full_primary_key, sparse_key, metrics);
84 self.data_buffer.write_row(pk_index, key_value);
85 PkId {
86 shard_id: self.current_shard_id,
87 pk_index,
88 }
89 }
90
91 pub fn should_freeze(&self) -> bool {
93 self.dict_builder.is_full() || self.data_buffer.num_rows() == self.data_freeze_threshold
94 }
95
96 pub fn current_shard_id(&self) -> ShardId {
98 self.current_shard_id
99 }
100
101 pub fn finish(
105 &mut self,
106 metadata: RegionMetadataRef,
107 pk_to_pk_id: &mut HashMap<Vec<u8>, PkId>,
108 ) -> Result<Option<Shard>> {
109 if self.is_empty() {
110 return Ok(None);
111 }
112
113 let (data_part, key_dict) = match self.dict_builder.finish() {
114 Some((dict, pk_to_index)) => {
115 pk_to_pk_id.reserve(pk_to_index.len());
117 for (k, pk_index) in pk_to_index {
118 pk_to_pk_id.insert(
119 k,
120 PkId {
121 shard_id: self.current_shard_id,
122 pk_index,
123 },
124 );
125 }
126
127 let pk_weights = dict.pk_weights_to_sort_data();
128 let part = self.data_buffer.freeze(Some(&pk_weights), true)?;
129 (part, Some(dict))
130 }
131 None => {
132 let pk_weights = [0];
133 (self.data_buffer.freeze(Some(&pk_weights), true)?, None)
134 }
135 };
136
137 let data_parts =
139 DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]);
140 let key_dict = key_dict.map(Arc::new);
141 let shard_id = self.current_shard_id;
142 self.current_shard_id += 1;
143
144 Ok(Some(Shard::new(
145 shard_id,
146 key_dict,
147 data_parts,
148 self.dedup,
149 self.data_freeze_threshold,
150 )))
151 }
152
153 pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReaderBuilder> {
155 let dict_reader = {
156 let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
157 .with_label_values(&["shard_builder_read_pk"])
158 .start_timer();
159 self.dict_builder.read()
160 };
161
162 {
163 let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
164 .with_label_values(&["sort_pk"])
165 .start_timer();
166 dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
167 }
168
169 let data_reader = self.data_buffer.read()?;
170 Ok(ShardBuilderReaderBuilder {
171 shard_id: self.current_shard_id,
172 dict_reader,
173 data_reader,
174 })
175 }
176
177 pub fn is_empty(&self) -> bool {
179 self.data_buffer.is_empty()
180 }
181}
182
/// Intermediate builder returned by [`ShardBuilder::read`]; holds the
/// dictionary and data readers until pk weights are available to build
/// the final [`ShardBuilderReader`].
pub(crate) struct ShardBuilderReaderBuilder {
    /// Id of the shard being read, forwarded to the reader.
    shard_id: ShardId,
    /// Reader over the key dictionary being built.
    dict_reader: DictBuilderReader,
    /// Builder for the data-buffer reader.
    data_reader: DataBufferReaderBuilder,
}
188
189impl ShardBuilderReaderBuilder {
190 pub(crate) fn build(
191 self,
192 pk_weights: Option<&[u16]>,
193 key_filter: Option<Box<dyn PrimaryKeyFilter>>,
194 ) -> Result<ShardBuilderReader> {
195 let now = Instant::now();
196 let data_reader = self.data_reader.build(pk_weights)?;
197 ShardBuilderReader::new(
198 self.shard_id,
199 self.dict_reader,
200 data_reader,
201 key_filter,
202 now.elapsed(),
203 )
204 }
205}
206
/// Reader over the data of a shard that is still being built.
pub struct ShardBuilderReader {
    /// Id of the shard being read.
    shard_id: ShardId,
    /// Reader over the interned primary keys.
    dict_reader: DictBuilderReader,
    /// Reader over the buffered row data.
    data_reader: DataBufferReader,
    /// Optional primary-key filter used to prune batches during iteration.
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last batch that passed the filter; consecutive batches
    /// with the same index skip re-evaluating the filter.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys examined by the filter (metrics only).
    keys_before_pruning: usize,
    /// Number of keys that passed the filter (metrics only).
    keys_after_pruning: usize,
    /// Accumulated time spent evaluating the key filter.
    prune_pk_cost: Duration,
    /// Time spent building the underlying data reader.
    data_build_cost: Duration,
}
219
impl ShardBuilderReader {
    /// Creates a reader and immediately prunes forward to the first batch
    /// whose key matches `key_filter` (if any).
    fn new(
        shard_id: ShardId,
        dict_reader: DictBuilderReader,
        data_reader: DataBufferReader,
        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
        data_build_cost: Duration,
    ) -> Result<Self> {
        let mut reader = ShardBuilderReader {
            shard_id,
            dict_reader,
            data_reader,
            key_filter,
            last_yield_pk_index: None,
            keys_before_pruning: 0,
            keys_after_pruning: 0,
            prune_pk_cost: Duration::default(),
            data_build_cost,
        };
        // Position the reader on the first batch that survives the filter.
        reader.prune_batch_by_key()?;

        Ok(reader)
    }

    /// Returns true if the reader is positioned on a valid batch.
    pub fn is_valid(&self) -> bool {
        self.data_reader.is_valid()
    }

    /// Advances to the next batch, skipping batches pruned by the key filter.
    pub fn next(&mut self) -> Result<()> {
        self.data_reader.next()?;
        self.prune_batch_by_key()
    }

    /// Returns the primary key of the current batch.
    ///
    /// NOTE(review): always returns `Some`; the `Option` appears to exist for
    /// interface parity with other shard readers — confirm against callers.
    pub fn current_key(&self) -> Option<&[u8]> {
        let pk_index = self.data_reader.current_data_batch().pk_index();
        Some(self.dict_reader.key_by_pk_index(pk_index))
    }

    /// Returns the [`PkId`] (shard id + pk index) of the current batch.
    pub fn current_pk_id(&self) -> PkId {
        let pk_index = self.data_reader.current_data_batch().pk_index();
        PkId {
            shard_id: self.shard_id,
            pk_index,
        }
    }

    /// Returns the current data batch.
    pub fn current_data_batch(&self) -> DataBatch<'_> {
        self.data_reader.current_data_batch()
    }

    /// Advances the data reader until it sits on a batch whose primary key
    /// matches the filter, or the reader is exhausted.
    ///
    /// No-op when there is no filter. Tracks filter-evaluation time in
    /// `prune_pk_cost` and key counts in `keys_before/after_pruning`.
    fn prune_batch_by_key(&mut self) -> Result<()> {
        let Some(key_filter) = &mut self.key_filter else {
            return Ok(());
        };

        while self.data_reader.is_valid() {
            let pk_index = self.data_reader.current_data_batch().pk_index();
            // Same key as the last yielded batch: it already passed the
            // filter, so accept it without re-evaluating.
            if let Some(yield_pk_index) = self.last_yield_pk_index
                && pk_index == yield_pk_index
            {
                break;
            }
            self.keys_before_pruning += 1;
            let key = self.dict_reader.key_by_pk_index(pk_index);
            let now = Instant::now();
            if key_filter.matches(key).context(DecodeSnafu)? {
                self.prune_pk_cost += now.elapsed();
                self.last_yield_pk_index = Some(pk_index);
                self.keys_after_pruning += 1;
                break;
            }
            self.prune_pk_cost += now.elapsed();
            // Filtered out: skip this batch and keep scanning.
            self.data_reader.next()?;
        }

        Ok(())
    }
}
298
299impl Drop for ShardBuilderReader {
300 fn drop(&mut self) {
301 let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
302 PARTITION_TREE_READ_STAGE_ELAPSED
303 .with_label_values(&["shard_builder_prune_pk"])
304 .observe(shard_builder_prune_pk);
305 if self.keys_before_pruning > 0 {
306 common_telemetry::debug!(
307 "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
308 self.keys_before_pruning,
309 self.keys_after_pruning,
310 shard_builder_prune_pk,
311 self.data_build_cost.as_secs_f64(),
312 );
313 }
314 }
315}
316
#[cfg(test)]
mod tests {

    use super::*;
    use crate::memtable::KeyValues;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test,
    };

    /// Builds three key-value batches sharing the key "shard_builder" with
    /// field values 2, 0, 1 and timestamps [20,21], [0,1], [10,11] — written
    /// deliberately out of timestamp order so reads exercise sorting.
    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<KeyValues> {
        vec![
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                2,
                [20, 21].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                0,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                0,
                [0, 1].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                1,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                1,
                [10, 11].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                2,
            ),
        ]
    }

    /// Verifies that finishing an empty builder yields None without bumping
    /// the shard id, and finishing after writes yields a shard with the
    /// current id and advances `current_shard_id`.
    #[test]
    fn test_write_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();
        // Empty builder: finish() returns None and keeps the shard id.
        assert!(
            shard_builder
                .finish(metadata.clone(), &mut HashMap::new())
                .unwrap()
                .is_none()
        );
        assert_eq!(1, shard_builder.current_shard_id);

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }
        let shard = shard_builder
            .finish(metadata, &mut HashMap::new())
            .unwrap()
            .unwrap();
        // The frozen shard keeps the old id; the builder moves to the next.
        assert_eq!(1, shard.shard_id);
        assert_eq!(2, shard_builder.current_shard_id);
    }

    /// Verifies that reading a builder returns timestamps sorted by key
    /// weight and fills the pk-weights buffer in insertion order.
    #[test]
    fn test_write_read_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }

        let mut pk_weights = Vec::new();
        let mut reader = shard_builder
            .read(&mut pk_weights)
            .unwrap()
            .build(Some(&pk_weights), None)
            .unwrap();
        let mut timestamps = Vec::new();
        while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
            // Column 1 is the timestamp column in the test schema.
            let ts_array = rb.column(1);
            let ts_slice = timestamp_array_to_i64_slice(ts_array);
            timestamps.extend_from_slice(ts_slice);

            reader.next().unwrap();
        }
        // Timestamps come back ordered by key weight, not insertion order.
        assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
        assert_eq!(vec![2, 0, 1], pk_weights);
    }
}
418}