1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use partition::expr::PartitionExpr;
19use snafu::{OptionExt, ResultExt, ensure};
20use store_api::storage::RegionId;
21
22use crate::error;
23pub use crate::error::{Error, Result};
24use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
25
26pub struct RemapManifest {
28 old_manifests: HashMap<RegionId, RegionManifest>,
30 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
32 region_mapping: HashMap<RegionId, Vec<RegionId>>,
34 new_manifests: HashMap<RegionId, RegionManifest>,
36}
37
38impl RemapManifest {
39 pub fn new(
40 old_manifests: HashMap<RegionId, RegionManifest>,
41 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
42 region_mapping: HashMap<RegionId, Vec<RegionId>>,
43 ) -> Self {
44 Self {
45 old_manifests,
46 new_partition_exprs,
47 region_mapping,
48 new_manifests: HashMap::new(),
49 }
50 }
51
52 pub fn remap_manifests(&mut self) -> Result<RemapResult> {
57 self.initialize_new_manifests()?;
59
60 self.do_remap()?;
62
63 self.finalize_manifests()?;
65
66 let stats = self.compute_stats();
68 self.validate_result(&stats)?;
69
70 let new_manifests = std::mem::take(&mut self.new_manifests);
71
72 Ok(RemapResult {
73 new_manifests,
74 stats,
75 })
76 }
77
78 fn initialize_new_manifests(&mut self) -> Result<()> {
80 let mut new_manifests = HashMap::new();
81
82 let template_manifest = self
84 .old_manifests
85 .values()
86 .next()
87 .context(error::NoOldManifestsSnafu)?;
88 let template_metadata = (*template_manifest.metadata).clone();
89 let sst_format = template_manifest.sst_format;
90
91 for region_id in self.new_partition_exprs.keys() {
93 let mut new_metadata = template_metadata.clone();
95
96 new_metadata.region_id = *region_id;
97 let new_partition_expr = self
98 .new_partition_exprs
99 .get(region_id)
100 .context(error::MissingPartitionExprSnafu {
101 region_id: *region_id,
102 })?
103 .as_json_str()
104 .context(error::SerializePartitionExprSnafu)?;
105 new_metadata.partition_expr = Some(new_partition_expr);
106
107 let manifest = RegionManifest {
108 metadata: Arc::new(new_metadata),
109 files: HashMap::new(),
110 removed_files: RemovedFilesRecord::default(),
111 flushed_entry_id: 0,
112 flushed_sequence: 0,
113 committed_sequence: None,
114 manifest_version: 0,
115 truncated_entry_id: None,
116 compaction_time_window: None,
117 sst_format,
118 };
119
120 new_manifests.insert(*region_id, manifest);
121 }
122
123 self.new_manifests = new_manifests;
124
125 Ok(())
126 }
127
128 fn do_remap(&mut self) -> Result<()> {
130 for (&from_region_id, target_region_ids) in &self.region_mapping {
132 let from_manifest = self.old_manifests.get(&from_region_id).context(
134 error::MissingOldManifestSnafu {
135 region_id: from_region_id,
136 },
137 )?;
138
139 for &to_region_id in target_region_ids {
141 let target_manifest = self.new_manifests.get_mut(&to_region_id).context(
142 error::MissingNewManifestSnafu {
143 region_id: to_region_id,
144 },
145 )?;
146
147 Self::copy_files_to_region(from_manifest, target_manifest)?;
148 }
149 }
150
151 Ok(())
152 }
153
154 fn copy_files_to_region(
156 source_manifest: &RegionManifest,
157 target_manifest: &mut RegionManifest,
158 ) -> Result<()> {
159 for (file_id, file_meta) in &source_manifest.files {
160 let file_meta_clone = file_meta.clone();
161
162 use std::collections::hash_map::Entry;
165 match target_manifest.files.entry(*file_id) {
166 Entry::Vacant(e) => {
167 e.insert(file_meta_clone);
168 }
169 #[cfg(debug_assertions)]
170 Entry::Occupied(e) => {
171 Self::verify_file_consistency(e.get(), &file_meta_clone)?;
173 }
174 #[cfg(not(debug_assertions))]
175 Entry::Occupied(_) => {}
176 }
177 }
178
179 Ok(())
180 }
181
182 #[cfg(debug_assertions)]
184 fn verify_file_consistency(
185 existing: &crate::sst::file::FileMeta,
186 new: &crate::sst::file::FileMeta,
187 ) -> Result<()> {
188 ensure!(
192 existing.region_id == new.region_id,
193 error::InconsistentFileSnafu {
194 file_id: existing.file_id,
195 reason: "region_id mismatch",
196 }
197 );
198
199 ensure!(
200 existing.file_id == new.file_id,
201 error::InconsistentFileSnafu {
202 file_id: existing.file_id,
203 reason: "file_id mismatch",
204 }
205 );
206
207 ensure!(
208 existing.time_range == new.time_range,
209 error::InconsistentFileSnafu {
210 file_id: existing.file_id,
211 reason: "time_range mismatch",
212 }
213 );
214
215 ensure!(
216 existing.level == new.level,
217 error::InconsistentFileSnafu {
218 file_id: existing.file_id,
219 reason: "level mismatch",
220 }
221 );
222
223 ensure!(
224 existing.file_size == new.file_size,
225 error::InconsistentFileSnafu {
226 file_id: existing.file_id,
227 reason: "file_size mismatch",
228 }
229 );
230
231 ensure!(
232 existing.partition_expr == new.partition_expr,
233 error::InconsistentFileSnafu {
234 file_id: existing.file_id,
235 reason: "partition_expr mismatch",
236 }
237 );
238
239 Ok(())
240 }
241
242 fn finalize_manifests(&mut self) -> Result<()> {
244 for (region_id, manifest) in self.new_manifests.iter_mut() {
245 if let Some(previous_manifest) = self.old_manifests.get(region_id) {
246 manifest.flushed_entry_id = previous_manifest.flushed_entry_id;
247 manifest.flushed_sequence = previous_manifest.flushed_sequence;
248 manifest.manifest_version = previous_manifest.manifest_version;
249 manifest.truncated_entry_id = previous_manifest.truncated_entry_id;
250 manifest.compaction_time_window = previous_manifest.compaction_time_window;
251 manifest.committed_sequence = previous_manifest.committed_sequence;
252 } else {
253 manifest.flushed_entry_id = 0;
255 manifest.flushed_sequence = 0;
256 manifest.manifest_version = 0;
257 manifest.truncated_entry_id = None;
258 manifest.compaction_time_window = None;
259 manifest.committed_sequence = None;
260 }
261
262 manifest.removed_files = RemovedFilesRecord::default();
264 }
265
266 Ok(())
267 }
268
269 fn compute_stats(&self) -> RemapStats {
271 let mut files_per_region = HashMap::with_capacity(self.new_manifests.len());
272 let mut total_file_refs = 0;
273 let mut empty_regions = Vec::new();
274 let mut all_files = HashSet::new();
275
276 for (®ion_id, manifest) in &self.new_manifests {
277 let file_count = manifest.files.len();
278 files_per_region.insert(region_id, file_count);
279 total_file_refs += file_count;
280
281 if file_count == 0 {
282 empty_regions.push(region_id);
283 }
284
285 for file_id in manifest.files.keys() {
286 all_files.insert(*file_id);
287 }
288 }
289
290 RemapStats {
291 files_per_region,
292 total_file_refs,
293 empty_regions,
294 unique_files: all_files.len(),
295 }
296 }
297
298 fn validate_result(&self, stats: &RemapStats) -> Result<()> {
300 for region_id in self.new_partition_exprs.keys() {
302 ensure!(
303 self.new_manifests.contains_key(region_id),
304 error::MissingNewManifestSnafu {
305 region_id: *region_id
306 }
307 );
308 }
309
310 let mut old_unique_files = HashSet::new();
313 for manifest in self.old_manifests.values() {
314 for file_id in manifest.files.keys() {
315 old_unique_files.insert(*file_id);
316 }
317 }
318
319 ensure!(
320 stats.unique_files >= old_unique_files.len(),
321 error::FilesLostSnafu {
322 old_count: old_unique_files.len(),
323 new_count: stats.unique_files,
324 }
325 );
326
327 if !stats.empty_regions.is_empty() {
329 common_telemetry::warn!(
330 "Repartition resulted in {} empty regions: {:?}, new partition exprs: {:?}",
331 stats.empty_regions.len(),
332 stats.empty_regions,
333 self.new_partition_exprs.keys().collect::<Vec<_>>()
334 );
335 }
336
337 Ok(())
338 }
339}
340
341#[derive(Debug)]
343pub struct RemapResult {
344 pub new_manifests: HashMap<RegionId, RegionManifest>,
346 pub stats: RemapStats,
348}
349
350#[derive(Debug)]
352pub struct RemapStats {
353 pub files_per_region: HashMap<RegionId, usize>,
355 pub total_file_refs: usize,
357 pub empty_regions: Vec<RegionId>,
359 pub unique_files: usize,
361}
362
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::num::NonZeroU64;
    use std::sync::Arc;
    use std::time::Duration;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};
    use smallvec::SmallVec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
    use store_api::storage::{FileId, RegionId, SequenceNumber};

    use super::*;
    use crate::manifest::action::RegionManifest;
    use crate::sst::FormatType;
    use crate::sst::file::{FileMeta, FileTimeRange};
    use crate::wal::EntryId;

    /// Builds metadata with a millisecond timestamp `ts`, an int64 tag `pk` (the primary key)
    /// and a float64 field `val`.
    fn create_region_metadata(region_id: RegionId) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(region_id);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pk", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "val",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![2]);
        Arc::new(builder.build().unwrap())
    }

    /// Builds a level-0, 1024-byte, 100-row file meta owned by `region_id`.
    fn create_file_meta(
        region_id: RegionId,
        file_id: FileId,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 1024,
            max_row_group_uncompressed_size: 1024,
            available_indexes: SmallVec::new(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows: 100,
            num_row_groups: 1,
            sequence: NonZeroU64::new(1),
            partition_expr,
            num_series: 1,
        }
    }

    /// Builds a manifest (version 1) for `region_id` with `num_files` random files, all
    /// tagged with `partition_expr`.
    fn create_manifest(
        region_id: RegionId,
        num_files: usize,
        partition_expr: Option<PartitionExpr>,
        flushed_entry_id: EntryId,
        flushed_sequence: SequenceNumber,
    ) -> RegionManifest {
        let mut files = HashMap::new();
        for _ in 0..num_files {
            let file_id = FileId::random();
            let file_meta = create_file_meta(region_id, file_id, partition_expr.clone());
            files.insert(file_id, file_meta);
        }

        RegionManifest {
            metadata: create_region_metadata(region_id),
            files,
            removed_files: RemovedFilesRecord::default(),
            flushed_entry_id,
            flushed_sequence,
            manifest_version: 1,
            truncated_entry_id: None,
            compaction_time_window: None,
            committed_sequence: None,
            sst_format: FormatType::PrimaryKey,
        }
    }

    /// Returns the expression `start <= col_name < end`.
    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
        col(col_name)
            .gt_eq(Value::Int64(start))
            .and(col(col_name).lt(Value::Int64(end)))
    }

    #[test]
    fn test_simple_split() {
        let old_region_id = RegionId::new(1, 1);
        let new_region_id_1 = RegionId::new(1, 2);
        let new_region_id_2 = RegionId::new(1, 3);

        let old_expr = range_expr("x", 0, 100);
        let new_expr_1 = range_expr("x", 0, 50);
        let new_expr_2 = range_expr("x", 50, 100);

        let old_manifest = create_manifest(old_region_id, 10, Some(old_expr), 100, 200);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, old_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id_1, new_expr_1);
        new_partition_exprs.insert(new_region_id_2, new_expr_2);

        // The old region's files are shared by both halves.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![new_region_id_1, new_region_id_2]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests.len(), 2);
        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 10);
        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 10);
        assert_eq!(result.stats.total_file_refs, 20);
        assert_eq!(result.stats.unique_files, 10);
        assert!(result.stats.empty_regions.is_empty());

        // Files keep pointing at the region that wrote them.
        for file_meta in result.new_manifests[&new_region_id_1].files.values() {
            assert_eq!(file_meta.region_id, old_region_id);
        }
        for file_meta in result.new_manifests[&new_region_id_2].files.values() {
            assert_eq!(file_meta.region_id, old_region_id);
        }
    }

    #[test]
    fn test_simple_merge() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let old_expr_1 = range_expr("x", 0, 50);
        let old_expr_2 = range_expr("x", 50, 100);
        let new_expr = range_expr("x", 0, 100);

        let manifest_1 = create_manifest(old_region_id_1, 5, Some(old_expr_1), 100, 200);
        let manifest_2 = create_manifest(old_region_id_2, 5, Some(old_expr_2), 150, 250);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests.len(), 1);
        assert_eq!(result.new_manifests[&new_region_id].files.len(), 10);
        assert_eq!(result.stats.total_file_refs, 10);
        assert_eq!(result.stats.unique_files, 10);
        assert!(result.stats.empty_regions.is_empty());

        // A brand-new region id inherits no bookkeeping from its sources.
        let new_manifest = &result.new_manifests[&new_region_id];
        assert_eq!(new_manifest.flushed_entry_id, 0);
        assert_eq!(new_manifest.flushed_sequence, 0);
        assert_eq!(new_manifest.manifest_version, 0);
        assert_eq!(new_manifest.truncated_entry_id, None);
        assert_eq!(new_manifest.compaction_time_window, None);
    }

    #[test]
    fn test_metadata_preserved_for_existing_region() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let old_region_id_3 = RegionId::new(1, 3);
        let new_region_id = RegionId::new(1, 4);

        let new_expr = range_expr("x", 0, 100);

        let mut manifest_1 = create_manifest(old_region_id_1, 2, None, 10, 20);
        manifest_1.truncated_entry_id = Some(5);
        manifest_1.compaction_time_window = Some(Duration::from_secs(3600));

        let mut manifest_2 = create_manifest(old_region_id_2, 2, None, 25, 15);
        manifest_2.truncated_entry_id = Some(20);
        manifest_2.compaction_time_window = Some(Duration::from_secs(7200));

        let manifest_3 = create_manifest(old_region_id_3, 2, None, 15, 30);

        // The target region id already existed before repartitioning.
        let mut previous_manifest = create_manifest(new_region_id, 0, None, 200, 300);
        previous_manifest.truncated_entry_id = Some(40);
        previous_manifest.compaction_time_window = Some(Duration::from_secs(1800));
        previous_manifest.manifest_version = 7;
        let expected_flushed_entry_id = previous_manifest.flushed_entry_id;
        let expected_flushed_sequence = previous_manifest.flushed_sequence;
        let expected_truncated_entry_id = previous_manifest.truncated_entry_id;
        let expected_compaction_window = previous_manifest.compaction_time_window;
        let expected_manifest_version = previous_manifest.manifest_version;
        let expected_committed_sequence = previous_manifest.committed_sequence;

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);
        old_manifests.insert(old_region_id_3, manifest_3);
        old_manifests.insert(new_region_id, previous_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);
        region_mapping.insert(old_region_id_3, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        let new_manifest = &result.new_manifests[&new_region_id];
        assert_eq!(new_manifest.flushed_entry_id, expected_flushed_entry_id);
        assert_eq!(new_manifest.flushed_sequence, expected_flushed_sequence);
        assert_eq!(new_manifest.truncated_entry_id, expected_truncated_entry_id);
        assert_eq!(
            new_manifest.compaction_time_window,
            expected_compaction_window
        );
        assert_eq!(new_manifest.manifest_version, expected_manifest_version);
        assert_eq!(new_manifest.committed_sequence, expected_committed_sequence);
    }

    #[test]
    fn test_file_consistency_check() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let new_expr = range_expr("x", 0, 100);

        // The same file is listed, with identical metadata, by both old regions.
        let shared_file_id = FileId::random();
        let file_meta = create_file_meta(old_region_id_1, shared_file_id, None);

        let mut manifest_1 = create_manifest(old_region_id_1, 0, None, 100, 200);
        manifest_1.files.insert(shared_file_id, file_meta.clone());

        let mut manifest_2 = create_manifest(old_region_id_2, 0, None, 100, 200);
        manifest_2.files.insert(shared_file_id, file_meta);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests[&new_region_id].files.len(), 1);
        assert_eq!(result.stats.total_file_refs, 1);
        assert_eq!(result.stats.unique_files, 1);
    }

    #[test]
    fn test_empty_regions() {
        let old_region_id = RegionId::new(1, 1);
        let new_region_id_1 = RegionId::new(1, 2);
        let new_region_id_2 = RegionId::new(1, 3);

        let old_expr = range_expr("x", 0, 50);
        let new_expr_1 = range_expr("x", 0, 50);
        let new_expr_2 = range_expr("x", 100, 200);

        let old_manifest = create_manifest(old_region_id, 5, Some(old_expr), 100, 200);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, old_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id_1, new_expr_1);
        new_partition_exprs.insert(new_region_id_2, new_expr_2);

        // `new_region_id_2` receives nothing.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![new_region_id_1]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 5);
        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 0);
        assert_eq!(result.stats.empty_regions, vec![new_region_id_2]);
    }

    #[test]
    fn test_n_to_m_complex_repartition() {
        let old_region_1 = RegionId::new(1, 1);
        let old_region_2 = RegionId::new(1, 2);
        let new_region_1 = RegionId::new(1, 3);
        let new_region_2 = RegionId::new(1, 4);
        let new_region_3 = RegionId::new(1, 5);

        let old_expr_1 = range_expr("u", 0, 100);
        let old_expr_2 = range_expr("u", 100, 200);
        let new_expr_1 = range_expr("u", 0, 50);
        let new_expr_2 = range_expr("u", 50, 150);
        let new_expr_3 = range_expr("u", 150, 250);

        let manifest_1 = create_manifest(old_region_1, 3, Some(old_expr_1), 100, 200);
        let manifest_2 = create_manifest(old_region_2, 4, Some(old_expr_2), 150, 250);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_1, manifest_1);
        old_manifests.insert(old_region_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_1, new_expr_1);
        new_partition_exprs.insert(new_region_2, new_expr_2);
        new_partition_exprs.insert(new_region_3, new_expr_3);

        // The middle new region overlaps both old regions.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_1, vec![new_region_1, new_region_2]);
        region_mapping.insert(old_region_2, vec![new_region_2, new_region_3]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests.len(), 3);
        assert_eq!(result.new_manifests[&new_region_1].files.len(), 3);
        assert_eq!(result.new_manifests[&new_region_2].files.len(), 7);
        assert_eq!(result.new_manifests[&new_region_3].files.len(), 4);
        assert_eq!(result.stats.total_file_refs, 14);
        assert_eq!(result.stats.unique_files, 7);
    }

    #[test]
    fn test_no_old_manifests() {
        let new_region_id = RegionId::new(1, 1);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        let mut remapper =
            RemapManifest::new(HashMap::new(), new_partition_exprs, HashMap::new());

        let err = remapper.remap_manifests().unwrap_err();
        assert!(matches!(err, Error::NoOldManifests { .. }), "{err:?}");
    }

    #[test]
    fn test_missing_old_manifest() {
        let old_region_id = RegionId::new(1, 1);
        let new_region_id = RegionId::new(1, 2);
        let unknown_region_id = RegionId::new(1, 9);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, create_manifest(old_region_id, 1, None, 0, 0));

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        // The mapping refers to an old region we have no manifest for.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(unknown_region_id, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let err = remapper.remap_manifests().unwrap_err();
        assert!(matches!(err, Error::MissingOldManifest { .. }), "{err:?}");
    }

    #[test]
    fn test_missing_new_manifest() {
        let old_region_id = RegionId::new(1, 1);
        let new_region_id = RegionId::new(1, 2);
        let unknown_region_id = RegionId::new(1, 9);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, create_manifest(old_region_id, 1, None, 0, 0));

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        // The mapping targets a region that has no partition expression.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![unknown_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let err = remapper.remap_manifests().unwrap_err();
        assert!(matches!(err, Error::MissingNewManifest { .. }), "{err:?}");
    }

    #[test]
    fn test_files_lost() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(
            old_region_id_1,
            create_manifest(old_region_id_1, 3, None, 0, 0),
        );
        old_manifests.insert(
            old_region_id_2,
            create_manifest(old_region_id_2, 2, None, 0, 0),
        );

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        // `old_region_id_2` is not mapped anywhere, so its two files would be dropped.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let err = remapper.remap_manifests().unwrap_err();
        assert!(matches!(err, Error::FilesLost { .. }), "{err:?}");
    }
}