use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;

use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::shard::{
    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
};
use crate::memtable::partition_tree::shard_builder::ShardBuilder;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
/// Key of a partition.
pub type PartitionKey = u32;

/// A tree partition.
pub struct Partition {
    inner: RwLock<Inner>,
    /// Whether to dedup batches.
    dedup: bool,
}

pub type PartitionRef = Arc<Partition>;
impl Partition {
    /// Creates a new partition.
    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        Partition {
            inner: RwLock::new(Inner::new(metadata, config)),
            dedup: config.dedup,
        }
    }

    /// Writes to the partition with a primary key.
    pub fn write_with_key(
        &self,
        primary_key: &mut Vec<u8>,
        row_codec: &dyn PrimaryKeyCodec,
        key_value: KeyValue,
        re_encode: bool,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Freeze the active shard if the builder has accumulated enough keys.
        if inner.shard_builder.should_freeze() {
            inner.freeze_active_shard()?;
        }

        // Finds the key in shards; a key only exists in one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            // The key already exists, write to its shard directly.
            inner.write_to_shard(pk_id, &key_value)?;
            inner.num_rows += 1;
            return Ok(());
        }

        // A new key, write it to the active shard builder.
        if re_encode {
            match row_codec.encoding() {
                PrimaryKeyEncoding::Dense => {
                    // `primary_key` is sparse, re-encode the dense key from the key value.
                    let sparse_key = primary_key.clone();
                    primary_key.clear();
                    row_codec.encode_key_value(&key_value, primary_key)?;
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
                PrimaryKeyEncoding::Sparse => {
                    // The key is already sparse, no re-encoding is needed.
                    let sparse_key = primary_key.clone();
                    let pk_id = inner.shard_builder.write_with_key(
                        primary_key,
                        Some(&sparse_key),
                        &key_value,
                        metrics,
                    );
                    inner.pk_to_pk_id.insert(sparse_key, pk_id);
                }
            }
        } else {
            // `primary_key` is already the full primary key.
            let pk_id = inner
                .shard_builder
                .write_with_key(primary_key, None, &key_value, metrics);
            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
        };

        inner.num_rows += 1;
        Ok(())
    }
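
    // A minimal write-path sketch (illustrative only; `partition`, `codec`, `kv`,
    // and `metrics` are hypothetical and would be provided by the partition tree
    // that owns this partition):
    //
    //     let mut pk_buf = Vec::new();
    //     codec.encode_key_value(&kv, &mut pk_buf)?;
    //     partition.write_with_key(&mut pk_buf, codec.as_ref(), kv, false, &mut metrics)?;
    //
    // With `re_encode == false` the buffer already holds the full primary key, so
    // it is moved into the key index as-is; the dense/sparse branches above only
    // run when the caller passes a sparse (metric engine) key.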

    /// Writes to the partition without a primary key.
    pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // If the region has no primary key, we write to the first (and only) shard.
        debug_assert!(!inner.shards.is_empty());
        debug_assert_eq!(1, inner.shard_builder.current_shard_id());

        // A dummy pk id since the shard only stores one (empty) key.
        let pk_id = PkId {
            shard_id: 0,
            pk_index: 0,
        };
        inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
        inner.num_rows += 1;

        Ok(())
    }

    fn build_primary_key_filter(
        need_prune_key: bool,
        metadata: &RegionMetadataRef,
        row_codec: &dyn PrimaryKeyCodec,
        filters: &Arc<Vec<SimpleFilterEvaluator>>,
    ) -> Option<Box<dyn PrimaryKeyFilter>> {
        if need_prune_key {
            let filter = row_codec.primary_key_filter(metadata, filters.clone());
            Some(filter)
        } else {
            None
        }
    }

    /// Scans data in the partition.
    pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
        let start = Instant::now();
        let (builder_source, shard_reader_builders) = {
            let inner = self.inner.read().unwrap();
            let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
            let builder_reader = if !inner.shard_builder.is_empty() {
                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
                Some(builder_reader)
            } else {
                None
            };
            for shard in &inner.shards {
                if !shard.is_empty() {
                    let shard_reader_builder = shard.read()?;
                    shard_source.push(shard_reader_builder);
                }
            }
            (builder_reader, shard_source)
        };

        context.metrics.num_shards += shard_reader_builders.len();

        let mut nodes = shard_reader_builders
            .into_iter()
            .map(|builder| {
                let primary_key_filter = Self::build_primary_key_filter(
                    context.need_prune_key,
                    &context.metadata,
                    context.row_codec.as_ref(),
                    &context.filters,
                );
                Ok(ShardNode::new(ShardSource::Shard(
                    builder.build(primary_key_filter)?,
                )))
            })
            .collect::<Result<Vec<_>>>()?;

        if let Some(builder) = builder_source {
            context.metrics.num_builder += 1;
            let primary_key_filter = Self::build_primary_key_filter(
                context.need_prune_key,
                &context.metadata,
                context.row_codec.as_ref(),
                &context.filters,
            );
            let shard_builder_reader =
                builder.build(Some(&context.pk_weights), primary_key_filter)?;
            nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
        }

        let merger = ShardMerger::try_new(nodes)?;
        // Dedup rows from the merged source if deduplication is enabled.
        if self.dedup {
            let source = DedupReader::try_new(merger)?;
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(source))
        } else {
            context.metrics.build_partition_reader += start.elapsed();
            PartitionReader::new(context, Box::new(merger))
        }
    }
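
    // The returned reader consumes the `ReadPartitionContext`; a caller scanning
    // several partitions can recover and reuse it. A sketch (hypothetical
    // `partitions` and a context built via `ReadPartitionContext::new` below):
    //
    //     for partition in partitions {
    //         let reader = partition.read(context)?;
    //         // ... drain the reader ...
    //         context = reader.into_context();
    //     }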

    /// Freezes the partition by freezing the active shard.
    pub fn freeze(&self) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        inner.freeze_active_shard()?;
        Ok(())
    }

    /// Forks the partition under the new region metadata.
    ///
    /// The partition must be frozen before forking, so the active shard
    /// builder is expected to be empty.
    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
        let (shards, shard_builder) = {
            let inner = self.inner.read().unwrap();
            debug_assert!(inner.shard_builder.is_empty());
            let shard_builder = ShardBuilder::new(
                metadata.clone(),
                config,
                inner.shard_builder.current_shard_id(),
            );
            let shards = inner
                .shards
                .iter()
                .map(|shard| shard.fork(metadata.clone()))
                .collect();

            (shards, shard_builder)
        };
        let pk_to_pk_id = {
            let mut inner = self.inner.write().unwrap();
            std::mem::take(&mut inner.pk_to_pk_id)
        };

        Partition {
            inner: RwLock::new(Inner {
                metadata: metadata.clone(),
                shard_builder,
                shards,
                num_rows: 0,
                pk_to_pk_id,
                frozen: false,
            }),
            dedup: self.dedup,
        }
    }
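
    // Freeze-then-fork sketch (hypothetical `partition`, `new_metadata`, and
    // `config`; this mirrors how a memtable reuses a partition after its schema
    // changes):
    //
    //     partition.freeze()?;
    //     let forked = partition.fork(&new_metadata, &config);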

    /// Returns true if the partition holds any rows.
    pub fn has_data(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.num_rows > 0
    }

    /// Returns the stats of the partition.
    pub(crate) fn stats(&self) -> PartitionStats {
        let inner = self.inner.read().unwrap();
        let num_rows = inner.num_rows;
        let shard_num = inner.shards.len();
        let shared_memory_size = inner
            .shards
            .iter()
            .map(|shard| shard.shared_memory_size())
            .sum();
        PartitionStats {
            num_rows,
            shard_num,
            shared_memory_size,
        }
    }

    /// Gets the partition key from the key value.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        key_value.partition_key()
    }

    /// Returns true if the region can have multiple partitions, i.e. its first
    /// primary key column is the reserved table id column of the metric engine.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }

    /// Returns true if the column is the partition column.
    pub(crate) fn is_partition_column(name: &str) -> bool {
        name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
    }
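
    // Routing sketch (hypothetical `metadata` and `kv`): a metric-engine region
    // whose first primary key column is the table id is split by table id, while
    // any other region collapses into the default partition:
    //
    //     let is_partitioned = Partition::has_multi_partitions(&metadata);
    //     let key = Partition::get_partition_key(&kv, is_partitioned);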
}

pub(crate) struct PartitionStats {
    pub(crate) num_rows: usize,
    pub(crate) shard_num: usize,
    pub(crate) shared_memory_size: usize,
}

#[derive(Default)]
struct PartitionReaderMetrics {
    build_partition_reader: Duration,
    read_source: Duration,
    data_batch_to_batch: Duration,
    num_builder: usize,
    num_shards: usize,
}

/// Reader to scan rows in a partition.
///
/// It merges rows from multiple shards.
pub struct PartitionReader {
    context: ReadPartitionContext,
    source: BoxedDataBatchSource,
}

impl PartitionReader {
    fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
        let reader = Self { context, source };

        Ok(reader)
    }

    /// Returns true if the reader is valid.
    pub fn is_valid(&self) -> bool {
        self.source.is_valid()
    }

    /// Advances the reader.
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn next(&mut self) -> Result<()> {
        self.advance_source()
    }

    /// Converts the current data batch into a [Batch].
    ///
    /// # Panics
    /// Panics if the reader is invalid.
    pub fn convert_current_batch(&mut self) -> Result<Batch> {
        let start = Instant::now();
        let data_batch = self.source.current_data_batch();
        let batch = data_batch_to_batch(
            &self.context.metadata,
            &self.context.projection,
            self.source.current_key(),
            data_batch,
        )?;
        self.context.metrics.data_batch_to_batch += start.elapsed();
        Ok(batch)
    }

    pub(crate) fn into_context(self) -> ReadPartitionContext {
        self.context
    }

    fn advance_source(&mut self) -> Result<()> {
        let read_source = Instant::now();
        self.source.next()?;
        self.context.metrics.read_source += read_source.elapsed();
        Ok(())
    }
}
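
// A minimal read-loop sketch (hypothetical `reader` obtained from
// `Partition::read`):
//
//     while reader.is_valid() {
//         let batch = reader.convert_current_batch()?;
//         // ... consume `batch` ...
//         reader.next()?;
//     }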

/// Context reused across partition readers to avoid allocating for each reader.
pub(crate) struct ReadPartitionContext {
    metadata: RegionMetadataRef,
    row_codec: Arc<dyn PrimaryKeyCodec>,
    projection: HashSet<ColumnId>,
    filters: Arc<Vec<SimpleFilterEvaluator>>,
    /// Buffer to store pk weights.
    pk_weights: Vec<u16>,
    need_prune_key: bool,
    metrics: PartitionReaderMetrics,
}

impl Drop for ReadPartitionContext {
    fn drop(&mut self) {
        let partition_read_source = self.metrics.read_source.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
        let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);

        common_telemetry::debug!(
            "TreeIter partitions metrics, \
            num_builder: {}, \
            num_shards: {}, \
            build_partition_reader: {}s, \
            partition_read_source: {}s, \
            partition_data_batch_to_batch: {}s",
            self.metrics.num_builder,
            self.metrics.num_shards,
            self.metrics.build_partition_reader.as_secs_f64(),
            partition_read_source,
            partition_data_batch_to_batch,
        );
    }
}

impl ReadPartitionContext {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_codec: Arc<dyn PrimaryKeyCodec>,
        projection: HashSet<ColumnId>,
        filters: Arc<Vec<SimpleFilterEvaluator>>,
    ) -> ReadPartitionContext {
        let need_prune_key = Self::need_prune_key(&metadata, &filters);
        ReadPartitionContext {
            metadata,
            row_codec,
            projection,
            filters,
            pk_weights: Vec::new(),
            need_prune_key,
            metrics: Default::default(),
        }
    }

    /// Returns true if the filters contain a predicate on a primary key column
    /// other than the partition column.
    fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
        for filter in filters {
            // Partitions are already pruned, so skip the partition column.
            if Partition::is_partition_column(filter.column_name()) {
                continue;
            }
            // Ignore filters that are not on primary key (tag) columns.
            let Some(column) = metadata.column_by_name(filter.column_name()) else {
                continue;
            };
            if column.semantic_type != SemanticType::Tag {
                continue;
            }

            return true;
        }

        false
    }
}
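
// Constructing a context (hypothetical `metadata`, `codec`, `projection`, and
// `filters`; the projection holds the column ids the caller wants back):
//
//     let context = ReadPartitionContext::new(
//         metadata.clone(),
//         codec.clone(),
//         projection.iter().copied().collect(),
//         filters.clone(),
//     );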

/// Converts a [DataBatch] into a [Batch].
fn data_batch_to_batch(
    metadata: &RegionMetadataRef,
    projection: &HashSet<ColumnId>,
    key: Option<&[u8]>,
    data_batch: DataBatch,
) -> Result<Batch> {
    let record_batch = data_batch.slice_record_batch();
    let primary_key = key.map(|k| k.to_vec()).unwrap_or_default();
    let mut builder = BatchBuilder::new(primary_key);
    builder
        .timestamps_array(record_batch.column(1).clone())?
        .sequences_array(record_batch.column(2).clone())?
        .op_types_array(record_batch.column(3).clone())?;

    if record_batch.num_columns() <= 4 {
        // No field columns.
        return builder.build();
    }

    // Iterate the field columns.
    for (array, field) in record_batch
        .columns()
        .iter()
        .zip(record_batch.schema().fields().iter())
        .skip(4)
    {
        // Safety: the metadata should contain all fields in the batch.
        let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
        if !projection.contains(&column_id) {
            continue;
        }
        builder.push_field_array(column_id, array.clone())?;
    }

    builder.build()
}
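
// The conversion above relies on the fixed column layout of a data part's record
// batch (assumed from the partition tree's data format): column 0 is the pk
// index, columns 1..=3 are timestamp, sequence, and op type, and columns 4.. are
// the field columns, which is why the loop skips the first four.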

/// Inner struct of the partition.
///
/// A key only exists in one shard.
struct Inner {
    metadata: RegionMetadataRef,
    /// Maps a primary key to its pk id.
    pk_to_pk_id: HashMap<Vec<u8>, PkId>,
    /// Shard whose key dictionary is still active.
    shard_builder: ShardBuilder,
    /// Shards with frozen dictionaries.
    shards: Vec<Shard>,
    /// Number of rows written to the partition.
    num_rows: usize,
    frozen: bool,
}
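
// Shard lifecycle sketch: new primary keys go to `shard_builder`; once the
// builder accumulates enough keys, `freeze_active_shard` converts it into an
// immutable `Shard` appended to `shards` (see `Inner::freeze_active_shard`
// below), and the builder moves on to the next shard id.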

impl Inner {
    fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
        let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
            // A region without a primary key uses a single shard with an empty key.
            let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
            (
                vec![Shard::new(
                    0,
                    None,
                    data_parts,
                    config.dedup,
                    config.data_freeze_threshold,
                )],
                1,
            )
        } else {
            (Vec::new(), 0)
        };
        let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
        Self {
            metadata,
            pk_to_pk_id: HashMap::new(),
            shard_builder,
            shards,
            num_rows: 0,
            frozen: false,
        }
    }

    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
        assert!(!self.frozen);
        self.pk_to_pk_id.get(primary_key).copied()
    }

    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
        if pk_id.shard_id == self.shard_builder.current_shard_id() {
            self.shard_builder.write_with_pk_id(pk_id, key_value);
            return Ok(());
        }

        // Safety: the shard exists because the pk id was found in `pk_to_pk_id`.
        let shard = self
            .shards
            .iter_mut()
            .find(|shard| shard.shard_id == pk_id.shard_id)
            .unwrap();
        // The caller updates `num_rows`, so we don't count the row again here.
        shard.write_with_pk_id(pk_id, key_value)?;

        Ok(())
    }

    fn freeze_active_shard(&mut self) -> Result<()> {
        if let Some(shard) = self
            .shard_builder
            .finish(self.metadata.clone(), &mut self.pk_to_pk_id)?
        {
            self.shards.push(shard);
        }
        Ok(())
    }
}