mito2/
remap_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use partition::expr::PartitionExpr;
19use snafu::{OptionExt, ResultExt, ensure};
20use store_api::storage::RegionId;
21
22use crate::error;
23pub use crate::error::{Error, Result};
24use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
25
/// Remaps file references from old region manifests to new region manifests.
///
/// Build one with [`RemapManifest::new`] and call
/// [`RemapManifest::remap_manifests`] to obtain the new manifests together with
/// statistics about the remapping.
pub struct RemapManifest {
    /// Old region manifests indexed by region ID. An arbitrary one of them also
    /// serves as the metadata template for every new manifest.
    old_manifests: HashMap<RegionId, RegionManifest>,
    /// New partition expressions indexed by region ID. One new manifest is
    /// created per key of this map.
    new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// For each old region, which new regions should receive its files
    region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// Newly generated manifests for target regions. Rebuilt on every call to
    /// `remap_manifests` and moved into the returned result on success.
    new_manifests: HashMap<RegionId, RegionManifest>,
}
37
38impl RemapManifest {
39    pub fn new(
40        old_manifests: HashMap<RegionId, RegionManifest>,
41        new_partition_exprs: HashMap<RegionId, PartitionExpr>,
42        region_mapping: HashMap<RegionId, Vec<RegionId>>,
43    ) -> Self {
44        Self {
45            old_manifests,
46            new_partition_exprs,
47            region_mapping,
48            new_manifests: HashMap::new(),
49        }
50    }
51
52    /// Remaps manifests from old regions to new regions.
53    ///
54    /// Main entry point. It copies files from old regions to new regions based on
55    /// partition expression overlaps.
56    pub fn remap_manifests(&mut self) -> Result<RemapResult> {
57        // initialize new manifests
58        self.initialize_new_manifests()?;
59
60        // remap files
61        self.do_remap()?;
62
63        // merge and set metadata for all new manifests
64        self.finalize_manifests()?;
65
66        // validate and compute statistics
67        let stats = self.compute_stats();
68        self.validate_result(&stats)?;
69
70        let new_manifests = std::mem::take(&mut self.new_manifests);
71
72        Ok(RemapResult {
73            new_manifests,
74            stats,
75        })
76    }
77
78    /// Initializes empty manifests for all new regions.
79    fn initialize_new_manifests(&mut self) -> Result<()> {
80        let mut new_manifests = HashMap::new();
81
82        // Get any old manifest as template (they all share the same table schema)
83        let template_manifest = self
84            .old_manifests
85            .values()
86            .next()
87            .context(error::NoOldManifestsSnafu)?;
88        let template_metadata = (*template_manifest.metadata).clone();
89        let sst_format = template_manifest.sst_format;
90
91        // Create empty manifest for each new region
92        for region_id in self.new_partition_exprs.keys() {
93            // Derive region metadata from any old manifest as template
94            let mut new_metadata = template_metadata.clone();
95
96            new_metadata.region_id = *region_id;
97            let new_partition_expr = self
98                .new_partition_exprs
99                .get(region_id)
100                .context(error::MissingPartitionExprSnafu {
101                    region_id: *region_id,
102                })?
103                .as_json_str()
104                .context(error::SerializePartitionExprSnafu)?;
105            new_metadata.set_partition_expr(Some(new_partition_expr));
106
107            let manifest = RegionManifest {
108                metadata: Arc::new(new_metadata),
109                files: HashMap::new(),
110                removed_files: RemovedFilesRecord::default(),
111                flushed_entry_id: 0,
112                flushed_sequence: 0,
113                committed_sequence: None,
114                manifest_version: 0,
115                truncated_entry_id: None,
116                compaction_time_window: None,
117                sst_format,
118                append_mode: None,
119            };
120
121            new_manifests.insert(*region_id, manifest);
122        }
123
124        self.new_manifests = new_manifests;
125
126        Ok(())
127    }
128
129    /// Remaps files from old regions to new regions according to the region mapping.
130    fn do_remap(&mut self) -> Result<()> {
131        // For each old region and its target new regions
132        for (&from_region_id, target_region_ids) in &self.region_mapping {
133            // Get old manifest
134            let from_manifest = self.old_manifests.get(&from_region_id).context(
135                error::MissingOldManifestSnafu {
136                    region_id: from_region_id,
137                },
138            )?;
139
140            // Copy files to all target new regions
141            for &to_region_id in target_region_ids {
142                let target_manifest = self.new_manifests.get_mut(&to_region_id).context(
143                    error::MissingNewManifestSnafu {
144                        region_id: to_region_id,
145                    },
146                )?;
147
148                Self::copy_files_to_region(from_manifest, target_manifest)?;
149            }
150        }
151
152        Ok(())
153    }
154
155    /// Copies files from a source region to a target region.
156    fn copy_files_to_region(
157        source_manifest: &RegionManifest,
158        target_manifest: &mut RegionManifest,
159    ) -> Result<()> {
160        for (file_id, file_meta) in &source_manifest.files {
161            let file_meta_clone = file_meta.clone();
162
163            // Insert or merge into target manifest
164            // Same file might be added from multiple overlapping old regions
165            use std::collections::hash_map::Entry;
166            match target_manifest.files.entry(*file_id) {
167                Entry::Vacant(e) => {
168                    e.insert(file_meta_clone);
169                }
170                #[cfg(debug_assertions)]
171                Entry::Occupied(e) => {
172                    // File already exists - verify it's the same physical file
173                    Self::verify_file_consistency(e.get(), &file_meta_clone)?;
174                }
175                #[cfg(not(debug_assertions))]
176                Entry::Occupied(_) => {}
177            }
178        }
179
180        Ok(())
181    }
182
183    /// Verifies that two file metadata entries are consistent.
184    #[cfg(debug_assertions)]
185    fn verify_file_consistency(
186        existing: &crate::sst::file::FileMeta,
187        new: &crate::sst::file::FileMeta,
188    ) -> Result<()> {
189        // When the same file appears from multiple overlapping old regions,
190        // verify they are actually the same physical file with identical metadata
191
192        ensure!(
193            existing.region_id == new.region_id,
194            error::InconsistentFileSnafu {
195                file_id: existing.file_id,
196                reason: "region_id mismatch",
197            }
198        );
199
200        ensure!(
201            existing.file_id == new.file_id,
202            error::InconsistentFileSnafu {
203                file_id: existing.file_id,
204                reason: "file_id mismatch",
205            }
206        );
207
208        ensure!(
209            existing.time_range == new.time_range,
210            error::InconsistentFileSnafu {
211                file_id: existing.file_id,
212                reason: "time_range mismatch",
213            }
214        );
215
216        ensure!(
217            existing.level == new.level,
218            error::InconsistentFileSnafu {
219                file_id: existing.file_id,
220                reason: "level mismatch",
221            }
222        );
223
224        ensure!(
225            existing.file_size == new.file_size,
226            error::InconsistentFileSnafu {
227                file_id: existing.file_id,
228                reason: "file_size mismatch",
229            }
230        );
231
232        ensure!(
233            existing.partition_expr == new.partition_expr,
234            error::InconsistentFileSnafu {
235                file_id: existing.file_id,
236                reason: "partition_expr mismatch",
237            }
238        );
239
240        Ok(())
241    }
242
243    /// Finalizes manifests by merging metadata from source regions.
244    fn finalize_manifests(&mut self) -> Result<()> {
245        for (region_id, manifest) in self.new_manifests.iter_mut() {
246            if let Some(previous_manifest) = self.old_manifests.get(region_id) {
247                manifest.flushed_entry_id = previous_manifest.flushed_entry_id;
248                manifest.flushed_sequence = previous_manifest.flushed_sequence;
249                manifest.manifest_version = previous_manifest.manifest_version;
250                manifest.truncated_entry_id = previous_manifest.truncated_entry_id;
251                manifest.compaction_time_window = previous_manifest.compaction_time_window;
252                manifest.committed_sequence = previous_manifest.committed_sequence;
253            } else {
254                // new region
255                manifest.flushed_entry_id = 0;
256                manifest.flushed_sequence = 0;
257                manifest.manifest_version = 0;
258                manifest.truncated_entry_id = None;
259                manifest.compaction_time_window = None;
260                manifest.committed_sequence = None;
261            }
262
263            // removed_files are tracked by old manifests, don't copy
264            manifest.removed_files = RemovedFilesRecord::default();
265        }
266
267        Ok(())
268    }
269
270    /// Computes statistics about the remapping.
271    fn compute_stats(&self) -> RemapStats {
272        let mut files_per_region = HashMap::with_capacity(self.new_manifests.len());
273        let mut total_file_refs = 0;
274        let mut empty_regions = Vec::new();
275        let mut all_files = HashSet::new();
276
277        for (&region_id, manifest) in &self.new_manifests {
278            let file_count = manifest.files.len();
279            files_per_region.insert(region_id, file_count);
280            total_file_refs += file_count;
281
282            if file_count == 0 {
283                empty_regions.push(region_id);
284            }
285
286            for file_id in manifest.files.keys() {
287                all_files.insert(*file_id);
288            }
289        }
290
291        RemapStats {
292            files_per_region,
293            total_file_refs,
294            empty_regions,
295            unique_files: all_files.len(),
296        }
297    }
298
299    /// Validates the remapping result.
300    fn validate_result(&self, stats: &RemapStats) -> Result<()> {
301        // all new regions have manifests
302        for region_id in self.new_partition_exprs.keys() {
303            ensure!(
304                self.new_manifests.contains_key(region_id),
305                error::MissingNewManifestSnafu {
306                    region_id: *region_id
307                }
308            );
309        }
310
311        // no unique files were lost
312        // Count unique files in old manifests (files may be duplicated across regions)
313        let mut old_unique_files = HashSet::new();
314        for manifest in self.old_manifests.values() {
315            for file_id in manifest.files.keys() {
316                old_unique_files.insert(*file_id);
317            }
318        }
319
320        ensure!(
321            stats.unique_files >= old_unique_files.len(),
322            error::FilesLostSnafu {
323                old_count: old_unique_files.len(),
324                new_count: stats.unique_files,
325            }
326        );
327
328        // 3. Log warning about empty regions (not an error)
329        if !stats.empty_regions.is_empty() {
330            common_telemetry::warn!(
331                "Repartition resulted in {} empty regions: {:?}, new partition exprs: {:?}",
332                stats.empty_regions.len(),
333                stats.empty_regions,
334                self.new_partition_exprs.keys().collect::<Vec<_>>()
335            );
336        }
337
338        Ok(())
339    }
340}
341
/// Result of manifest remapping, including new manifests and statistics.
///
/// Returned by [`RemapManifest::remap_manifests`] on success.
#[derive(Debug)]
pub struct RemapResult {
    /// New manifests for all new regions, indexed by region ID
    pub new_manifests: HashMap<RegionId, RegionManifest>,
    /// Statistics about the remapping
    pub stats: RemapStats,
}
350
/// Statistical information about the manifest remapping.
#[derive(Debug)]
pub struct RemapStats {
    /// Number of files per region in new manifests
    pub files_per_region: HashMap<RegionId, usize>,
    /// Total number of file references created across all new regions.
    /// A file referenced by several new regions is counted once per region.
    pub total_file_refs: usize,
    /// Regions that received no files (potentially empty after repartition).
    /// Collected while iterating a hash map, so the order is unspecified.
    pub empty_regions: Vec<RegionId>,
    /// Number of distinct file IDs across all new manifests
    pub unique_files: usize,
}
363
364#[cfg(test)]
365mod tests {
366    use std::collections::HashMap;
367    use std::num::NonZeroU64;
368    use std::sync::Arc;
369    use std::time::Duration;
370
371    use api::v1::SemanticType;
372    use datatypes::prelude::ConcreteDataType;
373    use datatypes::schema::ColumnSchema;
374    use datatypes::value::Value;
375    use partition::expr::{PartitionExpr, col};
376    use smallvec::SmallVec;
377    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
378    use store_api::storage::{FileId, RegionId, SequenceNumber};
379
380    use super::*;
381    use crate::manifest::action::RegionManifest;
382    use crate::sst::FormatType;
383    use crate::sst::file::{FileMeta, FileTimeRange};
384    use crate::wal::EntryId;
385
386    /// Helper to create a basic region metadata for testing.
387    fn create_region_metadata(region_id: RegionId) -> RegionMetadataRef {
388        let mut builder = RegionMetadataBuilder::new(region_id);
389        builder
390            .push_column_metadata(ColumnMetadata {
391                column_schema: ColumnSchema::new(
392                    "ts",
393                    ConcreteDataType::timestamp_millisecond_datatype(),
394                    false,
395                ),
396                semantic_type: SemanticType::Timestamp,
397                column_id: 1,
398            })
399            .push_column_metadata(ColumnMetadata {
400                column_schema: ColumnSchema::new("pk", ConcreteDataType::int64_datatype(), false),
401                semantic_type: SemanticType::Tag,
402                column_id: 2,
403            })
404            .push_column_metadata(ColumnMetadata {
405                column_schema: ColumnSchema::new(
406                    "val",
407                    ConcreteDataType::float64_datatype(),
408                    false,
409                ),
410                semantic_type: SemanticType::Field,
411                column_id: 3,
412            })
413            .primary_key(vec![2]);
414        Arc::new(builder.build().unwrap())
415    }
416
417    /// Helper to create a FileMeta for testing.
418    fn create_file_meta(
419        region_id: RegionId,
420        file_id: FileId,
421        partition_expr: Option<PartitionExpr>,
422    ) -> FileMeta {
423        FileMeta {
424            region_id,
425            file_id,
426            time_range: FileTimeRange::default(),
427            level: 0,
428            file_size: 1024,
429            max_row_group_uncompressed_size: 1024,
430            available_indexes: SmallVec::new(),
431            indexes: Default::default(),
432            index_file_size: 0,
433            index_version: 0,
434            num_rows: 100,
435            num_row_groups: 1,
436            sequence: NonZeroU64::new(1),
437            partition_expr,
438            num_series: 1,
439        }
440    }
441
442    /// Helper to create a manifest with specified number of files.
443    fn create_manifest(
444        region_id: RegionId,
445        num_files: usize,
446        partition_expr: Option<PartitionExpr>,
447        flushed_entry_id: EntryId,
448        flushed_sequence: SequenceNumber,
449    ) -> RegionManifest {
450        let mut files = HashMap::new();
451        for _ in 0..num_files {
452            let file_id = FileId::random();
453            let file_meta = create_file_meta(region_id, file_id, partition_expr.clone());
454            files.insert(file_id, file_meta);
455        }
456
457        RegionManifest {
458            metadata: create_region_metadata(region_id),
459            files,
460            removed_files: RemovedFilesRecord::default(),
461            flushed_entry_id,
462            flushed_sequence,
463            manifest_version: 1,
464            truncated_entry_id: None,
465            compaction_time_window: None,
466            committed_sequence: None,
467            sst_format: FormatType::PrimaryKey,
468            append_mode: None,
469        }
470    }
471
472    /// Helper to create partition expression: col >= start AND col < end
473    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
474        col(col_name)
475            .gt_eq(Value::Int64(start))
476            .and(col(col_name).lt(Value::Int64(end)))
477    }
478
479    #[test]
480    fn test_simple_split() {
481        // One region [0, 100) splits into two regions: [0, 50) and [50, 100)
482        let old_region_id = RegionId::new(1, 1);
483        let new_region_id_1 = RegionId::new(1, 2);
484        let new_region_id_2 = RegionId::new(1, 3);
485
486        let old_expr = range_expr("x", 0, 100);
487        let new_expr_1 = range_expr("x", 0, 50);
488        let new_expr_2 = range_expr("x", 50, 100);
489
490        let old_manifest = create_manifest(old_region_id, 10, Some(old_expr.clone()), 100, 200);
491
492        let mut old_manifests = HashMap::new();
493        old_manifests.insert(old_region_id, old_manifest);
494
495        let mut new_partition_exprs = HashMap::new();
496        new_partition_exprs.insert(new_region_id_1, new_expr_1);
497        new_partition_exprs.insert(new_region_id_2, new_expr_2);
498
499        // Direct mapping: old region -> both new regions
500        let mut region_mapping = HashMap::new();
501        region_mapping.insert(old_region_id, vec![new_region_id_1, new_region_id_2]);
502
503        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
504
505        let result = remapper.remap_manifests().unwrap();
506
507        // Both new regions should have all 10 files
508        assert_eq!(result.new_manifests.len(), 2);
509        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 10);
510        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 10);
511        assert_eq!(result.stats.total_file_refs, 20);
512        assert_eq!(result.stats.unique_files, 10);
513        assert!(result.stats.empty_regions.is_empty());
514
515        // Verify FileMeta is immutable - region_id stays as old_region_id
516        for file_meta in result.new_manifests[&new_region_id_1].files.values() {
517            assert_eq!(file_meta.region_id, old_region_id);
518        }
519        for file_meta in result.new_manifests[&new_region_id_2].files.values() {
520            assert_eq!(file_meta.region_id, old_region_id);
521        }
522    }
523
524    #[test]
525    fn test_simple_merge() {
526        // Two regions [0, 50) and [50, 100) merge into one region [0, 100)
527        let old_region_id_1 = RegionId::new(1, 1);
528        let old_region_id_2 = RegionId::new(1, 2);
529        let new_region_id = RegionId::new(1, 3);
530
531        let old_expr_1 = range_expr("x", 0, 50);
532        let old_expr_2 = range_expr("x", 50, 100);
533        let new_expr = range_expr("x", 0, 100);
534
535        let manifest_1 = create_manifest(old_region_id_1, 5, Some(old_expr_1.clone()), 100, 200);
536        let manifest_2 = create_manifest(old_region_id_2, 5, Some(old_expr_2.clone()), 150, 250);
537
538        let mut old_manifests = HashMap::new();
539        old_manifests.insert(old_region_id_1, manifest_1);
540        old_manifests.insert(old_region_id_2, manifest_2);
541
542        let mut new_partition_exprs = HashMap::new();
543        new_partition_exprs.insert(new_region_id, new_expr);
544
545        // Direct mapping: both old regions -> same new region
546        let mut region_mapping = HashMap::new();
547        region_mapping.insert(old_region_id_1, vec![new_region_id]);
548        region_mapping.insert(old_region_id_2, vec![new_region_id]);
549
550        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
551
552        let result = remapper.remap_manifests().unwrap();
553
554        // New region should have all 10 files
555        assert_eq!(result.new_manifests.len(), 1);
556        assert_eq!(result.new_manifests[&new_region_id].files.len(), 10);
557        assert_eq!(result.stats.total_file_refs, 10);
558        assert_eq!(result.stats.unique_files, 10);
559        assert!(result.stats.empty_regions.is_empty());
560
561        // Verify metadata falls back to defaults when no prior manifest exists for the region
562        let new_manifest = &result.new_manifests[&new_region_id];
563        assert_eq!(new_manifest.flushed_entry_id, 0);
564        assert_eq!(new_manifest.flushed_sequence, 0);
565        assert_eq!(new_manifest.manifest_version, 0);
566        assert_eq!(new_manifest.truncated_entry_id, None);
567        assert_eq!(new_manifest.compaction_time_window, None);
568    }
569
570    #[test]
571    fn test_metadata_preserved_for_existing_region() {
572        // Test that metadata is preserved when a previous manifest exists for the same region id
573        let old_region_id_1 = RegionId::new(1, 1);
574        let old_region_id_2 = RegionId::new(1, 2);
575        let old_region_id_3 = RegionId::new(1, 3);
576        let new_region_id = RegionId::new(1, 4);
577
578        let new_expr = range_expr("x", 0, 100);
579
580        let mut manifest_1 = create_manifest(old_region_id_1, 2, None, 10, 20);
581        manifest_1.truncated_entry_id = Some(5);
582        manifest_1.compaction_time_window = Some(Duration::from_secs(3600));
583
584        let mut manifest_2 = create_manifest(old_region_id_2, 2, None, 25, 15); // Higher entry_id, lower sequence
585        manifest_2.truncated_entry_id = Some(20);
586        manifest_2.compaction_time_window = Some(Duration::from_secs(7200)); // Larger window
587
588        let manifest_3 = create_manifest(old_region_id_3, 2, None, 15, 30); // Lower entry_id, higher sequence
589        let mut previous_manifest = create_manifest(new_region_id, 0, None, 200, 300);
590        previous_manifest.truncated_entry_id = Some(40);
591        previous_manifest.compaction_time_window = Some(Duration::from_secs(1800));
592        previous_manifest.manifest_version = 7;
593        let expected_flushed_entry_id = previous_manifest.flushed_entry_id;
594        let expected_flushed_sequence = previous_manifest.flushed_sequence;
595        let expected_truncated_entry_id = previous_manifest.truncated_entry_id;
596        let expected_compaction_window = previous_manifest.compaction_time_window;
597        let expected_manifest_version = previous_manifest.manifest_version;
598
599        let mut old_manifests = HashMap::new();
600        old_manifests.insert(old_region_id_1, manifest_1);
601        old_manifests.insert(old_region_id_2, manifest_2);
602        old_manifests.insert(old_region_id_3, manifest_3);
603        old_manifests.insert(new_region_id, previous_manifest);
604
605        let mut new_partition_exprs = HashMap::new();
606        new_partition_exprs.insert(new_region_id, new_expr);
607
608        // Direct mapping: all old regions -> same new region
609        let mut region_mapping = HashMap::new();
610        region_mapping.insert(old_region_id_1, vec![new_region_id]);
611        region_mapping.insert(old_region_id_2, vec![new_region_id]);
612        region_mapping.insert(old_region_id_3, vec![new_region_id]);
613
614        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
615
616        let result = remapper.remap_manifests().unwrap();
617
618        let new_manifest = &result.new_manifests[&new_region_id];
619        // Should reuse metadata from previous manifest of the same region id
620        assert_eq!(new_manifest.flushed_entry_id, expected_flushed_entry_id);
621        assert_eq!(new_manifest.flushed_sequence, expected_flushed_sequence);
622        assert_eq!(new_manifest.truncated_entry_id, expected_truncated_entry_id);
623        assert_eq!(
624            new_manifest.compaction_time_window,
625            expected_compaction_window
626        );
627        assert_eq!(new_manifest.manifest_version, expected_manifest_version);
628    }
629
630    #[test]
631    fn test_file_consistency_check() {
632        // Test that duplicate files are verified for consistency
633        let old_region_id_1 = RegionId::new(1, 1);
634        let old_region_id_2 = RegionId::new(1, 2);
635        let new_region_id = RegionId::new(1, 3);
636
637        let new_expr = range_expr("x", 0, 100);
638
639        // Create manifests with same file (overlapping regions)
640        let shared_file_id = FileId::random();
641        let file_meta = create_file_meta(old_region_id_1, shared_file_id, None);
642
643        let mut manifest_1 = create_manifest(old_region_id_1, 0, None, 100, 200);
644        manifest_1.files.insert(shared_file_id, file_meta.clone());
645
646        let mut manifest_2 = create_manifest(old_region_id_2, 0, None, 100, 200);
647        manifest_2.files.insert(shared_file_id, file_meta);
648
649        let mut old_manifests = HashMap::new();
650        old_manifests.insert(old_region_id_1, manifest_1);
651        old_manifests.insert(old_region_id_2, manifest_2);
652
653        let mut new_partition_exprs = HashMap::new();
654        new_partition_exprs.insert(new_region_id, new_expr);
655
656        // Direct mapping: both old regions -> same new region
657        let mut region_mapping = HashMap::new();
658        region_mapping.insert(old_region_id_1, vec![new_region_id]);
659        region_mapping.insert(old_region_id_2, vec![new_region_id]);
660
661        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
662
663        let result = remapper.remap_manifests().unwrap();
664
665        // Should succeed - same file is consistent
666        assert_eq!(result.new_manifests[&new_region_id].files.len(), 1);
667        assert_eq!(result.stats.total_file_refs, 1);
668        assert_eq!(result.stats.unique_files, 1);
669    }
670
671    #[test]
672    fn test_empty_regions() {
673        // Test that empty regions are detected and logged (but not an error)
674        let old_region_id = RegionId::new(1, 1);
675        let new_region_id_1 = RegionId::new(1, 2);
676        let new_region_id_2 = RegionId::new(1, 3);
677
678        let old_expr = range_expr("x", 0, 50);
679        let new_expr_1 = range_expr("x", 0, 50);
680        let new_expr_2 = range_expr("x", 100, 200); // No overlap with old region
681
682        let old_manifest = create_manifest(old_region_id, 5, Some(old_expr.clone()), 100, 200);
683
684        let mut old_manifests = HashMap::new();
685        old_manifests.insert(old_region_id, old_manifest);
686
687        let mut new_partition_exprs = HashMap::new();
688        new_partition_exprs.insert(new_region_id_1, new_expr_1);
689        new_partition_exprs.insert(new_region_id_2, new_expr_2);
690
691        // Direct mapping: old region -> new region 1 only (region 2 is isolated)
692        let mut region_mapping = HashMap::new();
693        region_mapping.insert(old_region_id, vec![new_region_id_1]);
694
695        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
696
697        let result = remapper.remap_manifests().unwrap();
698
699        // Region 2 should be empty
700        assert_eq!(result.new_manifests[&new_region_id_1].files.len(), 5);
701        assert_eq!(result.new_manifests[&new_region_id_2].files.len(), 0);
702        assert_eq!(result.stats.empty_regions, vec![new_region_id_2]);
703    }
704
705    #[test]
706    fn test_n_to_m_complex_repartition() {
707        // Test complex N-to-M transition: 2 old regions -> 3 new regions
708        let old_region_1 = RegionId::new(1, 1);
709        let old_region_2 = RegionId::new(1, 2);
710        let new_region_1 = RegionId::new(1, 3);
711        let new_region_2 = RegionId::new(1, 4);
712        let new_region_3 = RegionId::new(1, 5);
713
714        // Old: [0, 100), [100, 200)
715        // New: [0, 50), [50, 150), [150, 250)
716        let old_expr_1 = range_expr("u", 0, 100);
717        let old_expr_2 = range_expr("u", 100, 200);
718        let new_expr_1 = range_expr("u", 0, 50);
719        let new_expr_2 = range_expr("u", 50, 150);
720        let new_expr_3 = range_expr("u", 150, 250);
721
722        let manifest_1 = create_manifest(old_region_1, 3, Some(old_expr_1.clone()), 100, 200);
723        let manifest_2 = create_manifest(old_region_2, 4, Some(old_expr_2.clone()), 150, 250);
724
725        let mut old_manifests = HashMap::new();
726        old_manifests.insert(old_region_1, manifest_1);
727        old_manifests.insert(old_region_2, manifest_2);
728
729        let mut new_partition_exprs = HashMap::new();
730        new_partition_exprs.insert(new_region_1, new_expr_1);
731        new_partition_exprs.insert(new_region_2, new_expr_2);
732        new_partition_exprs.insert(new_region_3, new_expr_3);
733
734        // Direct mapping:
735        // old region 1 -> new regions 1, 2
736        // old region 2 -> new regions 2, 3
737        let mut region_mapping = HashMap::new();
738        region_mapping.insert(old_region_1, vec![new_region_1, new_region_2]);
739        region_mapping.insert(old_region_2, vec![new_region_2, new_region_3]);
740
741        let mut remapper = RemapManifest::new(old_manifests, new_partition_exprs, region_mapping);
742
743        let result = remapper.remap_manifests().unwrap();
744
745        assert_eq!(result.new_manifests.len(), 3);
746        assert_eq!(result.new_manifests[&new_region_1].files.len(), 3);
747        assert_eq!(result.new_manifests[&new_region_2].files.len(), 7); // 3 + 4
748        assert_eq!(result.new_manifests[&new_region_3].files.len(), 4);
749        assert_eq!(result.stats.total_file_refs, 14); // 3 + 7 + 4
750        assert_eq!(result.stats.unique_files, 7); // 3 + 4 unique
751    }
752}