mito2/
remap_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use partition::expr::PartitionExpr;
19use snafu::{OptionExt, ResultExt, ensure};
20use store_api::storage::RegionId;
21
22use crate::error;
23pub use crate::error::{Error, Result};
24use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
25use crate::sst::file::FileMeta;
26
/// Remaps file references from old region manifests to new region manifests.
///
/// Build one with [`RemapManifest::new`] and run it with
/// [`RemapManifest::remap_manifests`].
pub struct RemapManifest {
    /// Old region manifests indexed by region ID
    old_manifests: HashMap<RegionId, RegionManifest>,
    /// New partition expressions indexed by region ID; one new manifest is
    /// created for every key of this map
    new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// For each old region, which new regions should receive its files
    region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// Newly generated manifests for target regions; starts out empty and is
    /// moved out into the result when remapping succeeds
    new_manifests: HashMap<RegionId, RegionManifest>,
}
38
39impl RemapManifest {
40    pub fn new(
41        old_manifests: HashMap<RegionId, RegionManifest>,
42        new_partition_exprs: HashMap<RegionId, PartitionExpr>,
43        region_mapping: HashMap<RegionId, Vec<RegionId>>,
44    ) -> Self {
45        Self {
46            old_manifests,
47            new_partition_exprs,
48            region_mapping,
49            new_manifests: HashMap::new(),
50        }
51    }
52
53    /// Remaps manifests from old regions to new regions.
54    ///
55    /// Main entry point. It copies files from old regions to new regions based on
56    /// partition expression overlaps.
57    pub fn remap_manifests(&mut self) -> Result<RemapResult> {
58        // initialize new manifests
59        self.initialize_new_manifests()?;
60
61        // remap files
62        self.do_remap()?;
63
64        // merge and set metadata for all new manifests
65        self.finalize_manifests()?;
66
67        // validate and compute statistics
68        let stats = self.compute_stats();
69        self.validate_result(&stats)?;
70
71        let new_manifests = std::mem::take(&mut self.new_manifests);
72
73        Ok(RemapResult {
74            new_manifests,
75            stats,
76        })
77    }
78
79    /// Initializes empty manifests for all new regions.
80    fn initialize_new_manifests(&mut self) -> Result<()> {
81        let mut new_manifests = HashMap::new();
82
83        // Get any old manifest as template (they all share the same table schema)
84        let template_manifest = self
85            .old_manifests
86            .values()
87            .next()
88            .context(error::NoOldManifestsSnafu)?;
89        let template_metadata = (*template_manifest.metadata).clone();
90        let sst_format = template_manifest.sst_format;
91
92        // Create empty manifest for each new region
93        for region_id in self.new_partition_exprs.keys() {
94            // Derive region metadata from any old manifest as template
95            let mut new_metadata = template_metadata.clone();
96
97            new_metadata.region_id = *region_id;
98            let new_partition_expr = self
99                .new_partition_exprs
100                .get(region_id)
101                .context(error::MissingPartitionExprSnafu {
102                    region_id: *region_id,
103                })?
104                .as_json_str()
105                .context(error::SerializePartitionExprSnafu)?;
106            new_metadata.partition_expr = Some(new_partition_expr);
107
108            let manifest = RegionManifest {
109                metadata: Arc::new(new_metadata),
110                files: HashMap::new(),
111                removed_files: RemovedFilesRecord::default(),
112                flushed_entry_id: 0,
113                flushed_sequence: 0,
114                committed_sequence: None,
115                manifest_version: 0,
116                truncated_entry_id: None,
117                compaction_time_window: None,
118                sst_format,
119            };
120
121            new_manifests.insert(*region_id, manifest);
122        }
123
124        self.new_manifests = new_manifests;
125
126        Ok(())
127    }
128
129    /// Remaps files from old regions to new regions according to the region mapping.
130    fn do_remap(&mut self) -> Result<()> {
131        // For each old region and its target new regions
132        for (&from_region_id, target_region_ids) in &self.region_mapping {
133            // Get old manifest
134            let from_manifest = self.old_manifests.get(&from_region_id).context(
135                error::MissingOldManifestSnafu {
136                    region_id: from_region_id,
137                },
138            )?;
139
140            // Copy files to all target new regions
141            for &to_region_id in target_region_ids {
142                let target_manifest = self.new_manifests.get_mut(&to_region_id).context(
143                    error::MissingNewManifestSnafu {
144                        region_id: to_region_id,
145                    },
146                )?;
147
148                Self::copy_files_to_region(from_manifest, target_manifest)?;
149            }
150        }
151
152        Ok(())
153    }
154
155    /// Copies files from a source region to a target region.
156    fn copy_files_to_region(
157        source_manifest: &RegionManifest,
158        target_manifest: &mut RegionManifest,
159    ) -> Result<()> {
160        for (file_id, file_meta) in &source_manifest.files {
161            let file_meta_clone = file_meta.clone();
162
163            // Insert or merge into target manifest
164            // Same file might be added from multiple overlapping old regions
165            use std::collections::hash_map::Entry;
166            match target_manifest.files.entry(*file_id) {
167                Entry::Vacant(e) => {
168                    e.insert(file_meta_clone);
169                }
170                Entry::Occupied(e) => {
171                    // File already exists - verify it's the same physical file
172                    #[cfg(debug_assertions)]
173                    Self::verify_file_consistency(e.get(), &file_meta_clone)?;
174                }
175            }
176        }
177
178        Ok(())
179    }
180
181    /// Verifies that two file metadata entries are consistent.
182    #[cfg(debug_assertions)]
183    fn verify_file_consistency(existing: &FileMeta, new: &FileMeta) -> Result<()> {
184        // When the same file appears from multiple overlapping old regions,
185        // verify they are actually the same physical file with identical metadata
186
187        ensure!(
188            existing.region_id == new.region_id,
189            error::InconsistentFileSnafu {
190                file_id: existing.file_id,
191                reason: "region_id mismatch",
192            }
193        );
194
195        ensure!(
196            existing.file_id == new.file_id,
197            error::InconsistentFileSnafu {
198                file_id: existing.file_id,
199                reason: "file_id mismatch",
200            }
201        );
202
203        ensure!(
204            existing.time_range == new.time_range,
205            error::InconsistentFileSnafu {
206                file_id: existing.file_id,
207                reason: "time_range mismatch",
208            }
209        );
210
211        ensure!(
212            existing.level == new.level,
213            error::InconsistentFileSnafu {
214                file_id: existing.file_id,
215                reason: "level mismatch",
216            }
217        );
218
219        ensure!(
220            existing.file_size == new.file_size,
221            error::InconsistentFileSnafu {
222                file_id: existing.file_id,
223                reason: "file_size mismatch",
224            }
225        );
226
227        ensure!(
228            existing.partition_expr == new.partition_expr,
229            error::InconsistentFileSnafu {
230                file_id: existing.file_id,
231                reason: "partition_expr mismatch",
232            }
233        );
234
235        Ok(())
236    }
237
238    /// Finalizes manifests by merging metadata from source regions.
239    fn finalize_manifests(&mut self) -> Result<()> {
240        for (region_id, manifest) in self.new_manifests.iter_mut() {
241            if let Some(previous_manifest) = self.old_manifests.get(region_id) {
242                manifest.flushed_entry_id = previous_manifest.flushed_entry_id;
243                manifest.flushed_sequence = previous_manifest.flushed_sequence;
244                manifest.manifest_version = previous_manifest.manifest_version;
245                manifest.truncated_entry_id = previous_manifest.truncated_entry_id;
246                manifest.compaction_time_window = previous_manifest.compaction_time_window;
247                manifest.committed_sequence = previous_manifest.committed_sequence;
248            } else {
249                // new region
250                manifest.flushed_entry_id = 0;
251                manifest.flushed_sequence = 0;
252                manifest.manifest_version = 0;
253                manifest.truncated_entry_id = None;
254                manifest.compaction_time_window = None;
255                manifest.committed_sequence = None;
256            }
257
258            // removed_files are tracked by old manifests, don't copy
259            manifest.removed_files = RemovedFilesRecord::default();
260        }
261
262        Ok(())
263    }
264
265    /// Computes statistics about the remapping.
266    fn compute_stats(&self) -> RemapStats {
267        let mut files_per_region = HashMap::with_capacity(self.new_manifests.len());
268        let mut total_file_refs = 0;
269        let mut empty_regions = Vec::new();
270        let mut all_files = HashSet::new();
271
272        for (&region_id, manifest) in &self.new_manifests {
273            let file_count = manifest.files.len();
274            files_per_region.insert(region_id, file_count);
275            total_file_refs += file_count;
276
277            if file_count == 0 {
278                empty_regions.push(region_id);
279            }
280
281            for file_id in manifest.files.keys() {
282                all_files.insert(*file_id);
283            }
284        }
285
286        RemapStats {
287            files_per_region,
288            total_file_refs,
289            empty_regions,
290            unique_files: all_files.len(),
291        }
292    }
293
294    /// Validates the remapping result.
295    fn validate_result(&self, stats: &RemapStats) -> Result<()> {
296        // all new regions have manifests
297        for region_id in self.new_partition_exprs.keys() {
298            ensure!(
299                self.new_manifests.contains_key(region_id),
300                error::MissingNewManifestSnafu {
301                    region_id: *region_id
302                }
303            );
304        }
305
306        // no unique files were lost
307        // Count unique files in old manifests (files may be duplicated across regions)
308        let mut old_unique_files = HashSet::new();
309        for manifest in self.old_manifests.values() {
310            for file_id in manifest.files.keys() {
311                old_unique_files.insert(*file_id);
312            }
313        }
314
315        ensure!(
316            stats.unique_files >= old_unique_files.len(),
317            error::FilesLostSnafu {
318                old_count: old_unique_files.len(),
319                new_count: stats.unique_files,
320            }
321        );
322
323        // 3. Log warning about empty regions (not an error)
324        if !stats.empty_regions.is_empty() {
325            common_telemetry::warn!(
326                "Repartition resulted in {} empty regions: {:?}, new partition exprs: {:?}",
327                stats.empty_regions.len(),
328                stats.empty_regions,
329                self.new_partition_exprs.keys().collect::<Vec<_>>()
330            );
331        }
332
333        Ok(())
334    }
335}
336
/// Result of manifest remapping, including new manifests and statistics.
///
/// Returned by [`RemapManifest::remap_manifests`] on success.
#[derive(Debug)]
pub struct RemapResult {
    /// New manifests for all new regions, indexed by region ID
    pub new_manifests: HashMap<RegionId, RegionManifest>,
    /// Statistics about the remapping, computed over `new_manifests`
    pub stats: RemapStats,
}
345
/// Statistical information about the manifest remapping.
///
/// All figures are derived from the new manifests only.
#[derive(Debug)]
pub struct RemapStats {
    /// Number of files per region in new manifests
    pub files_per_region: HashMap<RegionId, usize>,
    /// Total number of file references created across all new regions
    /// (a file referenced by several regions is counted once per region)
    pub total_file_refs: usize,
    /// Regions that received no files (potentially empty after repartition)
    pub empty_regions: Vec<RegionId>,
    /// Number of distinct file IDs across all new manifests
    pub unique_files: usize,
}
358
359#[cfg(test)]
360mod tests {
361    use std::collections::HashMap;
362    use std::num::NonZeroU64;
363    use std::sync::Arc;
364    use std::time::Duration;
365
366    use api::v1::SemanticType;
367    use datatypes::prelude::ConcreteDataType;
368    use datatypes::schema::ColumnSchema;
369    use datatypes::value::Value;
370    use partition::expr::{PartitionExpr, col};
371    use smallvec::SmallVec;
372    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
373    use store_api::storage::{FileId, RegionId, SequenceNumber};
374
375    use super::*;
376    use crate::manifest::action::RegionManifest;
377    use crate::sst::FormatType;
378    use crate::sst::file::{FileMeta, FileTimeRange};
379    use crate::wal::EntryId;
380
381    /// Helper to create a basic region metadata for testing.
382    fn create_region_metadata(region_id: RegionId) -> RegionMetadataRef {
383        let mut builder = RegionMetadataBuilder::new(region_id);
384        builder
385            .push_column_metadata(ColumnMetadata {
386                column_schema: ColumnSchema::new(
387                    "ts",
388                    ConcreteDataType::timestamp_millisecond_datatype(),
389                    false,
390                ),
391                semantic_type: SemanticType::Timestamp,
392                column_id: 1,
393            })
394            .push_column_metadata(ColumnMetadata {
395                column_schema: ColumnSchema::new("pk", ConcreteDataType::int64_datatype(), false),
396                semantic_type: SemanticType::Tag,
397                column_id: 2,
398            })
399            .push_column_metadata(ColumnMetadata {
400                column_schema: ColumnSchema::new(
401                    "val",
402                    ConcreteDataType::float64_datatype(),
403                    false,
404                ),
405                semantic_type: SemanticType::Field,
406                column_id: 3,
407            })
408            .primary_key(vec![2]);
409        Arc::new(builder.build().unwrap())
410    }
411
412    /// Helper to create a FileMeta for testing.
413    fn create_file_meta(
414        region_id: RegionId,
415        file_id: FileId,
416        partition_expr: Option<PartitionExpr>,
417    ) -> FileMeta {
418        FileMeta {
419            region_id,
420            file_id,
421            time_range: FileTimeRange::default(),
422            level: 0,
423            file_size: 1024,
424            available_indexes: SmallVec::new(),
425            index_file_size: 0,
426            num_rows: 100,
427            num_row_groups: 1,
428            sequence: NonZeroU64::new(1),
429            partition_expr,
430        }
431    }
432
433    /// Helper to create a manifest with specified number of files.
434    fn create_manifest(
435        region_id: RegionId,
436        num_files: usize,
437        partition_expr: Option<PartitionExpr>,
438        flushed_entry_id: EntryId,
439        flushed_sequence: SequenceNumber,
440    ) -> RegionManifest {
441        let mut files = HashMap::new();
442        for _ in 0..num_files {
443            let file_id = FileId::random();
444            let file_meta = create_file_meta(region_id, file_id, partition_expr.clone());
445            files.insert(file_id, file_meta);
446        }
447
448        RegionManifest {
449            metadata: create_region_metadata(region_id),
450            files,
451            removed_files: RemovedFilesRecord::default(),
452            flushed_entry_id,
453            flushed_sequence,
454            manifest_version: 1,
455            truncated_entry_id: None,
456            compaction_time_window: None,
457            committed_sequence: None,
458            sst_format: FormatType::PrimaryKey,
459        }
460    }
461
462    /// Helper to create partition expression: col >= start AND col < end
463    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
464        col(col_name)
465            .gt_eq(Value::Int64(start))
466            .and(col(col_name).lt(Value::Int64(end)))
467    }
468
469    #[test]
470    fn test_simple_split() {
471        // One region [0, 100) splits into two regions: [0, 50) and [50, 100)
472        let old_region_id = RegionId::new(1, 1);
473        let new_region_id_1 = RegionId::new(1, 2);
474        let new_region_id_2 = RegionId::new(1, 3);
475
476        let old_expr = range_expr("x", 0, 100);
477        let new_expr_1 = range_expr("x", 0, 50);
478        let new_expr_2 = range_expr("x", 50, 100);
479
480        let old_manifest = create_manifest(old_region_id, 10, Some(old_expr.clone()), 100, 200);
481
482        let mut old_manifests = HashMap::new();
483        old_manifests.insert(old_region_id, old_manifest);
484
485        let mut new_partition_exprs = HashMap::new();
486        new_partition_exprs.insert(new_region_id_1, new_expr_1);
487        new_partition_exprs.insert(new_region_id_2, new_expr_2);
488
489        // Direct mapping: old region -> both new regions
490        let mut region_mapping = HashMap::new();
491        region_mapping.insert(old_region_id, vec![new_region_id_1, new_region_id_2]);
492
493        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
494
495        let result = remapper.remap_manifests().unwrap();
496
497        // Both new regions should have all 10 files
498        assert_eq!(result.new_manifests.len(), 2);
499        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 10);
500        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 10);
501        assert_eq!(result.stats.total_file_refs, 20);
502        assert_eq!(result.stats.unique_files, 10);
503        assert!(result.stats.empty_regions.is_empty());
504
505        // Verify FileMeta is immutable - region_id stays as old_region_id
506        for file_meta in result.new_manifests[&new_region_id_1].files.values() {
507            assert_eq!(file_meta.region_id, old_region_id);
508        }
509        for file_meta in result.new_manifests[&new_region_id_2].files.values() {
510            assert_eq!(file_meta.region_id, old_region_id);
511        }
512    }
513
514    #[test]
515    fn test_simple_merge() {
516        // Two regions [0, 50) and [50, 100) merge into one region [0, 100)
517        let old_region_id_1 = RegionId::new(1, 1);
518        let old_region_id_2 = RegionId::new(1, 2);
519        let new_region_id = RegionId::new(1, 3);
520
521        let old_expr_1 = range_expr("x", 0, 50);
522        let old_expr_2 = range_expr("x", 50, 100);
523        let new_expr = range_expr("x", 0, 100);
524
525        let manifest_1 = create_manifest(old_region_id_1, 5, Some(old_expr_1.clone()), 100, 200);
526        let manifest_2 = create_manifest(old_region_id_2, 5, Some(old_expr_2.clone()), 150, 250);
527
528        let mut old_manifests = HashMap::new();
529        old_manifests.insert(old_region_id_1, manifest_1);
530        old_manifests.insert(old_region_id_2, manifest_2);
531
532        let mut new_partition_exprs = HashMap::new();
533        new_partition_exprs.insert(new_region_id, new_expr);
534
535        // Direct mapping: both old regions -> same new region
536        let mut region_mapping = HashMap::new();
537        region_mapping.insert(old_region_id_1, vec![new_region_id]);
538        region_mapping.insert(old_region_id_2, vec![new_region_id]);
539
540        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
541
542        let result = remapper.remap_manifests().unwrap();
543
544        // New region should have all 10 files
545        assert_eq!(result.new_manifests.len(), 1);
546        assert_eq!(result.new_manifests[&new_region_id].files.len(), 10);
547        assert_eq!(result.stats.total_file_refs, 10);
548        assert_eq!(result.stats.unique_files, 10);
549        assert!(result.stats.empty_regions.is_empty());
550
551        // Verify metadata falls back to defaults when no prior manifest exists for the region
552        let new_manifest = &result.new_manifests[&new_region_id];
553        assert_eq!(new_manifest.flushed_entry_id, 0);
554        assert_eq!(new_manifest.flushed_sequence, 0);
555        assert_eq!(new_manifest.manifest_version, 0);
556        assert_eq!(new_manifest.truncated_entry_id, None);
557        assert_eq!(new_manifest.compaction_time_window, None);
558    }
559
560    #[test]
561    fn test_metadata_preserved_for_existing_region() {
562        // Test that metadata is preserved when a previous manifest exists for the same region id
563        let old_region_id_1 = RegionId::new(1, 1);
564        let old_region_id_2 = RegionId::new(1, 2);
565        let old_region_id_3 = RegionId::new(1, 3);
566        let new_region_id = RegionId::new(1, 4);
567
568        let new_expr = range_expr("x", 0, 100);
569
570        let mut manifest_1 = create_manifest(old_region_id_1, 2, None, 10, 20);
571        manifest_1.truncated_entry_id = Some(5);
572        manifest_1.compaction_time_window = Some(Duration::from_secs(3600));
573
574        let mut manifest_2 = create_manifest(old_region_id_2, 2, None, 25, 15); // Higher entry_id, lower sequence
575        manifest_2.truncated_entry_id = Some(20);
576        manifest_2.compaction_time_window = Some(Duration::from_secs(7200)); // Larger window
577
578        let manifest_3 = create_manifest(old_region_id_3, 2, None, 15, 30); // Lower entry_id, higher sequence
579        let mut previous_manifest = create_manifest(new_region_id, 0, None, 200, 300);
580        previous_manifest.truncated_entry_id = Some(40);
581        previous_manifest.compaction_time_window = Some(Duration::from_secs(1800));
582        previous_manifest.manifest_version = 7;
583        let expected_flushed_entry_id = previous_manifest.flushed_entry_id;
584        let expected_flushed_sequence = previous_manifest.flushed_sequence;
585        let expected_truncated_entry_id = previous_manifest.truncated_entry_id;
586        let expected_compaction_window = previous_manifest.compaction_time_window;
587        let expected_manifest_version = previous_manifest.manifest_version;
588
589        let mut old_manifests = HashMap::new();
590        old_manifests.insert(old_region_id_1, manifest_1);
591        old_manifests.insert(old_region_id_2, manifest_2);
592        old_manifests.insert(old_region_id_3, manifest_3);
593        old_manifests.insert(new_region_id, previous_manifest);
594
595        let mut new_partition_exprs = HashMap::new();
596        new_partition_exprs.insert(new_region_id, new_expr);
597
598        // Direct mapping: all old regions -> same new region
599        let mut region_mapping = HashMap::new();
600        region_mapping.insert(old_region_id_1, vec![new_region_id]);
601        region_mapping.insert(old_region_id_2, vec![new_region_id]);
602        region_mapping.insert(old_region_id_3, vec![new_region_id]);
603
604        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
605
606        let result = remapper.remap_manifests().unwrap();
607
608        let new_manifest = &result.new_manifests[&new_region_id];
609        // Should reuse metadata from previous manifest of the same region id
610        assert_eq!(new_manifest.flushed_entry_id, expected_flushed_entry_id);
611        assert_eq!(new_manifest.flushed_sequence, expected_flushed_sequence);
612        assert_eq!(new_manifest.truncated_entry_id, expected_truncated_entry_id);
613        assert_eq!(
614            new_manifest.compaction_time_window,
615            expected_compaction_window
616        );
617        assert_eq!(new_manifest.manifest_version, expected_manifest_version);
618    }
619
620    #[test]
621    fn test_file_consistency_check() {
622        // Test that duplicate files are verified for consistency
623        let old_region_id_1 = RegionId::new(1, 1);
624        let old_region_id_2 = RegionId::new(1, 2);
625        let new_region_id = RegionId::new(1, 3);
626
627        let new_expr = range_expr("x", 0, 100);
628
629        // Create manifests with same file (overlapping regions)
630        let shared_file_id = FileId::random();
631        let file_meta = create_file_meta(old_region_id_1, shared_file_id, None);
632
633        let mut manifest_1 = create_manifest(old_region_id_1, 0, None, 100, 200);
634        manifest_1.files.insert(shared_file_id, file_meta.clone());
635
636        let mut manifest_2 = create_manifest(old_region_id_2, 0, None, 100, 200);
637        manifest_2.files.insert(shared_file_id, file_meta);
638
639        let mut old_manifests = HashMap::new();
640        old_manifests.insert(old_region_id_1, manifest_1);
641        old_manifests.insert(old_region_id_2, manifest_2);
642
643        let mut new_partition_exprs = HashMap::new();
644        new_partition_exprs.insert(new_region_id, new_expr);
645
646        // Direct mapping: both old regions -> same new region
647        let mut region_mapping = HashMap::new();
648        region_mapping.insert(old_region_id_1, vec![new_region_id]);
649        region_mapping.insert(old_region_id_2, vec![new_region_id]);
650
651        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
652
653        let result = remapper.remap_manifests().unwrap();
654
655        // Should succeed - same file is consistent
656        assert_eq!(result.new_manifests[&new_region_id].files.len(), 1);
657        assert_eq!(result.stats.total_file_refs, 1);
658        assert_eq!(result.stats.unique_files, 1);
659    }
660
661    #[test]
662    fn test_empty_regions() {
663        // Test that empty regions are detected and logged (but not an error)
664        let old_region_id = RegionId::new(1, 1);
665        let new_region_id_1 = RegionId::new(1, 2);
666        let new_region_id_2 = RegionId::new(1, 3);
667
668        let old_expr = range_expr("x", 0, 50);
669        let new_expr_1 = range_expr("x", 0, 50);
670        let new_expr_2 = range_expr("x", 100, 200); // No overlap with old region
671
672        let old_manifest = create_manifest(old_region_id, 5, Some(old_expr.clone()), 100, 200);
673
674        let mut old_manifests = HashMap::new();
675        old_manifests.insert(old_region_id, old_manifest);
676
677        let mut new_partition_exprs = HashMap::new();
678        new_partition_exprs.insert(new_region_id_1, new_expr_1);
679        new_partition_exprs.insert(new_region_id_2, new_expr_2);
680
681        // Direct mapping: old region -> new region 1 only (region 2 is isolated)
682        let mut region_mapping = HashMap::new();
683        region_mapping.insert(old_region_id, vec![new_region_id_1]);
684
685        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
686
687        let result = remapper.remap_manifests().unwrap();
688
689        // Region 2 should be empty
690        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 5);
691        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 0);
692        assert_eq!(result.stats.empty_regions, vec![new_region_id_2]);
693    }
694
695    #[test]
696    fn test_n_to_m_complex_repartition() {
697        // Test complex N-to-M transition: 2 old regions -> 3 new regions
698        let old_region_1 = RegionId::new(1, 1);
699        let old_region_2 = RegionId::new(1, 2);
700        let new_region_1 = RegionId::new(1, 3);
701        let new_region_2 = RegionId::new(1, 4);
702        let new_region_3 = RegionId::new(1, 5);
703
704        // Old: [0, 100), [100, 200)
705        // New: [0, 50), [50, 150), [150, 250)
706        let old_expr_1 = range_expr("u", 0, 100);
707        let old_expr_2 = range_expr("u", 100, 200);
708        let new_expr_1 = range_expr("u", 0, 50);
709        let new_expr_2 = range_expr("u", 50, 150);
710        let new_expr_3 = range_expr("u", 150, 250);
711
712        let manifest_1 = create_manifest(old_region_1, 3, Some(old_expr_1.clone()), 100, 200);
713        let manifest_2 = create_manifest(old_region_2, 4, Some(old_expr_2.clone()), 150, 250);
714
715        let mut old_manifests = HashMap::new();
716        old_manifests.insert(old_region_1, manifest_1);
717        old_manifests.insert(old_region_2, manifest_2);
718
719        let mut new_partition_exprs = HashMap::new();
720        new_partition_exprs.insert(new_region_1, new_expr_1);
721        new_partition_exprs.insert(new_region_2, new_expr_2);
722        new_partition_exprs.insert(new_region_3, new_expr_3);
723
724        // Direct mapping:
725        // old region 1 -> new regions 1, 2
726        // old region 2 -> new regions 2, 3
727        let mut region_mapping = HashMap::new();
728        region_mapping.insert(old_region_1, vec![new_region_1, new_region_2]);
729        region_mapping.insert(old_region_2, vec![new_region_2, new_region_3]);
730
731        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
732
733        let result = remapper.remap_manifests().unwrap();
734
735        assert_eq!(result.new_manifests.len(), 3);
736        assert_eq!(result.new_manifests[&new_region_1].files.len(), 3);
737        assert_eq!(result.new_manifests[&new_region_2].files.len(), 7); // 3 + 4
738        assert_eq!(result.new_manifests[&new_region_3].files.len(), 4);
739        assert_eq!(result.stats.total_file_refs, 14); // 3 + 7 + 4
740        assert_eq!(result.stats.unique_files, 7); // 3 + 4 unique
741    }
742}