1use std::collections::{HashMap, HashSet};
20use std::sync::{Arc, RwLock};
21use std::time::{Duration, Instant};
22
23use api::v1::SemanticType;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use mito_codec::key_values::KeyValue;
26use mito_codec::primary_key_filter::is_partition_column;
27use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
28use snafu::ResultExt;
29use store_api::codec::PrimaryKeyEncoding;
30use store_api::metadata::RegionMetadataRef;
31use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
32use store_api::storage::ColumnId;
33
34use crate::error::{EncodeSnafu, Result};
35use crate::memtable::partition_tree::data::{DATA_INIT_CAP, DataBatch, DataParts};
36use crate::memtable::partition_tree::dedup::DedupReader;
37use crate::memtable::partition_tree::shard::{
38 BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
39};
40use crate::memtable::partition_tree::shard_builder::ShardBuilder;
41use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
42use crate::memtable::stats::WriteMetrics;
43use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
44use crate::read::{Batch, BatchBuilder};
45
/// Key used to identify a partition inside the partition tree.
pub type PartitionKey = u32;

/// A tree partition.
pub struct Partition {
    /// Mutable state of the partition, guarded by a lock (written on ingest,
    /// read on scan).
    inner: RwLock<Inner>,
    /// Whether deduplication is enabled; when set, `read` wraps the merged
    /// source in a `DedupReader`.
    dedup: bool,
}

