1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use partition::expr::PartitionExpr;
19use snafu::{OptionExt, ResultExt, ensure};
20use store_api::storage::RegionId;
21
22use crate::error;
23pub use crate::error::{Error, Result};
24use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
25
26pub struct RemapManifest {
28 old_manifests: HashMap<RegionId, RegionManifest>,
30 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
32 region_mapping: HashMap<RegionId, Vec<RegionId>>,
34 new_manifests: HashMap<RegionId, RegionManifest>,
36}
37
38impl RemapManifest {
39 pub fn new(
40 old_manifests: HashMap<RegionId, RegionManifest>,
41 new_partition_exprs: HashMap<RegionId, PartitionExpr>,
42 region_mapping: HashMap<RegionId, Vec<RegionId>>,
43 ) -> Self {
44 Self {
45 old_manifests,
46 new_partition_exprs,
47 region_mapping,
48 new_manifests: HashMap::new(),
49 }
50 }
51
52 pub fn remap_manifests(&mut self) -> Result<RemapResult> {
57 self.initialize_new_manifests()?;
59
60 self.do_remap()?;
62
63 self.finalize_manifests()?;
65
66 let stats = self.compute_stats();
68 self.validate_result(&stats)?;
69
70 let new_manifests = std::mem::take(&mut self.new_manifests);
71
72 Ok(RemapResult {
73 new_manifests,
74 stats,
75 })
76 }
77
78 fn initialize_new_manifests(&mut self) -> Result<()> {
80 let mut new_manifests = HashMap::new();
81
82 let template_manifest = self
84 .old_manifests
85 .values()
86 .next()
87 .context(error::NoOldManifestsSnafu)?;
88 let template_metadata = (*template_manifest.metadata).clone();
89 let sst_format = template_manifest.sst_format;
90
91 for region_id in self.new_partition_exprs.keys() {
93 let mut new_metadata = template_metadata.clone();
95
96 new_metadata.region_id = *region_id;
97 let new_partition_expr = self
98 .new_partition_exprs
99 .get(region_id)
100 .context(error::MissingPartitionExprSnafu {
101 region_id: *region_id,
102 })?
103 .as_json_str()
104 .context(error::SerializePartitionExprSnafu)?;
105 new_metadata.set_partition_expr(Some(new_partition_expr));
106
107 let manifest = RegionManifest {
108 metadata: Arc::new(new_metadata),
109 files: HashMap::new(),
110 removed_files: RemovedFilesRecord::default(),
111 flushed_entry_id: 0,
112 flushed_sequence: 0,
113 committed_sequence: None,
114 manifest_version: 0,
115 truncated_entry_id: None,
116 compaction_time_window: None,
117 sst_format,
118 append_mode: None,
119 };
120
121 new_manifests.insert(*region_id, manifest);
122 }
123
124 self.new_manifests = new_manifests;
125
126 Ok(())
127 }
128
129 fn do_remap(&mut self) -> Result<()> {
131 for (&from_region_id, target_region_ids) in &self.region_mapping {
133 let from_manifest = self.old_manifests.get(&from_region_id).context(
135 error::MissingOldManifestSnafu {
136 region_id: from_region_id,
137 },
138 )?;
139
140 for &to_region_id in target_region_ids {
142 let target_manifest = self.new_manifests.get_mut(&to_region_id).context(
143 error::MissingNewManifestSnafu {
144 region_id: to_region_id,
145 },
146 )?;
147
148 Self::copy_files_to_region(from_manifest, target_manifest)?;
149 }
150 }
151
152 Ok(())
153 }
154
155 fn copy_files_to_region(
157 source_manifest: &RegionManifest,
158 target_manifest: &mut RegionManifest,
159 ) -> Result<()> {
160 for (file_id, file_meta) in &source_manifest.files {
161 let file_meta_clone = file_meta.clone();
162
163 use std::collections::hash_map::Entry;
166 match target_manifest.files.entry(*file_id) {
167 Entry::Vacant(e) => {
168 e.insert(file_meta_clone);
169 }
170 #[cfg(debug_assertions)]
171 Entry::Occupied(e) => {
172 Self::verify_file_consistency(e.get(), &file_meta_clone)?;
174 }
175 #[cfg(not(debug_assertions))]
176 Entry::Occupied(_) => {}
177 }
178 }
179
180 Ok(())
181 }
182
183 #[cfg(debug_assertions)]
185 fn verify_file_consistency(
186 existing: &crate::sst::file::FileMeta,
187 new: &crate::sst::file::FileMeta,
188 ) -> Result<()> {
189 ensure!(
193 existing.region_id == new.region_id,
194 error::InconsistentFileSnafu {
195 file_id: existing.file_id,
196 reason: "region_id mismatch",
197 }
198 );
199
200 ensure!(
201 existing.file_id == new.file_id,
202 error::InconsistentFileSnafu {
203 file_id: existing.file_id,
204 reason: "file_id mismatch",
205 }
206 );
207
208 ensure!(
209 existing.time_range == new.time_range,
210 error::InconsistentFileSnafu {
211 file_id: existing.file_id,
212 reason: "time_range mismatch",
213 }
214 );
215
216 ensure!(
217 existing.level == new.level,
218 error::InconsistentFileSnafu {
219 file_id: existing.file_id,
220 reason: "level mismatch",
221 }
222 );
223
224 ensure!(
225 existing.file_size == new.file_size,
226 error::InconsistentFileSnafu {
227 file_id: existing.file_id,
228 reason: "file_size mismatch",
229 }
230 );
231
232 ensure!(
233 existing.partition_expr == new.partition_expr,
234 error::InconsistentFileSnafu {
235 file_id: existing.file_id,
236 reason: "partition_expr mismatch",
237 }
238 );
239
240 Ok(())
241 }
242
243 fn finalize_manifests(&mut self) -> Result<()> {
245 for (region_id, manifest) in self.new_manifests.iter_mut() {
246 if let Some(previous_manifest) = self.old_manifests.get(region_id) {
247 manifest.flushed_entry_id = previous_manifest.flushed_entry_id;
248 manifest.flushed_sequence = previous_manifest.flushed_sequence;
249 manifest.manifest_version = previous_manifest.manifest_version;
250 manifest.truncated_entry_id = previous_manifest.truncated_entry_id;
251 manifest.compaction_time_window = previous_manifest.compaction_time_window;
252 manifest.committed_sequence = previous_manifest.committed_sequence;
253 } else {
254 manifest.flushed_entry_id = 0;
256 manifest.flushed_sequence = 0;
257 manifest.manifest_version = 0;
258 manifest.truncated_entry_id = None;
259 manifest.compaction_time_window = None;
260 manifest.committed_sequence = None;
261 }
262
263 manifest.removed_files = RemovedFilesRecord::default();
265 }
266
267 Ok(())
268 }
269
270 fn compute_stats(&self) -> RemapStats {
272 let mut files_per_region = HashMap::with_capacity(self.new_manifests.len());
273 let mut total_file_refs = 0;
274 let mut empty_regions = Vec::new();
275 let mut all_files = HashSet::new();
276
277 for (®ion_id, manifest) in &self.new_manifests {
278 let file_count = manifest.files.len();
279 files_per_region.insert(region_id, file_count);
280 total_file_refs += file_count;
281
282 if file_count == 0 {
283 empty_regions.push(region_id);
284 }
285
286 for file_id in manifest.files.keys() {
287 all_files.insert(*file_id);
288 }
289 }
290
291 RemapStats {
292 files_per_region,
293 total_file_refs,
294 empty_regions,
295 unique_files: all_files.len(),
296 }
297 }
298
299 fn validate_result(&self, stats: &RemapStats) -> Result<()> {
301 for region_id in self.new_partition_exprs.keys() {
303 ensure!(
304 self.new_manifests.contains_key(region_id),
305 error::MissingNewManifestSnafu {
306 region_id: *region_id
307 }
308 );
309 }
310
311 let mut old_unique_files = HashSet::new();
314 for manifest in self.old_manifests.values() {
315 for file_id in manifest.files.keys() {
316 old_unique_files.insert(*file_id);
317 }
318 }
319
320 ensure!(
321 stats.unique_files >= old_unique_files.len(),
322 error::FilesLostSnafu {
323 old_count: old_unique_files.len(),
324 new_count: stats.unique_files,
325 }
326 );
327
328 if !stats.empty_regions.is_empty() {
330 common_telemetry::warn!(
331 "Repartition resulted in {} empty regions: {:?}, new partition exprs: {:?}",
332 stats.empty_regions.len(),
333 stats.empty_regions,
334 self.new_partition_exprs.keys().collect::<Vec<_>>()
335 );
336 }
337
338 Ok(())
339 }
340}
341
342#[derive(Debug)]
344pub struct RemapResult {
345 pub new_manifests: HashMap<RegionId, RegionManifest>,
347 pub stats: RemapStats,
349}
350
351#[derive(Debug)]
353pub struct RemapStats {
354 pub files_per_region: HashMap<RegionId, usize>,
356 pub total_file_refs: usize,
358 pub empty_regions: Vec<RegionId>,
360 pub unique_files: usize,
362}
363
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::num::NonZeroU64;
    use std::sync::Arc;
    use std::time::Duration;

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};
    use smallvec::SmallVec;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
    use store_api::storage::{FileId, RegionId, SequenceNumber};

    use super::*;
    use crate::manifest::action::RegionManifest;
    use crate::sst::FormatType;
    use crate::sst::file::{FileMeta, FileTimeRange};
    use crate::wal::EntryId;

    /// Builds metadata with a timestamp column `ts`, an int64 tag `pk` (primary key)
    /// and a float64 field `val`.
    fn create_region_metadata(region_id: RegionId) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(region_id);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("pk", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "val",
                    ConcreteDataType::float64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![2]);
        Arc::new(builder.build().unwrap())
    }

    /// Builds a level-0, 1 KiB, 100-row file owned by `region_id`.
    fn create_file_meta(
        region_id: RegionId,
        file_id: FileId,
        partition_expr: Option<PartitionExpr>,
    ) -> FileMeta {
        FileMeta {
            region_id,
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 1024,
            max_row_group_uncompressed_size: 1024,
            available_indexes: SmallVec::new(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows: 100,
            num_row_groups: 1,
            sequence: NonZeroU64::new(1),
            partition_expr,
            num_series: 1,
            ..Default::default()
        }
    }

    /// Builds a manifest for `region_id` holding `num_files` freshly generated files.
    fn create_manifest(
        region_id: RegionId,
        num_files: usize,
        partition_expr: Option<PartitionExpr>,
        flushed_entry_id: EntryId,
        flushed_sequence: SequenceNumber,
    ) -> RegionManifest {
        let mut files = HashMap::new();
        for _ in 0..num_files {
            let file_id = FileId::random();
            let file_meta = create_file_meta(region_id, file_id, partition_expr.clone());
            files.insert(file_id, file_meta);
        }

        RegionManifest {
            metadata: create_region_metadata(region_id),
            files,
            removed_files: RemovedFilesRecord::default(),
            flushed_entry_id,
            flushed_sequence,
            manifest_version: 1,
            truncated_entry_id: None,
            compaction_time_window: None,
            committed_sequence: None,
            sst_format: FormatType::PrimaryKey,
            append_mode: None,
        }
    }

    /// `start <= col_name < end`.
    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
        col(col_name)
            .gt_eq(Value::Int64(start))
            .and(col(col_name).lt(Value::Int64(end)))
    }

    #[test]
    fn test_simple_split() {
        let old_region_id = RegionId::new(1, 1);
        let new_region_id_1 = RegionId::new(1, 2);
        let new_region_id_2 = RegionId::new(1, 3);

        let old_expr = range_expr("x", 0, 100);
        let new_expr_1 = range_expr("x", 0, 50);
        let new_expr_2 = range_expr("x", 50, 100);

        let old_manifest = create_manifest(old_region_id, 10, Some(old_expr), 100, 200);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, old_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id_1, new_expr_1);
        new_partition_exprs.insert(new_region_id_2, new_expr_2);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![new_region_id_1, new_region_id_2]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests.len(), 2);
        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 10);
        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 10);
        assert_eq!(result.stats.total_file_refs, 20);
        assert_eq!(result.stats.unique_files, 10);
        assert!(result.stats.empty_regions.is_empty());

        // Copied files keep pointing at the region that physically owns them.
        for file_meta in result.new_manifests[&new_region_id_1].files.values() {
            assert_eq!(file_meta.region_id, old_region_id);
        }
        for file_meta in result.new_manifests[&new_region_id_2].files.values() {
            assert_eq!(file_meta.region_id, old_region_id);
        }
    }

    #[test]
    fn test_simple_merge() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let old_expr_1 = range_expr("x", 0, 50);
        let old_expr_2 = range_expr("x", 50, 100);
        let new_expr = range_expr("x", 0, 100);

        let manifest_1 = create_manifest(old_region_id_1, 5, Some(old_expr_1), 100, 200);
        let manifest_2 = create_manifest(old_region_id_2, 5, Some(old_expr_2), 150, 250);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests.len(), 1);
        assert_eq!(result.new_manifests[&new_region_id].files.len(), 10);
        assert_eq!(result.stats.total_file_refs, 10);
        assert_eq!(result.stats.unique_files, 10);
        assert!(result.stats.empty_regions.is_empty());

        // A brand-new region starts with zeroed bookkeeping.
        let new_manifest = &result.new_manifests[&new_region_id];
        assert_eq!(new_manifest.flushed_entry_id, 0);
        assert_eq!(new_manifest.flushed_sequence, 0);
        assert_eq!(new_manifest.manifest_version, 0);
        assert_eq!(new_manifest.truncated_entry_id, None);
        assert_eq!(new_manifest.compaction_time_window, None);
    }

    #[test]
    fn test_metadata_preserved_for_existing_region() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let old_region_id_3 = RegionId::new(1, 3);
        let new_region_id = RegionId::new(1, 4);

        let new_expr = range_expr("x", 0, 100);

        let mut manifest_1 = create_manifest(old_region_id_1, 2, None, 10, 20);
        manifest_1.truncated_entry_id = Some(5);
        manifest_1.compaction_time_window = Some(Duration::from_secs(3600));

        let mut manifest_2 = create_manifest(old_region_id_2, 2, None, 25, 15);
        manifest_2.truncated_entry_id = Some(20);
        manifest_2.compaction_time_window = Some(Duration::from_secs(7200));

        let manifest_3 = create_manifest(old_region_id_3, 2, None, 15, 30);

        // The target region already existed: its own bookkeeping must win over the sources'.
        let mut previous_manifest = create_manifest(new_region_id, 0, None, 200, 300);
        previous_manifest.truncated_entry_id = Some(40);
        previous_manifest.compaction_time_window = Some(Duration::from_secs(1800));
        previous_manifest.manifest_version = 7;
        previous_manifest.committed_sequence = Some(350);
        let expected_flushed_entry_id = previous_manifest.flushed_entry_id;
        let expected_flushed_sequence = previous_manifest.flushed_sequence;
        let expected_truncated_entry_id = previous_manifest.truncated_entry_id;
        let expected_compaction_window = previous_manifest.compaction_time_window;
        let expected_manifest_version = previous_manifest.manifest_version;
        let expected_committed_sequence = previous_manifest.committed_sequence;

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);
        old_manifests.insert(old_region_id_3, manifest_3);
        old_manifests.insert(new_region_id, previous_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);
        region_mapping.insert(old_region_id_3, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        let new_manifest = &result.new_manifests[&new_region_id];
        assert_eq!(new_manifest.flushed_entry_id, expected_flushed_entry_id);
        assert_eq!(new_manifest.flushed_sequence, expected_flushed_sequence);
        assert_eq!(new_manifest.truncated_entry_id, expected_truncated_entry_id);
        assert_eq!(
            new_manifest.compaction_time_window,
            expected_compaction_window
        );
        assert_eq!(new_manifest.manifest_version, expected_manifest_version);
        assert_eq!(new_manifest.committed_sequence, expected_committed_sequence);
    }

    #[test]
    fn test_file_consistency_check() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let new_expr = range_expr("x", 0, 100);

        // The same file is referenced by both source regions with identical metadata.
        let shared_file_id = FileId::random();
        let file_meta = create_file_meta(old_region_id_1, shared_file_id, None);

        let mut manifest_1 = create_manifest(old_region_id_1, 0, None, 100, 200);
        manifest_1.files.insert(shared_file_id, file_meta.clone());

        let mut manifest_2 = create_manifest(old_region_id_2, 0, None, 100, 200);
        manifest_2.files.insert(shared_file_id, file_meta);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id_1, manifest_1);
        old_manifests.insert(old_region_id_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, new_expr);

        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);
        region_mapping.insert(old_region_id_2, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests[&new_region_id].files.len(), 1);
        assert_eq!(result.stats.total_file_refs, 1);
        assert_eq!(result.stats.unique_files, 1);
    }

    #[test]
    fn test_empty_regions() {
        let old_region_id = RegionId::new(1, 1);
        let new_region_id_1 = RegionId::new(1, 2);
        let new_region_id_2 = RegionId::new(1, 3);

        let old_expr = range_expr("x", 0, 50);
        let new_expr_1 = range_expr("x", 0, 50);
        let new_expr_2 = range_expr("x", 100, 200);

        let old_manifest = create_manifest(old_region_id, 5, Some(old_expr), 100, 200);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, old_manifest);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id_1, new_expr_1);
        new_partition_exprs.insert(new_region_id_2, new_expr_2);

        // `new_region_id_2` receives nothing.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![new_region_id_1]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 5);
        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 0);
        assert_eq!(result.stats.empty_regions, vec![new_region_id_2]);
    }

    #[test]
    fn test_n_to_m_complex_repartition() {
        let old_region_1 = RegionId::new(1, 1);
        let old_region_2 = RegionId::new(1, 2);
        let new_region_1 = RegionId::new(1, 3);
        let new_region_2 = RegionId::new(1, 4);
        let new_region_3 = RegionId::new(1, 5);

        let old_expr_1 = range_expr("u", 0, 100);
        let old_expr_2 = range_expr("u", 100, 200);
        let new_expr_1 = range_expr("u", 0, 50);
        let new_expr_2 = range_expr("u", 50, 150);
        let new_expr_3 = range_expr("u", 150, 250);

        let manifest_1 = create_manifest(old_region_1, 3, Some(old_expr_1), 100, 200);
        let manifest_2 = create_manifest(old_region_2, 4, Some(old_expr_2), 150, 250);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_1, manifest_1);
        old_manifests.insert(old_region_2, manifest_2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_1, new_expr_1);
        new_partition_exprs.insert(new_region_2, new_expr_2);
        new_partition_exprs.insert(new_region_3, new_expr_3);

        // `new_region_2` overlaps both old regions.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_1, vec![new_region_1, new_region_2]);
        region_mapping.insert(old_region_2, vec![new_region_2, new_region_3]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let result = remapper.remap_manifests().unwrap();

        assert_eq!(result.new_manifests.len(), 3);
        assert_eq!(result.new_manifests[&new_region_1].files.len(), 3);
        assert_eq!(result.new_manifests[&new_region_2].files.len(), 7);
        assert_eq!(result.new_manifests[&new_region_3].files.len(), 4);
        assert_eq!(result.stats.total_file_refs, 14);
        assert_eq!(result.stats.unique_files, 7);
    }

    #[test]
    fn test_no_old_manifests() {
        let new_region_id = RegionId::new(1, 2);

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        let mut remapper = RemapManifest::new(HashMap::new(), new_partition_exprs, HashMap::new());

        let err = remapper.remap_manifests().unwrap_err();
        assert!(
            matches!(err, Error::NoOldManifests { .. }),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn test_missing_old_manifest() {
        let old_region_id = RegionId::new(1, 1);
        let unknown_region_id = RegionId::new(1, 9);
        let new_region_id = RegionId::new(1, 2);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, create_manifest(old_region_id, 1, None, 0, 0));

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        // `unknown_region_id` is used as a source but has no old manifest.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![new_region_id]);
        region_mapping.insert(unknown_region_id, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let err = remapper.remap_manifests().unwrap_err();
        assert!(
            matches!(err, Error::MissingOldManifest { .. }),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn test_missing_new_manifest() {
        let old_region_id = RegionId::new(1, 1);
        let new_region_id = RegionId::new(1, 2);
        let unknown_target_id = RegionId::new(1, 9);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(old_region_id, create_manifest(old_region_id, 1, None, 0, 0));

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        // `unknown_target_id` is used as a target but has no partition expression.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id, vec![unknown_target_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let err = remapper.remap_manifests().unwrap_err();
        assert!(
            matches!(err, Error::MissingNewManifest { .. }),
            "unexpected error: {err:?}"
        );
    }

    #[test]
    fn test_files_lost() {
        let old_region_id_1 = RegionId::new(1, 1);
        let old_region_id_2 = RegionId::new(1, 2);
        let new_region_id = RegionId::new(1, 3);

        let mut old_manifests = HashMap::new();
        old_manifests.insert(
            old_region_id_1,
            create_manifest(old_region_id_1, 3, None, 0, 0),
        );
        old_manifests.insert(
            old_region_id_2,
            create_manifest(old_region_id_2, 2, None, 0, 0),
        );

        let mut new_partition_exprs = HashMap::new();
        new_partition_exprs.insert(new_region_id, range_expr("x", 0, 100));

        // `old_region_id_2` is not mapped anywhere, so its files would be dropped.
        let mut region_mapping = HashMap::new();
        region_mapping.insert(old_region_id_1, vec![new_region_id]);

        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);

        let err = remapper.remap_manifests().unwrap_err();
        assert!(
            matches!(err, Error::FilesLost { .. }),
            "unexpected error: {err:?}"
        );
    }
}