use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use store_api::metadata::RegionMetadataRef;

use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{
    DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
};
use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder};
use crate::memtable::partition_tree::shard::Shard;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;

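/// Builder to write rows to the active shard of a partition, the shard whose
/// key dictionary is still mutable.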
pub struct ShardBuilder {
    /// Id of the shard under construction; advanced after each `finish()`.
    current_shard_id: ShardId,
    /// Builder for the primary key dictionary of the shard.
    dict_builder: KeyDictBuilder,
    /// Buffer that holds rows written to the shard.
    data_buffer: DataBuffer,
    /// Number of rows at which the data buffer should be frozen.
    data_freeze_threshold: usize,
    /// Whether to dedup rows on freeze.
    dedup: bool,
}

impl ShardBuilder {
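    /// Returns a new builder that builds shards starting from `shard_id`.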
    pub fn new(
        metadata: RegionMetadataRef,
        config: &PartitionTreeConfig,
        shard_id: ShardId,
    ) -> ShardBuilder {
        ShardBuilder {
            current_shard_id: shard_id,
            dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard),
            data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, config.dedup),
            data_freeze_threshold: config.data_freeze_threshold,
            dedup: config.dedup,
        }
    }

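    /// Writes a row that already has a [PkId] assigned.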
    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
        // The pk id must belong to the shard this builder is building.
        assert_eq!(self.current_shard_id, pk_id.shard_id);
        self.data_buffer.write_row(pk_id.pk_index, key_value);
    }

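    /// Writes a row by its primary key, inserting the key into the dictionary
    /// if needed, and returns the [PkId] assigned to the key.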
    pub fn write_with_key(
        &mut self,
        full_primary_key: &[u8],
        sparse_key: Option<&[u8]>,
        key_value: &KeyValue,
        metrics: &mut WriteMetrics,
    ) -> PkId {
        // The dict builder returns the pk index assigned to the key.
        let pk_index = self
            .dict_builder
            .insert_key(full_primary_key, sparse_key, metrics);
        self.data_buffer.write_row(pk_index, key_value);
        PkId {
            shard_id: self.current_shard_id,
            pk_index,
        }
    }

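    /// Returns true if the builder should be frozen into a shard.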
    pub fn should_freeze(&self) -> bool {
        self.dict_builder.is_full() || self.data_buffer.num_rows() >= self.data_freeze_threshold
    }

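    /// Returns the id of the shard under construction.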
    pub fn current_shard_id(&self) -> ShardId {
        self.current_shard_id
    }

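    /// Freezes the builder into an immutable [Shard] and advances the shard id.
    ///
    /// Returns `None` if the builder is empty. Also inserts the (primary key,
    /// [PkId]) mappings of the new shard into `pk_to_pk_id`.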
    pub fn finish(
        &mut self,
        metadata: RegionMetadataRef,
        pk_to_pk_id: &mut HashMap<Vec<u8>, PkId>,
    ) -> Result<Option<Shard>> {
        if self.is_empty() {
            return Ok(None);
        }

        let (data_part, key_dict) = match self.dict_builder.finish() {
            Some((dict, pk_to_index)) => {
                // Record the pk id of each key in the new shard.
                pk_to_pk_id.reserve(pk_to_index.len());
                for (k, pk_index) in pk_to_index {
                    pk_to_pk_id.insert(
                        k,
                        PkId {
                            shard_id: self.current_shard_id,
                            pk_index,
                        },
                    );
                }

                // Freeze the data buffer, sorting rows by the weights of their keys.
                let pk_weights = dict.pk_weights_to_sort_data();
                let part = self.data_buffer.freeze(Some(&pk_weights), true)?;
                (part, Some(dict))
            }
            None => {
                // No keys in the dict builder; sort all rows with the same weight.
                let pk_weights = [0];
                (self.data_buffer.freeze(Some(&pk_weights), true)?, None)
            }
        };

        // Builds the data parts of the new shard from the frozen part.
        let data_parts =
            DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]);
        let key_dict = key_dict.map(Arc::new);
        let shard_id = self.current_shard_id;
        self.current_shard_id += 1;

        Ok(Some(Shard::new(
            shard_id,
            key_dict,
            data_parts,
            self.dedup,
            self.data_freeze_threshold,
        )))
    }

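    /// Returns a reader builder over the builder's current contents and fills
    /// `pk_weights_buffer` with the weights used to sort rows by key.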
    pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReaderBuilder> {
        let dict_reader = {
            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
                .with_label_values(&["shard_builder_read_pk"])
                .start_timer();
            self.dict_builder.read()
        };

        {
            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
                .with_label_values(&["sort_pk"])
                .start_timer();
            dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
        }

        let data_reader = self.data_buffer.read()?;
        Ok(ShardBuilderReaderBuilder {
            shard_id: self.current_shard_id,
            dict_reader,
            data_reader,
        })
    }

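    /// Returns true if no rows have been written to the builder.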
    pub fn is_empty(&self) -> bool {
        self.data_buffer.is_empty()
    }
}

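/// Builder for [ShardBuilderReader], holding the intermediate readers.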
pub(crate) struct ShardBuilderReaderBuilder {
    shard_id: ShardId,
    dict_reader: DictBuilderReader,
    data_reader: DataBufferReaderBuilder,
}

impl ShardBuilderReaderBuilder {
    pub(crate) fn build(
        self,
        pk_weights: Option<&[u16]>,
        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    ) -> Result<ShardBuilderReader> {
        let now = Instant::now();
        let data_reader = self.data_reader.build(pk_weights)?;
        ShardBuilderReader::new(
            self.shard_id,
            self.dict_reader,
            data_reader,
            key_filter,
            now.elapsed(),
        )
    }
}

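/// Reader over the rows in a [ShardBuilder], optionally pruning batches by a
/// primary key filter.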
pub struct ShardBuilderReader {
    shard_id: ShardId,
    dict_reader: DictBuilderReader,
    data_reader: DataBufferReader,
    /// Optional filter to prune batches by primary key.
    key_filter: Option<Box<dyn PrimaryKeyFilter>>,
    /// Pk index of the last batch that passed the filter.
    last_yield_pk_index: Option<PkIndex>,
    /// Number of keys visited before pruning.
    keys_before_pruning: usize,
    /// Number of keys left after pruning.
    keys_after_pruning: usize,
    /// Time spent evaluating the key filter.
    prune_pk_cost: Duration,
    /// Time spent building the data reader.
    data_build_cost: Duration,
}

impl ShardBuilderReader {
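    /// Creates a reader and advances it to the first batch that passes the filter.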
    fn new(
        shard_id: ShardId,
        dict_reader: DictBuilderReader,
        data_reader: DataBufferReader,
        key_filter: Option<Box<dyn PrimaryKeyFilter>>,
        data_build_cost: Duration,
    ) -> Result<Self> {
        let mut reader = ShardBuilderReader {
            shard_id,
            dict_reader,
            data_reader,
            key_filter,
            last_yield_pk_index: None,
            keys_before_pruning: 0,
            keys_after_pruning: 0,
            prune_pk_cost: Duration::default(),
            data_build_cost,
        };
        reader.prune_batch_by_key()?;

        Ok(reader)
    }

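    /// Returns true if the reader still has batches to yield.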
    pub fn is_valid(&self) -> bool {
        self.data_reader.is_valid()
    }

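    /// Advances the reader to the next batch that passes the key filter.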
    pub fn next(&mut self) -> Result<()> {
        self.data_reader.next()?;
        self.prune_batch_by_key()
    }

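    /// Returns the primary key of the current batch.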
    pub fn current_key(&self) -> Option<&[u8]> {
        let pk_index = self.data_reader.current_data_batch().pk_index();
        Some(self.dict_reader.key_by_pk_index(pk_index))
    }

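    /// Returns the [PkId] of the current batch.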
    pub fn current_pk_id(&self) -> PkId {
        let pk_index = self.data_reader.current_data_batch().pk_index();
        PkId {
            shard_id: self.shard_id,
            pk_index,
        }
    }

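    /// Returns the current batch of data.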
    pub fn current_data_batch(&self) -> DataBatch {
        self.data_reader.current_data_batch()
    }

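    /// Skips batches whose primary keys do not match the key filter, leaving
    /// the reader on the next matching batch (or invalid if none remains).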
    fn prune_batch_by_key(&mut self) -> Result<()> {
        let Some(key_filter) = &mut self.key_filter else {
            return Ok(());
        };

        while self.data_reader.is_valid() {
            let pk_index = self.data_reader.current_data_batch().pk_index();
            if let Some(yield_pk_index) = self.last_yield_pk_index {
                // The key of this batch was already checked and matched.
                if pk_index == yield_pk_index {
                    break;
                }
            }
            self.keys_before_pruning += 1;
            let key = self.dict_reader.key_by_pk_index(pk_index);
            let now = Instant::now();
            if key_filter.matches(key) {
                self.prune_pk_cost += now.elapsed();
                self.last_yield_pk_index = Some(pk_index);
                self.keys_after_pruning += 1;
                break;
            }
            self.prune_pk_cost += now.elapsed();
            self.data_reader.next()?;
        }

        Ok(())
    }
}

impl Drop for ShardBuilderReader {
    fn drop(&mut self) {
        // Reports pruning metrics when the reader goes away.
        let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["shard_builder_prune_pk"])
            .observe(shard_builder_prune_pk);
        if self.keys_before_pruning > 0 {
            common_telemetry::debug!(
                "ShardBuilderReader metrics, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
                self.keys_before_pruning,
                self.keys_after_pruning,
                shard_builder_prune_pk,
                self.data_build_cost.as_secs_f64(),
            );
        }
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
    use crate::memtable::KeyValues;
    use crate::test_util::memtable_util::{
        build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test,
    };

    /// Builds input for three keys, written out of key order.
    fn input_with_key(metadata: &RegionMetadataRef) -> Vec<KeyValues> {
        vec![
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                2,
                [20, 21].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                0,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                0,
                [0, 1].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                1,
            ),
            build_key_values_with_ts_seq_values(
                metadata,
                "shard_builder".to_string(),
                1,
                [10, 11].into_iter(),
                [Some(0.0), Some(1.0)].into_iter(),
                2,
            ),
        ]
    }

    #[test]
    fn test_write_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();
        // Finishing an empty builder returns no shard and keeps the shard id.
        assert!(shard_builder
            .finish(metadata.clone(), &mut HashMap::new())
            .unwrap()
            .is_none());
        assert_eq!(1, shard_builder.current_shard_id);

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }
        // Finishing a non-empty builder returns a shard and advances the shard id.
        let shard = shard_builder
            .finish(metadata, &mut HashMap::new())
            .unwrap()
            .unwrap();
        assert_eq!(1, shard.shard_id);
        assert_eq!(2, shard_builder.current_shard_id);
    }

    #[test]
    fn test_write_read_shard_builder() {
        let metadata = metadata_for_test();
        let input = input_with_key(&metadata);
        let config = PartitionTreeConfig::default();
        let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
        let mut metrics = WriteMetrics::default();

        for key_values in &input {
            for kv in key_values.iter() {
                let key = encode_key_by_kv(&kv);
                shard_builder.write_with_key(&key, None, &kv, &mut metrics);
            }
        }

        let mut pk_weights = Vec::new();
        let mut reader = shard_builder
            .read(&mut pk_weights)
            .unwrap()
            .build(Some(&pk_weights), None)
            .unwrap();
        let mut timestamps = Vec::new();
        while reader.is_valid() {
            let rb = reader.current_data_batch().slice_record_batch();
            let ts_array = rb.column(1);
            let ts_slice = timestamp_array_to_i64_slice(ts_array);
            timestamps.extend_from_slice(ts_slice);

            reader.next().unwrap();
        }
        // Batches come back sorted by key weight, not by write order.
        assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
        assert_eq!(vec![2, 0, 1], pk_weights);
    }
}