/// Shared reference to a [`Partition`].
pub type PartitionRef = Arc<Partition>;
57
impl Partition {
    /// Creates a new partition from the region metadata and tree config.
    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        Partition {
            inner: RwLock::new(Inner::new(metadata, config)),
            dedup: config.dedup,
        }
    }

    /// Writes a row to the partition with a primary key.
    ///
    /// When `re_encode` is true, `primary_key` may be re-encoded with
    /// `row_codec` before being stored in a shard (see the `Dense` branch).
    ///
    /// # Errors
    /// Returns an error if re-encoding the key or writing to a shard fails.
    pub fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        row_codec: &dyn PrimaryKeyCodec,
        key_value: KeyValue,
        re_encode: bool,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Freeze the active shard builder first if it is full, so the new key
        // goes into a fresh builder.
        if inner.shard_builder.should_freeze() {
            inner.freeze_active_shard()?;
        }

        // Fast path: the key is already known; a key exists in exactly one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            inner.write_to_shard(pk_id, &key_value)?;
            inner.num_rows += 1;
            return Ok(());
        }

        if re_encode {
            match row_codec.encoding() {
                PrimaryKeyEncoding::Dense => {
                    // Keep the incoming key as the lookup key, then re-encode
                    // the primary key with the dense codec before writing it
                    // to the shard builder.
                    let sparse_key = primary_key.clone();
                    primary_key.clear();
                    row_codec
                        .encode_key_value(&key_value, primary_key)
                        .context(EncodeSnafu)?;
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
                PrimaryKeyEncoding::Sparse => {
                    // Key is already in the codec's encoding; write it as-is.
                    let sparse_key = primary_key.clone();
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
            }
        } else {
            // No re-encoding needed; move the key into the id map to avoid a copy.
            let pk_id = inner
                .shard_builder
                .write_with_key(primary_key, None, &key_value, metrics);
            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
        };

        inner.num_rows += 1;
        Ok(())
    }

    /// Writes a row to the partition when the region has no primary key.
    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Without a primary key, `Inner::new` pre-creates shard 0 and the
        // builder starts from shard id 1 — assert that invariant here.
        debug_assert!(!inner.shards.is_empty());
        debug_assert_eq!(1, inner.shard_builder.current_shard_id());

        // Dummy pk id: every row shares the same (empty) key in shard 0.
        let pk_id = PkId {
            shard_id: 0,
            pk_index: 0,
        };
        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
        inner.num_rows += 1;

        Ok(())
    }

    /// Builds a primary key filter for pruning, or `None` when pruning is not
    /// needed for this read.
    fn build_primary_key_filter(
        need_prune_key: bool,
        metadata: &RegionMetadataRef,
        row_codec: &dyn PrimaryKeyCodec,
        filters: &Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Option<Box<dyn PrimaryKeyFilter>> {
        if need_prune_key {
            let filter = row_codec.primary_key_filter(metadata, filters.clone(), true);
            Some(filter)
        } else {
            None
        }
    }

    /// Scans the partition, merging the active shard builder and all shards
    /// into a single [`PartitionReader`].
    pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
        let start = Instant::now();
        // Collect reader builders under the read lock, then release the lock
        // before doing the heavier reader construction below.
        let (builder_source, shard_reader_builders) = {
            let inner = self.inner.read().unwrap();
            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
            let builder_reader = if !inner.shard_builder.is_empty() {
                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
                Some(builder_reader)
            } else {
                None
            };
            for shard in &inner.shards {
                if !shard.is_empty() {
                    let shard_reader_builder = shard.read()?;
                    shard_source.push(shard_reader_builder);
                }
            }
            (builder_reader, shard_source)
        };

        context.metrics.num_shards += shard_reader_builders.len();

        let mut nodes = shard_reader_builders
            .into_iter()
            .map(|builder| {
                let primary_key_filter = Self::build_primary_key_filter(
                    context.need_prune_key,
                    &context.metadata,
                    context.row_codec.as_ref(),
                    &context.filters,
                );
                Ok(ShardNode::new(ShardSource::Shard(
                    builder.build(primary_key_filter)?,
                )))
            })
            .collect::<Result<Vec<_>>>()?;

        // The active shard builder (if non-empty) contributes one more source.
        if let Some(builder) = builder_source {
            context.metrics.num_builder += 1;
            let primary_key_filter = Self::build_primary_key_filter(
                context.need_prune_key,
                &context.metadata,
                context.row_codec.as_ref(),
                &context.filters,
            );
            let shard_builder_reader =
                builder.build(Some(&context.pk_weights), primary_key_filter)?;
            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
        }

        // Merge all sources; optionally dedup the merged stream.
        let merger = ShardMerger::try_new(nodes)?;
        if self.dedup {
            let source = DedupReader::try_new(merger)?;
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(source))
        } else {
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(merger))
        }
    }

    /// Freezes the active shard builder of the partition.
    pub fn freeze(&self) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        inner.freeze_active_shard()?;
        Ok(())
    }

    /// Forks the partition with new region metadata.
    ///
    /// Expected to be called after [`Partition::freeze`]; the active shard
    /// builder must be empty.
    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
        let (shards, shard_builder) = {
            let inner = self.inner.read().unwrap();
            debug_assert!(inner.shard_builder.is_empty());
            let shard_builder = ShardBuilder::new(
                metadata.clone(),
                config,
                inner.shard_builder.current_shard_id(),
            );
            let shards = inner
                .shards
                .iter()
                .map(|shard| shard.fork(metadata.clone()))
                .collect();

            (shards, shard_builder)
        };
        // Move (not copy) the pk -> pk id map out of the old partition so the
        // forked partition can reuse it; requires upgrading to a write lock.
        let pk_to_pk_id = {
            let mut inner = self.inner.write().unwrap();
            std::mem::take(&mut inner.pk_to_pk_id)
        };

        Partition {
            inner: RwLock::new(Inner {
                metadata: metadata.clone(),
                shard_builder,
                shards,
                num_rows: 0,
                pk_to_pk_id,
                frozen: false,
            }),
            dedup: self.dedup,
        }
    }

    /// Returns true if the partition has buffered rows.
    pub fn has_data(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.num_rows > 0
    }

    /// Gets the statistics of the partition.
    pub(crate) fn stats(&self) -> PartitionStats {
        let inner = self.inner.read().unwrap();
        let num_rows = inner.num_rows;
        let shard_num = inner.shards.len();
        let shared_memory_size = inner
            .shards
            .iter()
            .map(|shard| shard.shared_memory_size())
            .sum();
        PartitionStats {
            num_rows,
            shard_num,
            shared_memory_size,
        }
    }

    /// Gets the partition key from a key value; returns the default key when
    /// the tree is not partitioned.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        key_value.partition_key()
    }

    /// Returns true if this region may hold multiple partitions, i.e. its
    /// first primary key column is the table id column.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }

    /// Returns the number of distinct primary keys tracked by the partition.
    pub(crate) fn series_count(&self) -> usize {
        self.inner.read().unwrap().series_count()
    }
}
317
/// Statistics of a partition.
pub(crate) struct PartitionStats {
    /// Number of rows written to the partition.
    pub(crate) num_rows: usize,
    /// Number of shards in the partition.
    pub(crate) shard_num: usize,
    /// Total shared memory size reported by all shards.
    pub(crate) shared_memory_size: usize,
}
323
/// Metrics collected while reading a partition; reported when the owning
/// [`ReadPartitionContext`] is dropped.
#[derive(Default)]
struct PartitionReaderMetrics {
    /// Time spent building partition readers.
    build_partition_reader: Duration,
    /// Time spent advancing the underlying batch source.
    read_source: Duration,
    /// Time spent converting data batches into [`Batch`]es.
    data_batch_to_batch: Duration,
    /// Number of shard-builder sources read.
    num_builder: usize,
    /// Number of shard sources read.
    num_shards: usize,
}
332
/// Reader to scan rows in a partition.
///
/// Wraps a merged batch source together with the reusable read context.
pub struct PartitionReader {
    context: ReadPartitionContext,
    source: BoxedDataBatchSource,
}
340
341impl PartitionReader {
342 fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
343 let reader = Self { context, source };
344
345 Ok(reader)
346 }
347
348 pub fn is_valid(&self) -> bool {
350 self.source.is_valid()
351 }
352
353 pub fn next(&mut self) -> Result<()> {
358 self.advance_source()
359 }
360
361 pub fn convert_current_batch(&mut self) -> Result<Batch> {
366 let start = Instant::now();
367 let data_batch = self.source.current_data_batch();
368 let batch = data_batch_to_batch(
369 &self.context.metadata,
370 &self.context.projection,
371 self.source.current_key(),
372 data_batch,
373 )?;
374 self.context.metrics.data_batch_to_batch += start.elapsed();
375 Ok(batch)
376 }
377
378 pub(crate) fn into_context(self) -> ReadPartitionContext {
379 self.context
380 }
381
382 fn advance_source(&mut self) -> Result<()> {
383 let read_source = Instant::now();
384 self.source.next()?;
385 self.context.metrics.read_source += read_source.elapsed();
386 Ok(())
387 }
388}
389
/// Context reused across partition reads to avoid re-allocating per reader.
pub(crate) struct ReadPartitionContext {
    metadata: RegionMetadataRef,
    row_codec: Arc<dyn PrimaryKeyCodec>,
    /// Column ids to project in output batches.
    projection: HashSet<ColumnId>,
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// Reusable buffer for primary key weights.
    pk_weights: Vec<u16>,
    /// Whether primary-key pruning should be applied for this read.
    need_prune_key: bool,
    /// Read metrics, reported to Prometheus and logs on drop.
    metrics: PartitionReaderMetrics,
}
401
impl Drop for ReadPartitionContext {
    /// Reports accumulated read metrics once per scan when the context is
    /// dropped, instead of per batch.
    fn drop(&mut self) {
        let partition_read_source = self.metrics.read_source.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
        let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);

