1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use partition::expr::PartitionExpr;
19use snafu::{OptionExt, ResultExt, ensure};
20use store_api::storage::RegionId;
21
22use crate::error;
23pub use crate::error::{Error, Result};
24use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
25use crate::sst::file::FileMeta;
26
27pub struct RemapManifest {
29 old_manifests: HashMap<RegionId, RegionManifest>,
31 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
33 region_mapping: HashMap<RegionId, Vec<RegionId>>,
35 new_manifests: HashMap<RegionId, RegionManifest>,
37}
38
39impl RemapManifest {
40 pub fn new(
41 old_manifests: HashMap<RegionId, RegionManifest>,
42 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
43 region_mapping: HashMap<RegionId, Vec<RegionId>>,
44 ) -> Self {
45 Self {
46 old_manifests,
47 new_partition_exprs,
48 region_mapping,
49 new_manifests: HashMap::new(),
50 }
51 }
52
53 pub fn remap_manifests(&mut self) -> Result<RemapResult> {
58 self.initialize_new_manifests()?;
60
61 self.do_remap()?;
63
64 self.finalize_manifests()?;
66
67 let stats = self.compute_stats();
69 self.validate_result(&stats)?;
70
71 let new_manifests = std::mem::take(&mut self.new_manifests);
72
73 Ok(RemapResult {
74 new_manifests,
75 stats,
76 })
77 }
78
79 fn initialize_new_manifests(&mut self) -> Result<()> {
81 let mut new_manifests = HashMap::new();
82
83 let template_manifest = self
85 .old_manifests
86 .values()
87 .next()
88 .context(error::NoOldManifestsSnafu)?;
89 let template_metadata = (*template_manifest.metadata).clone();
90 let sst_format = template_manifest.sst_format;
91
92 for region_id in self.new_partition_exprs.keys() {
94 let mut new_metadata = template_metadata.clone();
96
97 new_metadata.region_id = *region_id;
98 let new_partition_expr = self
99 .new_partition_exprs
100 .get(region_id)
101 .context(error::MissingPartitionExprSnafu {
102 region_id: *region_id,
103 })?
104 .as_json_str()
105 .context(error::SerializePartitionExprSnafu)?;
106 new_metadata.partition_expr = Some(new_partition_expr);
107
108 let manifest = RegionManifest {
109 metadata: Arc::new(new_metadata),
110 files: HashMap::new(),
111 removed_files: RemovedFilesRecord::default(),
112 flushed_entry_id: 0,
113 flushed_sequence: 0,
114 committed_sequence: None,
115 manifest_version: 0,
116 truncated_entry_id: None,
117 compaction_time_window: None,
118 sst_format,
119 };
120
121 new_manifests.insert(*region_id, manifest);
122 }
123
124 self.new_manifests = new_manifests;
125
126 Ok(())
127 }
128
129 fn do_remap(&mut self) -> Result<()> {
131 for (&from_region_id, target_region_ids) in &self.region_mapping {
133 let from_manifest = self.old_manifests.get(&from_region_id).context(
135 error::MissingOldManifestSnafu {
136 region_id: from_region_id,
137 },
138 )?;
139
140 for &to_region_id in target_region_ids {
142 let target_manifest = self.new_manifests.get_mut(&to_region_id).context(
143 error::MissingNewManifestSnafu {
144 region_id: to_region_id,
145 },
146 )?;
147
148 Self::copy_files_to_region(from_manifest, target_manifest)?;
149 }
150 }
151
152 Ok(())
153 }
154
155 fn copy_files_to_region(
157 source_manifest: &RegionManifest,
158 target_manifest: &mut RegionManifest,
159 ) -> Result<()> {
160 for (file_id, file_meta) in &source_manifest.files {
161 let file_meta_clone = file_meta.clone();
162
163 use std::collections::hash_map::Entry;
166 match target_manifest.files.entry(*file_id) {
167 Entry::Vacant(e) => {
168 e.insert(file_meta_clone);
169 }
170 Entry::Occupied(e) => {
171 #[cfg(debug_assertions)]
173 Self::verify_file_consistency(e.get(), &file_meta_clone)?;
174 }
175 }
176 }
177
178 Ok(())
179 }
180
181 #[cfg(debug_assertions)]
183 fn verify_file_consistency(existing: &FileMeta, new: &FileMeta) -> Result<()> {
184 ensure!(
188 existing.region_id == new.region_id,
189 error::InconsistentFileSnafu {
190 file_id: existing.file_id,
191 reason: "region_id mismatch",
192 }
193 );
194
195 ensure!(
196 existing.file_id == new.file_id,
197 error::InconsistentFileSnafu {
198 file_id: existing.file_id,
199 reason: "file_id mismatch",
200 }
201 );
202
203 ensure!(
204 existing.time_range == new.time_range,
205 error::InconsistentFileSnafu {
206 file_id: existing.file_id,
207 reason: "time_range mismatch",
208 }
209 );
210
211 ensure!(
212 existing.level == new.level,
213 error::InconsistentFileSnafu {
214 file_id: existing.file_id,
215 reason: "level mismatch",
216 }
217 );
218
219 ensure!(
220 existing.file_size == new.file_size,
221 error::InconsistentFileSnafu {
222 file_id: existing.file_id,
223 reason: "file_size mismatch",
224 }
225 );
226
227 ensure!(
228 existing.partition_expr == new.partition_expr,
229 error::InconsistentFileSnafu {
230 file_id: existing.file_id,
231 reason: "partition_expr mismatch",
232 }
233 );
234
235 Ok(())
236 }
237
238 fn finalize_manifests(&mut self) -> Result<()> {
240 for (region_id, manifest) in self.new_manifests.iter_mut() {
241 if let Some(previous_manifest) = self.old_manifests.get(region_id) {
242 manifest.flushed_entry_id = previous_manifest.flushed_entry_id;
243 manifest.flushed_sequence = previous_manifest.flushed_sequence;
244 manifest.manifest_version = previous_manifest.manifest_version;
245 manifest.truncated_entry_id = previous_manifest.truncated_entry_id;
246 manifest.compaction_time_window = previous_manifest.compaction_time_window;
247 manifest.committed_sequence = previous_manifest.committed_sequence;
248 } else {
249 manifest.flushed_entry_id = 0;
251 manifest.flushed_sequence = 0;
252 manifest.manifest_version = 0;
253 manifest.truncated_entry_id = None;
254 manifest.compaction_time_window = None;
255 manifest.committed_sequence = None;
256 }
257
258 manifest.removed_files = RemovedFilesRecord::default();
260 }
261
262 Ok(())
263 }
264
265 fn compute_stats(&self) -> RemapStats {
267 let mut files_per_region = HashMap::with_capacity(self.new_manifests.len());
268 let mut total_file_refs = 0;
269 let mut empty_regions = Vec::new();
270 let mut all_files = HashSet::new();
271
272 for (®ion_id, manifest) in &self.new_manifests {
273 let file_count = manifest.files.len();
274 files_per_region.insert(region_id, file_count);
275 total_file_refs += file_count;
276
277 if file_count == 0 {
278 empty_regions.push(region_id);
279 }
280
281 for file_id in manifest.files.keys() {
282 all_files.insert(*file_id);
283 }
284 }
285
286 RemapStats {
287 files_per_region,
288 total_file_refs,
289 empty_regions,
290 unique_files: all_files.len(),
291 }
292 }
293
294 fn validate_result(&self, stats: &RemapStats) -> Result<()> {
296 for region_id in self.new_partition_exprs.keys() {
298 ensure!(
299 self.new_manifests.contains_key(region_id),
300 error::MissingNewManifestSnafu {
301 region_id: *region_id
302 }
303 );
304 }
305
306 let mut old_unique_files = HashSet::new();
309 for manifest in self.old_manifests.values() {
310 for file_id in manifest.files.keys() {
311 old_unique_files.insert(*file_id);
312 }
313 }
314
315 ensure!(
316 stats.unique_files >= old_unique_files.len(),
317 error::FilesLostSnafu {
318 old_count: old_unique_files.len(),
319 new_count: stats.unique_files,
320 }
321 );
322
323 if !stats.empty_regions.is_empty() {
325 common_telemetry::warn!(
326 "Repartition resulted in {} empty regions: {:?}, new partition exprs: {:?}",
327 stats.empty_regions.len(),
328 stats.empty_regions,
329 self.new_partition_exprs.keys().collect::<Vec<_>>()
330 );
331 }
332
333 Ok(())
334 }
335}
336
337#[derive(Debug)]
339pub struct RemapResult {
340 pub new_manifests: HashMap<RegionId, RegionManifest>,
342 pub stats: RemapStats,
344}
345
346#[derive(Debug)]
348pub struct RemapStats {
349 pub files_per_region: HashMap<RegionId, usize>,
351 pub total_file_refs: usize,
353 pub empty_regions: Vec<RegionId>,
355 pub unique_files: usize,
357}
358
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::num::NonZeroU64;
    use std::sync::Arc;
    use std::time::Duration;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};
    use smallvec::SmallVec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
    use store_api::storage::{FileId, RegionId, SequenceNumber};

    use super::*;
    use crate::manifest::action::RegionManifest;
    use crate::sst::FormatType;
    use crate::sst::file::{FileMeta, FileTimeRange};
    use crate::wal::EntryId;

    /// Builds region metadata with a timestamp column `ts`, a tag column `pk`
    /// (the primary key) and a field column `val`.
    fn create_region_metadata(region_id: RegionId) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(region_id);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pk", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "val",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![2]);
        Arc::new(builder.build().unwrap())
    }

    /// Builds a file entry owned by `region_id` with fixed size and row counts.
    fn create_file_meta(
        region_id: RegionId,
        file_id: FileId,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 1024,
            available_indexes: SmallVec::new(),
            index_file_size: 0,
            num_rows: 100,
            num_row_groups: 1,
            sequence: NonZeroU64::new(1),
            partition_expr,
        }
    }

    /// Builds a manifest for `region_id` holding `num_files` files with random ids.
    fn create_manifest(
        region_id: RegionId,
        num_files: usize,
        partition_expr: Option<PartitionExpr>,
        flushed_entry_id: EntryId,
        flushed_sequence: SequenceNumber,
    ) -> RegionManifest {
        let mut files = HashMap::new();
        for _ in 0..num_files {
            let file_id = FileId::random();
            let file_meta = create_file_meta(region_id, file_id, partition_expr.clone());
            files.insert(file_id, file_meta);
        }

        RegionManifest {
            metadata: create_region_metadata(region_id),
            files,
            removed_files: RemovedFilesRecord::default(),
            flushed_entry_id,
            flushed_sequence,
            manifest_version: 1,
            truncated_entry_id: None,
            compaction_time_window: None,
            committed_sequence: None,
            sst_format: FormatType::PrimaryKey,
        }
    }

    /// Builds the expression `start <= col_name AND col_name < end`.
    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
        col(col_name)
            .gt_eq(Value::Int64(start))
            .and(col(col_name).lt(Value::Int64(end)))
    }

    #[test]
    fn test_simple_split() {
        // One old region is split into two new regions.
        let old_region_id = RegionId::new(1, 1);
        let new_region_id_1 = RegionId::new(1, 2);
        let new_region_id_2 = RegionId::new(1, 3);

        let old_expr = range_expr("x", 0, 100);
        let new_expr_1 = range_expr("x", 0, 50);
        let new_expr_2 = range_expr("x", 50, 100);

        let old_manifest = create_manifest(old_region_id, 10, Some(old_expr), 100, 200);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, old_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id_1, new_expr_1);
        new_partition_exprs.insert(new_region_id_2, new_expr_2);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![new_region_id_1, new_region_id_2]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        // Both new regions reference all ten files of the old region.
        assert_eq!(result.new_manifests.len(), 2);
        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 10);
        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 10);
        assert_eq!(result.stats.total_file_refs, 20);
        assert_eq!(result.stats.unique_files, 10);
        assert!(result.stats.empty_regions.is_empty());

        // Copied file entries keep the region id of the region they came from.
        for file_meta in result.new_manifests[&new_region_id_1].files.values() {
            assert_eq!(file_meta.region_id, old_region_id);
        }
        for file_meta in result.new_manifests[&new_region_id_2].files.values() {
            assert_eq!(file_meta.region_id, old_region_id);
        }
    }

    #[test]
    fn test_simple_merge() {
        // Two old regions are merged into one new region.
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let old_expr_1 = range_expr("x", 0, 50);
        let old_expr_2 = range_expr("x", 50, 100);
        let new_expr = range_expr("x", 0, 100);

        let manifest_1 = create_manifest(old_region_id_1, 5, Some(old_expr_1), 100, 200);
        let manifest_2 = create_manifest(old_region_id_2, 5, Some(old_expr_2), 150, 250);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        // The new region holds the files of both old regions.
        assert_eq!(result.new_manifests.len(), 1);
        assert_eq!(result.new_manifests[&new_region_id].files.len(), 10);
        assert_eq!(result.stats.total_file_refs, 10);
        assert_eq!(result.stats.unique_files, 10);
        assert!(result.stats.empty_regions.is_empty());

        // The new region id has no old manifest, so its fields are reset.
        let new_manifest = &result.new_manifests[&new_region_id];
        assert_eq!(new_manifest.flushed_entry_id, 0);
        assert_eq!(new_manifest.flushed_sequence, 0);
        assert_eq!(new_manifest.manifest_version, 0);
        assert_eq!(new_manifest.truncated_entry_id, None);
        assert_eq!(new_manifest.compaction_time_window, None);
        assert_eq!(new_manifest.committed_sequence, None);
    }

    #[test]
    fn test_metadata_preserved_for_existing_region() {
        // The target region id already has an entry in the old manifests.
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let old_region_id_3 = RegionId::new(1, 3);
        let new_region_id = RegionId::new(1, 4);

        let new_expr = range_expr("x", 0, 100);

        let mut manifest_1 = create_manifest(old_region_id_1, 2, None, 10, 20);
        manifest_1.truncated_entry_id = Some(5);
        manifest_1.compaction_time_window = Some(Duration::from_secs(3600));

        let mut manifest_2 = create_manifest(old_region_id_2, 2, None, 25, 15);
        manifest_2.truncated_entry_id = Some(20);
        manifest_2.compaction_time_window = Some(Duration::from_secs(7200));

        let manifest_3 = create_manifest(old_region_id_3, 2, None, 15, 30);

        let mut previous_manifest = create_manifest(new_region_id, 0, None, 200, 300);
        previous_manifest.truncated_entry_id = Some(40);
        previous_manifest.compaction_time_window = Some(Duration::from_secs(1800));
        previous_manifest.manifest_version = 7;
        let expected_flushed_entry_id = previous_manifest.flushed_entry_id;
        let expected_flushed_sequence = previous_manifest.flushed_sequence;
        let expected_truncated_entry_id = previous_manifest.truncated_entry_id;
        let expected_compaction_window = previous_manifest.compaction_time_window;
        let expected_manifest_version = previous_manifest.manifest_version;

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);
        old_manifests.insert(old_region_id_3, manifest_3);
        old_manifests.insert(new_region_id, previous_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);
        region_mapping.insert(old_region_id_3, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        // Fields come from the old manifest registered under the same region
        // id, not from the regions whose files were merged in.
        let new_manifest = &result.new_manifests[&new_region_id];
        assert_eq!(new_manifest.flushed_entry_id, expected_flushed_entry_id);
        assert_eq!(new_manifest.flushed_sequence, expected_flushed_sequence);
        assert_eq!(new_manifest.truncated_entry_id, expected_truncated_entry_id);
        assert_eq!(
            new_manifest.compaction_time_window,
            expected_compaction_window
        );
        assert_eq!(new_manifest.manifest_version, expected_manifest_version);
    }

    #[test]
    fn test_file_consistency_check() {
        // Two old regions reference the same file with identical metadata.
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let new_expr = range_expr("x", 0, 100);

        let shared_file_id = FileId::random();
        let file_meta = create_file_meta(old_region_id_1, shared_file_id, None);

        let mut manifest_1 = create_manifest(old_region_id_1, 0, None, 100, 200);
        manifest_1.files.insert(shared_file_id, file_meta.clone());

        let mut manifest_2 = create_manifest(old_region_id_2, 0, None, 100, 200);
        manifest_2.files.insert(shared_file_id, file_meta);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        // The shared file appears exactly once in the merged manifest.
        assert_eq!(result.new_manifests[&new_region_id].files.len(), 1);
        assert_eq!(result.stats.total_file_refs, 1);
        assert_eq!(result.stats.unique_files, 1);
    }

    #[test]
    fn test_empty_regions() {
        // Only one of the two new regions is a mapping target.
        let old_region_id = RegionId::new(1, 1);
        let new_region_id_1 = RegionId::new(1, 2);
        let new_region_id_2 = RegionId::new(1, 3);

        let old_expr = range_expr("x", 0, 50);
        let new_expr_1 = range_expr("x", 0, 50);
        let new_expr_2 = range_expr("x", 100, 200);

        let old_manifest = create_manifest(old_region_id, 5, Some(old_expr), 100, 200);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, old_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id_1, new_expr_1);
        new_partition_exprs.insert(new_region_id_2, new_expr_2);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![new_region_id_1]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        // The unmapped new region still gets a manifest, but without files.
        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 5);
        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 0);
        assert_eq!(result.stats.empty_regions, vec![new_region_id_2]);
    }

    #[test]
    fn test_n_to_m_complex_repartition() {
        // Two old regions are redistributed over three new regions.
        let old_region_1 = RegionId::new(1, 1);
        let old_region_2 = RegionId::new(1, 2);
        let new_region_1 = RegionId::new(1, 3);
        let new_region_2 = RegionId::new(1, 4);
        let new_region_3 = RegionId::new(1, 5);

        let old_expr_1 = range_expr("u", 0, 100);
        let old_expr_2 = range_expr("u", 100, 200);
        let new_expr_1 = range_expr("u", 0, 50);
        let new_expr_2 = range_expr("u", 50, 150);
        let new_expr_3 = range_expr("u", 150, 250);

        let manifest_1 = create_manifest(old_region_1, 3, Some(old_expr_1), 100, 200);
        let manifest_2 = create_manifest(old_region_2, 4, Some(old_expr_2), 150, 250);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_1, manifest_1);
        old_manifests.insert(old_region_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_1, new_expr_1);
        new_partition_exprs.insert(new_region_2, new_expr_2);
        new_partition_exprs.insert(new_region_3, new_expr_3);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_1, vec![new_region_1, new_region_2]);
        region_mapping.insert(old_region_2, vec![new_region_2, new_region_3]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests.len(), 3);
        assert_eq!(result.new_manifests[&new_region_1].files.len(), 3);
        // The middle region receives the files of both old regions: 3 + 4.
        assert_eq!(result.new_manifests[&new_region_2].files.len(), 7);
        assert_eq!(result.new_manifests[&new_region_3].files.len(), 4);
        // 3 + 7 + 4 references to 7 distinct files.
        assert_eq!(result.stats.total_file_refs, 14);
        assert_eq!(result.stats.unique_files, 7);
    }
}