use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use mito_codec::key_values::KeyValue;
use mito_codec::primary_key_filter::is_partition_column;
use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;

use crate::error::{EncodeSnafu, Result};
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
};
use crate::memtable::partition_tree::shard_builder::ShardBuilder;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};

/// Key of a partition.
pub type PartitionKey = u32;

/// A tree partition.
pub struct Partition {
    inner: RwLock<Inner>,
    /// Whether to dedup batches.
    dedup: bool,
}

pub type PartitionRef = Arc<Partition>;

impl Partition {
    /// Creates a new partition.
    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        Partition {
            inner: RwLock::new(Inner::new(metadata, config)),
            dedup: config.dedup,
        }
    }

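    /// Writes a row into the partition, reusing the shard entry if
    /// `primary_key` is already known, otherwise registering the key in the
    /// active shard builder. When `re_encode` is true and the codec is dense,
    /// the incoming (sparse) key is re-encoded into the full primary key first.
    ///
    /// A minimal usage sketch (illustrative only, not compiled; `partition`,
    /// `codec`, `kv`, and `metrics` are placeholders for an existing
    /// [`Partition`], [`PrimaryKeyCodec`], [`KeyValue`], and [`WriteMetrics`]):
    ///
    /// ```ignore
    /// let mut pk = Vec::new();
    /// codec.encode_key_value(&kv, &mut pk)?;
    /// // `re_encode = false`: `pk` is stored as the full primary key.
    /// partition.write_with_key(&mut pk, codec.as_ref(), kv, false, &mut metrics)?;
    /// ```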
    pub fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        row_codec: &dyn PrimaryKeyCodec,
        key_value: KeyValue,
        re_encode: bool,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Freeze the active shard if the builder is full.
        if inner.shard_builder.should_freeze() {
            inner.freeze_active_shard()?;
        }

        // Finds the key in existing shards. A key only exists in one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            inner.write_to_shard(pk_id, &key_value)?;
            inner.num_rows += 1;
            return Ok(());
        }

        // The key is new, write it to the shard builder.
        if re_encode {
            match row_codec.encoding() {
                PrimaryKeyEncoding::Dense => {
                    // The incoming key is sparse; re-encode the full primary
                    // key with the dense codec before storing it.
                    let sparse_key = primary_key.clone();
                    primary_key.clear();
                    row_codec
                        .encode_key_value(&key_value, primary_key)
                        .context(EncodeSnafu)?;
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
                PrimaryKeyEncoding::Sparse => {
                    // The sparse key is stored as-is.
                    let sparse_key = primary_key.clone();
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
            }
        } else {
            // `primary_key` is already the full primary key.
            let pk_id = inner
                .shard_builder
                .write_with_key(primary_key, None, &key_value, metrics);
            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
        };

        inner.num_rows += 1;
        Ok(())
    }

    /// Writes a row that has no primary key.
    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // A region without primary keys has a pre-created shard.
        debug_assert!(!inner.shards.is_empty());
        debug_assert_eq!(1, inner.shard_builder.current_shard_id());

        // All rows in such a region share the same dummy pk id.
        let pk_id = PkId {
            shard_id: 0,
            pk_index: 0,
        };
        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
        inner.num_rows += 1;

        Ok(())
    }

    /// Creates a filter to prune primary keys if needed.
    fn build_primary_key_filter(
        need_prune_key: bool,
        metadata: &RegionMetadataRef,
        row_codec: &dyn PrimaryKeyCodec,
        filters: &Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Option<Box<dyn PrimaryKeyFilter>> {
        if need_prune_key {
            let filter = row_codec.primary_key_filter(metadata, filters.clone());
            Some(filter)
        } else {
            None
        }
    }

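    /// Scans data in the partition, merging the active shard builder with all
    /// frozen shards and (optionally) deduplicating rows.
    ///
    /// A minimal usage sketch (illustrative only, not compiled; `metadata`,
    /// `codec`, `projection`, and `filters` are placeholders):
    ///
    /// ```ignore
    /// let context = ReadPartitionContext::new(metadata, codec, projection, filters);
    /// let mut reader = partition.read(context)?;
    /// while reader.is_valid() {
    ///     let batch = reader.convert_current_batch()?;
    ///     // consume `batch` ...
    ///     reader.next()?;
    /// }
    /// ```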
    pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
        let start = Instant::now();
        let (builder_source, shard_reader_builders) = {
            let inner = self.inner.read().unwrap();
            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
            let builder_reader = if !inner.shard_builder.is_empty() {
                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
                Some(builder_reader)
            } else {
                None
            };
            for shard in &inner.shards {
                if !shard.is_empty() {
                    let shard_reader_builder = shard.read()?;
                    shard_source.push(shard_reader_builder);
                }
            }
            (builder_reader, shard_source)
        };

        context.metrics.num_shards += shard_reader_builders.len();

        let mut nodes = shard_reader_builders
            .into_iter()
            .map(|builder| {
                let primary_key_filter = Self::build_primary_key_filter(
                    context.need_prune_key,
                    &context.metadata,
                    context.row_codec.as_ref(),
                    &context.filters,
                );
                Ok(ShardNode::new(ShardSource::Shard(
                    builder.build(primary_key_filter)?,
                )))
            })
            .collect::<Result<Vec<_>>>()?;

        if let Some(builder) = builder_source {
            context.metrics.num_builder += 1;
            let primary_key_filter = Self::build_primary_key_filter(
                context.need_prune_key,
                &context.metadata,
                context.row_codec.as_ref(),
                &context.filters,
            );
            let shard_builder_reader =
                builder.build(Some(&context.pk_weights), primary_key_filter)?;
            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
        }

        // Merge sources from all shards; dedup if the region needs it.
        let merger = ShardMerger::try_new(nodes)?;
        if self.dedup {
            let source = DedupReader::try_new(merger)?;
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(source))
        } else {
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(merger))
        }
    }

    /// Freezes the active shard of the partition.
    pub fn freeze(&self) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        inner.freeze_active_shard()?;
        Ok(())
    }

    /// Forks the partition with the new metadata, reusing the existing shards
    /// and the primary key map.
    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
        let (shards, shard_builder) = {
            let inner = self.inner.read().unwrap();
            // The caller should have frozen the active shard.
            debug_assert!(inner.shard_builder.is_empty());
            let shard_builder = ShardBuilder::new(
                metadata.clone(),
                config,
                inner.shard_builder.current_shard_id(),
            );
            let shards = inner
                .shards
                .iter()
                .map(|shard| shard.fork(metadata.clone()))
                .collect();

            (shards, shard_builder)
        };
        // Take the pk map out of the old partition so the fork can reuse it.
        let pk_to_pk_id = {
            let mut inner = self.inner.write().unwrap();
            std::mem::take(&mut inner.pk_to_pk_id)
        };

        Partition {
            inner: RwLock::new(Inner {
                metadata: metadata.clone(),
                shard_builder,
                shards,
                num_rows: 0,
                pk_to_pk_id,
                frozen: false,
            }),
            dedup: self.dedup,
        }
    }

    /// Returns true if the partition has rows.
    pub fn has_data(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.num_rows > 0
    }

    /// Returns the statistics of the partition.
    pub(crate) fn stats(&self) -> PartitionStats {
        let inner = self.inner.read().unwrap();
        let num_rows = inner.num_rows;
        let shard_num = inner.shards.len();
        let shared_memory_size = inner
            .shards
            .iter()
            .map(|shard| shard.shared_memory_size())
            .sum();
        PartitionStats {
            num_rows,
            shard_num,
            shared_memory_size,
        }
    }

    /// Gets the partition key from the key value. Returns the default key if
    /// the region is not partitioned.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        key_value.partition_key()
    }

    /// Returns true if the region can hold multiple partitions, i.e. its
    /// first primary key column is the metric engine's table id column.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }
}

/// Statistics of a partition.
pub(crate) struct PartitionStats {
    pub(crate) num_rows: usize,
    pub(crate) shard_num: usize,
    pub(crate) shared_memory_size: usize,
}

/// Metrics of reading a partition.
#[derive(Default)]
struct PartitionReaderMetrics {
    build_partition_reader: Duration,
    read_source: Duration,
    data_batch_to_batch: Duration,
    num_builder: usize,
    num_shards: usize,
}

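/// Reader to scan rows in a partition.
///
/// It merges rows from the shard builder and all shards of the partition.
/// Callers must check [`PartitionReader::is_valid`] before advancing or
/// converting the current batch.
///
/// A sketch of reusing the read context across partitions via
/// [`PartitionReader::into_context`] (illustrative only, not compiled;
/// `partitions` and `context` are placeholders):
///
/// ```ignore
/// for partition in partitions {
///     let mut reader = partition.read(context)?;
///     while reader.is_valid() {
///         let batch = reader.convert_current_batch()?;
///         // consume `batch` ...
///         reader.next()?;
///     }
///     // Hand the context to the next partition to reuse its buffers.
///     context = reader.into_context();
/// }
/// ```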
pub struct PartitionReader {
    context: ReadPartitionContext,
    source: BoxedDataBatchSource,
}

impl PartitionReader {
    fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
        let reader = Self { context, source };

        Ok(reader)
    }

    /// Returns true if the reader is valid.
    pub fn is_valid(&self) -> bool {
        self.source.is_valid()
    }

    /// Advances the reader.
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn next(&mut self) -> Result<()> {
        self.advance_source()
    }

    /// Converts the current data batch into a [`Batch`].
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn convert_current_batch(&mut self) -> Result<Batch> {
        let start = Instant::now();
        let data_batch = self.source.current_data_batch();
        let batch = data_batch_to_batch(
            &self.context.metadata,
            &self.context.projection,
            self.source.current_key(),
            data_batch,
        )?;
        self.context.metrics.data_batch_to_batch += start.elapsed();
        Ok(batch)
    }

    /// Returns the inner context so the next reader can reuse it.
    pub(crate) fn into_context(self) -> ReadPartitionContext {
        self.context
    }

    fn advance_source(&mut self) -> Result<()> {
        let read_source = Instant::now();
        self.source.next()?;
        self.context.metrics.read_source += read_source.elapsed();
        Ok(())
    }
}

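/// Context reused by readers of different partitions within one scan.
///
/// [`Partition::read`] consumes the context and [`PartitionReader::into_context`]
/// returns it, so buffers such as `pk_weights` are reused instead of being
/// reallocated for every partition.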
pub(crate) struct ReadPartitionContext {
    metadata: RegionMetadataRef,
    row_codec: Arc<dyn PrimaryKeyCodec>,
    projection: HashSet<ColumnId>,
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// Buffer for pk weights, reused across readers.
    pk_weights: Vec<u16>,
    /// Whether primary keys need to be pruned by the filters.
    need_prune_key: bool,
    metrics: PartitionReaderMetrics,
}

impl Drop for ReadPartitionContext {
    fn drop(&mut self) {
        // Report accumulated read metrics when the scan finishes.
        let partition_read_source = self.metrics.read_source.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
        let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);

        common_telemetry::debug!(
            "TreeIter partitions metrics, \
            num_builder: {}, \
            num_shards: {}, \
            build_partition_reader: {}s, \
            partition_read_source: {}s, \
            partition_data_batch_to_batch: {}s",
            self.metrics.num_builder,
            self.metrics.num_shards,
            self.metrics.build_partition_reader.as_secs_f64(),
            partition_read_source,
            partition_data_batch_to_batch,
        );
    }
}

impl ReadPartitionContext {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        projection: HashSet<ColumnId>,
        filters: Arc<Vec<SimpleFilterEvaluator>>,
    ) -> ReadPartitionContext {
        let need_prune_key = Self::need_prune_key(&metadata, &filters);
        ReadPartitionContext {
            metadata,
            row_codec,
            projection,
            filters,
            pk_weights: Vec::new(),
            need_prune_key,
            metrics: Default::default(),
        }
    }

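    /// Returns true if the scan needs to decode primary keys and prune rows
    /// by the filters.
    ///
    /// This is the case when some filter references a tag column other than
    /// the partition column. For example (illustrative; assuming `host` is a
    /// tag column), a filter like `host = "a"` requires key pruning, while
    /// filters only on the time index or on the partition column do not.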
    fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
        for filter in filters {
            // Skip filters on the partition column.
            if is_partition_column(filter.column_name()) {
                continue;
            }
            let Some(column) = metadata.column_by_name(filter.column_name()) else {
                continue;
            };
            // Only tag columns are part of the primary key.
            if column.semantic_type != SemanticType::Tag {
                continue;
            }

            return true;
        }

        false
    }
}

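/// Converts a [`DataBatch`] to a [`Batch`].
///
/// This relies on the fixed column layout of the data part's record batch
/// (column 0 is assumed to hold the per-row key index and is not read here):
///
/// ```text
/// column 0: pk index (skipped)
/// column 1: timestamp
/// column 2: sequence
/// column 3: op type
/// column 4..: field columns
/// ```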
fn data_batch_to_batch(
    metadata: &RegionMetadataRef,
    projection: &HashSet<ColumnId>,
    key: Option<&[u8]>,
    data_batch: DataBatch,
) -> Result<Batch> {
    let record_batch = data_batch.slice_record_batch();
    let primary_key = key.map(|k| k.to_vec()).unwrap_or_default();
    let mut builder = BatchBuilder::new(primary_key);
    builder
        .timestamps_array(record_batch.column(1).clone())?
        .sequences_array(record_batch.column(2).clone())?
        .op_types_array(record_batch.column(3).clone())?;

    if record_batch.num_columns() <= 4 {
        // No field columns to push.
        return builder.build();
    }

    // Push projected field columns. The first 4 columns are internal columns.
    for (array, field) in record_batch
        .columns()
        .iter()
        .zip(record_batch.schema().fields().iter())
        .skip(4)
    {
        // Safety: the metadata should contain all fields in the record batch.
        let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
        if !projection.contains(&column_id) {
            continue;
        }
        builder.push_field_array(column_id, array.clone())?;
    }

    builder.build()
}

/// Inner state of the partition.
struct Inner {
    metadata: RegionMetadataRef,
    /// Map from primary key to its [`PkId`]. A key only exists in one shard.
    pk_to_pk_id: HashMap<Vec<u8>, PkId>,
    /// Builder for the active shard.
    shard_builder: ShardBuilder,
    /// Shards that are already built.
    shards: Vec<Shard>,
    num_rows: usize,
    frozen: bool,
}

impl Inner {
    fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        // A region without primary keys gets a pre-created shard (shard 0) so
        // rows can be written without going through the shard builder.
        let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
            let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
            (
                vec![Shard::new(
                    0,
                    None,
                    data_parts,
                    config.dedup,
                    config.data_freeze_threshold,
                )],
                1,
            )
        } else {
            (Vec::new(), 0)
        };
        let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
        Self {
            metadata,
            pk_to_pk_id: HashMap::new(),
            shard_builder,
            shards,
            num_rows: 0,
            frozen: false,
        }
    }

    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
        assert!(!self.frozen);
        self.pk_to_pk_id.get(primary_key).copied()
    }

    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
        // The key still lives in the active shard builder.
        if pk_id.shard_id == self.shard_builder.current_shard_id() {
            self.shard_builder.write_with_pk_id(pk_id, key_value);
            return Ok(());
        }

        // Safety: the shard id comes from the pk map, so the shard must exist.
        let shard = self
            .shards
            .iter_mut()
            .find(|shard| shard.shard_id == pk_id.shard_id)
            .unwrap();
        shard.write_with_pk_id(pk_id, key_value)?;
        self.num_rows += 1;

        Ok(())
    }

    fn freeze_active_shard(&mut self) -> Result<()> {
        if let Some(shard) = self
            .shard_builder
            .finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
        {
            self.shards.push(shard);
        }
        Ok(())
    }
}