        common_telemetry::debug!(
            "TreeIter partitions metrics, \
            num_builder: {}, \
            num_shards: {}, \
            build_partition_reader: {}s, \
            partition_read_source: {}s, \
            partition_data_batch_to_batch: {}s",
            self.metrics.num_builder,
            self.metrics.num_shards,
            self.metrics.build_partition_reader.as_secs_f64(),
            partition_read_source,
            partition_data_batch_to_batch,
        );
    }
}
428
429impl ReadPartitionContext {
430 pub(crate) fn new(
431 metadata: RegionMetadataRef,
432 row_codec: Arc<dyn PrimaryKeyCodec>,
433 projection: HashSet<ColumnId>,
434 filters: Arc<Vec<SimpleFilterEvaluator>>,
435 ) -> ReadPartitionContext {
436 let need_prune_key = Self::need_prune_key(&metadata, &filters);
437 ReadPartitionContext {
438 metadata,
439 row_codec,
440 projection,
441 filters,
442 pk_weights: Vec::new(),
443 need_prune_key,
444 metrics: Default::default(),
445 }
446 }
447
448 fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
451 for filter in filters {
452 if is_partition_column(filter.column_name()) {
454 continue;
455 }
456 let Some(column) = metadata.column_by_name(filter.column_name()) else {
457 continue;
458 };
459 if column.semantic_type != SemanticType::Tag {
460 continue;
461 }
462
463 return true;
464 }
465
466 false
467 }
468}
469
470fn data_batch_to_batch(
473 metadata: &RegionMetadataRef,
474 projection: &HashSet<ColumnId>,
475 key: Option<&[u8]>,
476 data_batch: DataBatch,
477) -> Result<Batch> {
478 let record_batch = data_batch.slice_record_batch();
479 let primary_key = key.map(|k| k.to_vec()).unwrap_or_default();
480 let mut builder = BatchBuilder::new(primary_key);
481 builder
482 .timestamps_array(record_batch.column(1).clone())?
483 .sequences_array(record_batch.column(2).clone())?
484 .op_types_array(record_batch.column(3).clone())?;
485
486 if record_batch.num_columns() <= 4 {
487 return builder.build();
489 }
490
491 for (array, field) in record_batch
493 .columns()
494 .iter()
495 .zip(record_batch.schema().fields().iter())
496 .skip(4)
497 {
498 let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
501 if !projection.contains(&column_id) {
502 continue;
503 }
504 builder.push_field_array(column_id, array.clone())?;
505 }
506
507 builder.build()
508}
509
/// Inner state of the partition.
///
/// Invariant: a primary key exists in exactly one shard.
struct Inner {
    metadata: RegionMetadataRef,
    /// Maps an encoded primary key to its pk id.
    pk_to_pk_id: HashMap<Vec<u8>, PkId>,
    /// Active shard whose key dictionary is still being built.
    shard_builder: ShardBuilder,
    /// Shards with frozen (immutable) dictionaries.
    shards: Vec<Shard>,
    /// Number of rows written to the partition.
    num_rows: usize,
    /// Whether the partition is frozen.
    frozen: bool,
}
524
525impl Inner {
526 fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
527 let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
528 let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
529 (
530 vec![Shard::new(
531 0,
532 None,
533 data_parts,
534 config.dedup,
535 config.data_freeze_threshold,
536 )],
537 1,
538 )
539 } else {
540 (Vec::new(), 0)
541 };
542 let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
543 Self {
544 metadata,
545 pk_to_pk_id: HashMap::new(),
546 shard_builder,
547 shards,
548 num_rows: 0,
549 frozen: false,
550 }
551 }
552
553 fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
554 assert!(!self.frozen);
555 self.pk_to_pk_id.get(primary_key).copied()
556 }
557
558 fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
559 if pk_id.shard_id == self.shard_builder.current_shard_id() {
560 self.shard_builder.write_with_pk_id(pk_id, key_value);
561 return Ok(());
562 }
563
564 let shard = self
566 .shards
567 .iter_mut()
568 .find(|shard| shard.shard_id == pk_id.shard_id)
569 .unwrap();
570 shard.write_with_pk_id(pk_id, key_value)?;
571 self.num_rows += 1;
572
573 Ok(())
574 }
575
576 fn freeze_active_shard(&mut self) -> Result<()> {
577 if let Some(shard) = self
578 .shard_builder
579 .finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
580 {
581 self.shards.push(shard);
582 }
583 Ok(())
584 }
585
586 fn series_count(&self) -> usize {
588 self.pk_to_pk_id.len()
589 }
590}