mito2/manifest/
action.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
16
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use common_telemetry::warn;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt};
24use store_api::ManifestVersion;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber};
27use strum::Display;
28
29use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
30use crate::manifest::manager::RemoveFileOptions;
31use crate::region::ManifestStats;
32use crate::sst::FormatType;
33use crate::sst::file::FileMeta;
34use crate::wal::EntryId;
35
36/// Actions that can be applied to region manifest.
37#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
38pub enum RegionMetaAction {
39    /// Change region's metadata for request like ALTER
40    Change(RegionChange),
41    /// Change only region partition expression metadata.
42    PartitionExprChange(RegionPartitionExprChange),
43    /// Edit region's state for changing options or file list.
44    Edit(RegionEdit),
45    /// Remove the region.
46    Remove(RegionRemove),
47    /// Truncate the region.
48    Truncate(RegionTruncate),
49}
50
51#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
52pub struct RegionPartitionExprChange {
53    /// Partition expression serialized as JSON.
54    pub partition_expr: Option<String>,
55}
56
57impl RegionMetaAction {
58    /// Returns true if the action is a change action.
59    pub fn is_change(&self) -> bool {
60        matches!(self, RegionMetaAction::Change(_))
61    }
62
63    /// Returns true if the action is an edit action.
64    pub fn is_edit(&self) -> bool {
65        matches!(self, RegionMetaAction::Edit(_))
66    }
67
68    /// Returns true if the action is a partition expr change action.
69    pub fn is_partition_expr_change(&self) -> bool {
70        matches!(self, RegionMetaAction::PartitionExprChange(_))
71    }
72}
73
74#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
75pub struct RegionChange {
76    /// The metadata after changed.
77    pub metadata: RegionMetadataRef,
78    /// Format of the SST.
79    #[serde(default)]
80    pub sst_format: FormatType,
81}
82
83#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
84pub struct RegionEdit {
85    pub files_to_add: Vec<FileMeta>,
86    pub files_to_remove: Vec<FileMeta>,
87    /// event unix timestamp in milliseconds, help to determine file deletion time.
88    #[serde(default)]
89    pub timestamp_ms: Option<i64>,
90    #[serde(with = "humantime_serde")]
91    pub compaction_time_window: Option<Duration>,
92    pub flushed_entry_id: Option<EntryId>,
93    pub flushed_sequence: Option<SequenceNumber>,
94    pub committed_sequence: Option<SequenceNumber>,
95}
96
97#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
98pub struct RegionRemove {
99    pub region_id: RegionId,
100}
101
102/// Last data truncated in the region.
103///
104#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
105pub struct RegionTruncate {
106    pub region_id: RegionId,
107    #[serde(flatten)]
108    pub kind: TruncateKind,
109    /// event unix timestamp in milliseconds, help to determine file deletion time.
110    #[serde(default)]
111    pub timestamp_ms: Option<i64>,
112}
113
114/// The kind of truncate operation.
115#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
116#[serde(untagged)]
117pub enum TruncateKind {
118    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
119    All {
120        /// Last WAL entry id of truncated data.
121        truncated_entry_id: EntryId,
122        // Last sequence number of truncated data.
123        truncated_sequence: SequenceNumber,
124    },
125    /// Only remove certain files in the region.
126    Partial { files_to_remove: Vec<FileMeta> },
127}
128
129/// The region manifest data.
130#[derive(Serialize, Deserialize, Clone, Debug)]
131#[cfg_attr(test, derive(Eq))]
132pub struct RegionManifest {
133    /// Metadata of the region.
134    pub metadata: RegionMetadataRef,
135    /// SST files.
136    pub files: HashMap<FileId, FileMeta>,
137    /// Removed files, which are not in the current manifest but may still be kept for a while.
138    /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
139    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
140    ///
141    /// Using same checkpoint files and action files, the recovered manifest may differ in this
142    /// `removed_files` field, because the checkpointer may evict some removed files using
143    /// current machine time. This is acceptable because the removed files are not used in normal
144    /// read/write path.
145    ///
146    #[serde(default)]
147    pub removed_files: RemovedFilesRecord,
148    /// Last WAL entry id of flushed data.
149    pub flushed_entry_id: EntryId,
150    /// Last sequence of flushed data.
151    pub flushed_sequence: SequenceNumber,
152    pub committed_sequence: Option<SequenceNumber>,
153    /// Current manifest version.
154    pub manifest_version: ManifestVersion,
155    /// Last WAL entry id of truncated data.
156    pub truncated_entry_id: Option<EntryId>,
157    /// Inferred compaction time window.
158    #[serde(with = "humantime_serde")]
159    pub compaction_time_window: Option<Duration>,
160    /// Format of the SST file.
161    #[serde(default)]
162    pub sst_format: FormatType,
163}
164
/// Test-only equality for [`RegionManifest`].
///
/// `removed_files` and `sst_format` do not take part in the comparison.
// NOTE(review): leaving out `sst_format` may be an oversight rather than intent; confirm.
#[cfg(test)]
impl PartialEq for RegionManifest {
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            &self.metadata,
            &self.files,
            &self.flushed_entry_id,
            &self.flushed_sequence,
            &self.manifest_version,
            &self.truncated_entry_id,
            &self.compaction_time_window,
            &self.committed_sequence,
        );
        let rhs = (
            &other.metadata,
            &other.files,
            &other.flushed_entry_id,
            &other.flushed_sequence,
            &other.manifest_version,
            &other.truncated_entry_id,
            &other.compaction_time_window,
            &other.committed_sequence,
        );
        lhs == rhs
    }
}
178
179#[derive(Debug, Default)]
180pub struct RegionManifestBuilder {
181    metadata: Option<RegionMetadataRef>,
182    files: HashMap<FileId, FileMeta>,
183    pub removed_files: RemovedFilesRecord,
184    flushed_entry_id: EntryId,
185    flushed_sequence: SequenceNumber,
186    manifest_version: ManifestVersion,
187    truncated_entry_id: Option<EntryId>,
188    compaction_time_window: Option<Duration>,
189    committed_sequence: Option<SequenceNumber>,
190    sst_format: FormatType,
191}
192
193impl RegionManifestBuilder {
194    /// Start with a checkpoint.
195    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
196        if let Some(s) = checkpoint {
197            Self {
198                metadata: Some(s.metadata),
199                files: s.files,
200                removed_files: s.removed_files,
201                flushed_entry_id: s.flushed_entry_id,
202                manifest_version: s.manifest_version,
203                flushed_sequence: s.flushed_sequence,
204                truncated_entry_id: s.truncated_entry_id,
205                compaction_time_window: s.compaction_time_window,
206                committed_sequence: s.committed_sequence,
207                sst_format: s.sst_format,
208            }
209        } else {
210            Default::default()
211        }
212    }
213
214    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
215        self.metadata = Some(change.metadata);
216        self.manifest_version = manifest_version;
217        self.sst_format = change.sst_format;
218    }
219
220    /// Applies a partition-expression-only metadata change.
221    ///
222    /// This path updates only `partition_expr` (and its derived
223    /// `partition_rule_version`) on current metadata and does not touch
224    /// `sst_format`.
225    pub fn apply_partition_expr_change(
226        &mut self,
227        manifest_version: ManifestVersion,
228        change: RegionPartitionExprChange,
229    ) {
230        if let Some(metadata) = &self.metadata {
231            let mut metadata = metadata.as_ref().clone();
232            metadata.set_partition_expr(change.partition_expr);
233            self.metadata = Some(metadata.into());
234            self.manifest_version = manifest_version;
235        } else {
236            warn!(
237                "metadata is not set in region manifest builder, ignore partition expr change: {:?}",
238                change
239            );
240        }
241    }
242
243    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
244        self.manifest_version = manifest_version;
245
246        let mut removed_files = vec![];
247        for file in edit.files_to_add {
248            if let Some(old_file) = self.files.insert(file.file_id, file.clone())
249                && let Some(old_index) = old_file.index_version()
250                && !old_file.is_index_up_to_date(&file)
251            {
252                // The old file has an index that is now outdated.
253                removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
254            }
255        }
256        removed_files.extend(
257            edit.files_to_remove
258                .iter()
259                .map(|f| RemovedFile::File(f.file_id, f.index_version())),
260        );
261        let at = edit
262            .timestamp_ms
263            .unwrap_or_else(|| Utc::now().timestamp_millis());
264        self.removed_files.add_removed_files(removed_files, at);
265
266        for file in edit.files_to_remove {
267            self.files.remove(&file.file_id);
268        }
269        if let Some(flushed_entry_id) = edit.flushed_entry_id {
270            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
271        }
272        if let Some(flushed_sequence) = edit.flushed_sequence {
273            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
274        }
275
276        if let Some(committed_sequence) = edit.committed_sequence {
277            self.committed_sequence = Some(
278                self.committed_sequence
279                    .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
280            );
281        }
282        if let Some(window) = edit.compaction_time_window {
283            self.compaction_time_window = Some(window);
284        }
285    }
286
287    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
288        self.manifest_version = manifest_version;
289        match truncate.kind {
290            TruncateKind::All {
291                truncated_entry_id,
292                truncated_sequence,
293            } => {
294                self.flushed_entry_id = truncated_entry_id;
295                self.flushed_sequence = truncated_sequence;
296                self.truncated_entry_id = Some(truncated_entry_id);
297                self.removed_files.add_removed_files(
298                    self.files
299                        .values()
300                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
301                        .collect(),
302                    truncate
303                        .timestamp_ms
304                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
305                );
306                self.files.clear();
307            }
308            TruncateKind::Partial { files_to_remove } => {
309                self.removed_files.add_removed_files(
310                    files_to_remove
311                        .iter()
312                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
313                        .collect(),
314                    truncate
315                        .timestamp_ms
316                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
317                );
318                for file in files_to_remove {
319                    self.files.remove(&file.file_id);
320                }
321            }
322        }
323    }
324
325    pub fn files(&self) -> &HashMap<FileId, FileMeta> {
326        &self.files
327    }
328
329    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
330    pub fn contains_metadata(&self) -> bool {
331        self.metadata.is_some()
332    }
333
334    pub fn try_build(self) -> Result<RegionManifest> {
335        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
336        Ok(RegionManifest {
337            metadata,
338            files: self.files,
339            removed_files: self.removed_files,
340            flushed_entry_id: self.flushed_entry_id,
341            flushed_sequence: self.flushed_sequence,
342            committed_sequence: self.committed_sequence,
343            manifest_version: self.manifest_version,
344            truncated_entry_id: self.truncated_entry_id,
345            compaction_time_window: self.compaction_time_window,
346            sst_format: self.sst_format,
347        })
348    }
349}
350
351/// A record of removed files in the region manifest.
352/// This is used to keep track of files that have been removed from the manifest but may still
353/// be kept for a while
354#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
355pub struct RemovedFilesRecord {
356    /// a list of `(FileIds, timestamp)` pairs, where the timestamp is the time when
357    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
358    pub removed_files: Vec<RemovedFiles>,
359}
360
361impl RemovedFilesRecord {
362    /// Clear the actually deleted files from the list of removed files
363    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
364        let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
365        for files in self.removed_files.iter_mut() {
366            files
367                .files
368                .retain(|removed| !deleted_file_set.contains(removed));
369        }
370
371        self.removed_files.retain(|fs| !fs.files.is_empty());
372    }
373
374    pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
375        let cnt = self
376            .removed_files
377            .iter()
378            .map(|r| r.files.len() as u64)
379            .sum();
380        stats
381            .file_removed_cnt
382            .store(cnt, std::sync::atomic::Ordering::Relaxed);
383    }
384}
385
386#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
387pub struct RemovedFiles {
388    /// The timestamp is the time when
389    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
390    pub removed_at: i64,
391    /// The set of file ids that are removed.
392    #[serde(default)]
393    pub files: HashSet<RemovedFile>,
394}
395
396/// A removed file, which can be a data file(optional paired with a index file) or an outdated index file.
397#[derive(Serialize, Hash, Clone, Debug, PartialEq, Eq)]
398pub enum RemovedFile {
399    File(FileId, Option<IndexVersion>),
400    Index(FileId, IndexVersion),
401}
402
403/// Support deserialize from old format(just FileId as string) for backward compatibility
404/// into current format(RemovedFile enum).
405/// This is needed just in case there are old manifests with removed files recorded.
406impl<'de> Deserialize<'de> for RemovedFile {
407    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
408    where
409        D: serde::Deserializer<'de>,
410    {
411        #[derive(Deserialize)]
412        #[serde(untagged)]
413        enum CompatRemovedFile {
414            Enum(RemovedFileEnum),
415            FileId(FileId),
416        }
417
418        #[derive(Deserialize)]
419        enum RemovedFileEnum {
420            File(FileId, Option<IndexVersion>),
421            Index(FileId, IndexVersion),
422        }
423
424        let compat = CompatRemovedFile::deserialize(deserializer)?;
425        match compat {
426            CompatRemovedFile::FileId(file_id) => Ok(RemovedFile::File(file_id, None)),
427            CompatRemovedFile::Enum(e) => match e {
428                RemovedFileEnum::File(file_id, version) => Ok(RemovedFile::File(file_id, version)),
429                RemovedFileEnum::Index(file_id, version) => {
430                    Ok(RemovedFile::Index(file_id, version))
431                }
432            },
433        }
434    }
435}
436
437impl RemovedFile {
438    pub fn file_id(&self) -> FileId {
439        match self {
440            RemovedFile::File(file_id, _) => *file_id,
441            RemovedFile::Index(file_id, _) => *file_id,
442        }
443    }
444
445    pub fn index_version(&self) -> Option<IndexVersion> {
446        match self {
447            RemovedFile::File(_, index_version) => *index_version,
448            RemovedFile::Index(_, index_version) => Some(*index_version),
449        }
450    }
451}
452
453impl RemovedFilesRecord {
454    /// Add a record of removed files with the current timestamp.
455    pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
456        if removed.is_empty() {
457            return;
458        }
459        let files = removed.into_iter().collect();
460        self.removed_files.push(RemovedFiles {
461            removed_at: at,
462            files,
463        });
464    }
465
466    pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
467        if !opt.enable_gc {
468            // If GC is not enabled, always keep removed files empty.
469            self.removed_files.clear();
470            return Ok(());
471        }
472
473        // if GC is enabled, rely on gc worker to delete files, and evict removed files based on options.
474
475        Ok(())
476    }
477}
478
479// The checkpoint of region manifest, generated by checkpointer.
480#[derive(Serialize, Deserialize, Debug, Clone)]
481#[cfg_attr(test, derive(PartialEq, Eq))]
482pub struct RegionCheckpoint {
483    /// The last manifest version that this checkpoint compacts(inclusive).
484    pub last_version: ManifestVersion,
485    // The number of manifest actions that this checkpoint compacts.
486    pub compacted_actions: usize,
487    // The checkpoint data
488    pub checkpoint: Option<RegionManifest>,
489}
490
491impl RegionCheckpoint {
492    pub fn last_version(&self) -> ManifestVersion {
493        self.last_version
494    }
495
496    pub fn encode(&self) -> Result<Vec<u8>> {
497        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
498
499        Ok(json.into_bytes())
500    }
501
502    pub fn decode(bytes: &[u8]) -> Result<Self> {
503        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
504
505        serde_json::from_str(data).context(SerdeJsonSnafu)
506    }
507}
508
509#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
510pub struct RegionMetaActionList {
511    pub actions: Vec<RegionMetaAction>,
512}
513
514impl RegionMetaActionList {
515    pub fn with_action(action: RegionMetaAction) -> Self {
516        Self {
517            actions: vec![action],
518        }
519    }
520
521    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
522        Self { actions }
523    }
524
525    /// Split the actions into a partition expr change, a region change and an edit.
526    pub fn split_region_change_and_edit(
527        self,
528    ) -> (
529        Option<RegionPartitionExprChange>,
530        Option<RegionChange>,
531        RegionEdit,
532    ) {
533        let mut edit = RegionEdit {
534            files_to_add: Vec::new(),
535            files_to_remove: Vec::new(),
536            timestamp_ms: None,
537            compaction_time_window: None,
538            flushed_entry_id: None,
539            flushed_sequence: None,
540            committed_sequence: None,
541        };
542        let mut partition_expr_change = None;
543        let mut region_change = None;
544        for action in self.actions {
545            match action {
546                RegionMetaAction::PartitionExprChange(change) => {
547                    partition_expr_change = Some(change);
548                }
549                RegionMetaAction::Change(change) => {
550                    region_change = Some(change);
551                }
552                RegionMetaAction::Edit(region_edit) => {
553                    // Merge file adds/removes
554                    edit.files_to_add.extend(region_edit.files_to_add);
555                    edit.files_to_remove.extend(region_edit.files_to_remove);
556                    // Max of flushed entry id / sequence
557                    if let Some(eid) = region_edit.flushed_entry_id {
558                        edit.flushed_entry_id =
559                            Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
560                    }
561                    if let Some(seq) = region_edit.flushed_sequence {
562                        edit.flushed_sequence =
563                            Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
564                    }
565                    if let Some(seq) = region_edit.committed_sequence {
566                        edit.committed_sequence =
567                            Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
568                    }
569                    // Prefer the latest non-none time window
570                    if region_edit.compaction_time_window.is_some() {
571                        edit.compaction_time_window = region_edit.compaction_time_window;
572                    }
573                }
574                _ => {}
575            }
576        }
577
578        (partition_expr_change, region_change, edit)
579    }
580}
581
582impl RegionMetaActionList {
583    /// Encode self into json in the form of string lines.
584    pub fn encode(&self) -> Result<Vec<u8>> {
585        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
586
587        Ok(json.into_bytes())
588    }
589
590    pub fn decode(bytes: &[u8]) -> Result<Self> {
591        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
592
593        serde_json::from_str(data).context(SerdeJsonSnafu)
594    }
595}
596
597#[cfg(test)]
598mod tests {
599
600    use common_time::Timestamp;
601
602    use super::*;
603
    // These tests guard the backward compatibility of manifest files.
    // If one of them fails, DO NOT modify the serialized string. Instead, check whether
    // your change to the manifest-related structs is compatible with older manifests.
607    #[test]
608    fn test_region_action_compatibility() {
609        let region_edit = r#"{
610            "flushed_entry_id":null,
611            "compaction_time_window":null,
612            "files_to_add":[
613            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
614            ],
615            "files_to_remove":[
616            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
617            ]
618        }"#;
619        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
620
621        let region_edit = r#"{
622            "flushed_entry_id":10,
623            "flushed_sequence":10,
624            "compaction_time_window":null,
625            "files_to_add":[
626            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
627            ],
628            "files_to_remove":[
629            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
630            ]
631        }"#;
632        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
633
634        // Note: For backward compatibility, the test accepts a RegionChange without sst_format
635        let region_change = r#" {
636            "metadata":{
637                "column_metadatas":[
638                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
639                ],
640                "primary_key":[1],
641                "region_id":5299989648942,
642                "schema_version":0
643            }
644            }"#;
645        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
646
647        let region_remove = r#"{"region_id":42}"#;
648        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
649
650        let region_partition_expr_change = r#"{
651            "partition_expr": "{\"expr\":\"x < 100\"}"
652        }"#;
653        let _ = serde_json::from_str::<RegionPartitionExprChange>(region_partition_expr_change)
654            .unwrap();
655    }
656
    // Placeholder: currently an empty test that always passes.
    #[test]
    fn test_region_manifest_builder() {
        // TODO(ruihang): port this test case
    }
661
    // Placeholder: currently an empty test that always passes.
    #[test]
    fn test_encode_decode_region_checkpoint() {
        // TODO(ruihang): port this test case
    }
666
667    #[test]
668    fn test_region_manifest_compatibility() {
669        // Test deserializing RegionManifest from old schema where FileId is a UUID string
670        let region_manifest_json = r#"{
671            "metadata": {
672                "column_metadatas": [
673                    {
674                        "column_schema": {
675                            "name": "a",
676                            "data_type": {"Int64": {}},
677                            "is_nullable": false,
678                            "is_time_index": false,
679                            "default_constraint": null,
680                            "metadata": {}
681                        },
682                        "semantic_type": "Tag",
683                        "column_id": 1
684                    },
685                    {
686                        "column_schema": {
687                            "name": "b",
688                            "data_type": {"Float64": {}},
689                            "is_nullable": false,
690                            "is_time_index": false,
691                            "default_constraint": null,
692                            "metadata": {}
693                        },
694                        "semantic_type": "Field",
695                        "column_id": 2
696                    },
697                    {
698                        "column_schema": {
699                            "name": "c",
700                            "data_type": {"Timestamp": {"Millisecond": null}},
701                            "is_nullable": false,
702                            "is_time_index": false,
703                            "default_constraint": null,
704                            "metadata": {}
705                        },
706                        "semantic_type": "Timestamp",
707                        "column_id": 3
708                    }
709                ],
710                "primary_key": [1],
711                "region_id": 4402341478400,
712                "schema_version": 0
713            },
714            "files": {
715                "4b220a70-2b03-4641-9687-b65d94641208": {
716                    "region_id": 4402341478400,
717                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
718                    "time_range": [
719                        {"value": 1451609210000, "unit": "Millisecond"},
720                        {"value": 1451609520000, "unit": "Millisecond"}
721                    ],
722                    "level": 1,
723                    "file_size": 100
724                },
725                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
726                    "region_id": 4402341478400,
727                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
728                    "time_range": [
729                        {"value": 1451609210000, "unit": "Millisecond"},
730                        {"value": 1451609520000, "unit": "Millisecond"}
731                    ],
732                    "level": 0,
733                    "file_size": 100
734                }
735            },
736            "flushed_entry_id": 10,
737            "flushed_sequence": 20,
738            "manifest_version": 1,
739            "truncated_entry_id": null,
740            "compaction_time_window": null
741        }"#;
742
743        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();
744
745        // Verify that the files were correctly deserialized
746        assert_eq!(manifest.files.len(), 2);
747        assert_eq!(manifest.flushed_entry_id, 10);
748        assert_eq!(manifest.flushed_sequence, 20);
749        assert_eq!(manifest.manifest_version, 1);
750
751        // Verify that FileIds were correctly parsed from UUID strings
752        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
753        file_ids.sort_unstable();
754        assert_eq!(
755            file_ids,
756            vec![
757                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
758                "4b220a70-2b03-4641-9687-b65d94641208",
759            ]
760        );
761
762        // Roundtrip test with current FileId format
763        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
764        let deserialized_manifest: RegionManifest =
765            serde_json::from_str(&serialized_manifest).unwrap();
766        assert_eq!(manifest, deserialized_manifest);
767        assert_ne!(serialized_manifest, region_manifest_json);
768    }
769
770    #[test]
771    fn test_region_truncate_compat() {
772        // Test deserializing RegionTruncate from old schema
773        let region_truncate_json = r#"{
774            "region_id": 4402341478400,
775            "truncated_entry_id": 10,
776            "truncated_sequence": 20
777        }"#;
778
779        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
780        assert_eq!(truncate_v1.region_id, 4402341478400);
781        assert_eq!(
782            truncate_v1.kind,
783            TruncateKind::All {
784                truncated_entry_id: 10,
785                truncated_sequence: 20,
786            }
787        );
788
789        // Test deserializing RegionTruncate from new schema
790        let region_truncate_v2_json = r#"{
791    "region_id": 4402341478400,
792    "files_to_remove": [
793        {
794            "region_id": 4402341478400,
795            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
796            "time_range": [
797                {
798                    "value": 1451609210000,
799                    "unit": "Millisecond"
800                },
801                {
802                    "value": 1451609520000,
803                    "unit": "Millisecond"
804                }
805            ],
806            "level": 1,
807            "file_size": 100
808        }
809    ]
810}"#;
811
812        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
813        assert_eq!(truncate_v2.region_id, 4402341478400);
814        assert_eq!(
815            truncate_v2.kind,
816            TruncateKind::Partial {
817                files_to_remove: vec![FileMeta {
818                    region_id: RegionId::from_u64(4402341478400),
819                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
820                    time_range: (
821                        Timestamp::new_millisecond(1451609210000),
822                        Timestamp::new_millisecond(1451609520000)
823                    ),
824                    level: 1,
825                    file_size: 100,
826                    ..Default::default()
827                }]
828            }
829        );
830    }
831
832    #[test]
833    fn test_region_manifest_removed_files() {
834        let region_metadata = r#"{
835                "column_metadatas": [
836                    {
837                        "column_schema": {
838                            "name": "a",
839                            "data_type": {"Int64": {}},
840                            "is_nullable": false,
841                            "is_time_index": false,
842                            "default_constraint": null,
843                            "metadata": {}
844                        },
845                        "semantic_type": "Tag",
846                        "column_id": 1
847                    },
848                    {
849                        "column_schema": {
850                            "name": "b",
851                            "data_type": {"Float64": {}},
852                            "is_nullable": false,
853                            "is_time_index": false,
854                            "default_constraint": null,
855                            "metadata": {}
856                        },
857                        "semantic_type": "Field",
858                        "column_id": 2
859                    },
860                    {
861                        "column_schema": {
862                            "name": "c",
863                            "data_type": {"Timestamp": {"Millisecond": null}},
864                            "is_nullable": false,
865                            "is_time_index": false,
866                            "default_constraint": null,
867                            "metadata": {}
868                        },
869                        "semantic_type": "Timestamp",
870                        "column_id": 3
871                    }
872                ],
873                "primary_key": [1],
874                "region_id": 4402341478400,
875                "schema_version": 0
876            }"#;
877
878        let metadata: RegionMetadataRef =
879            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
880        let manifest = RegionManifest {
881            metadata: metadata.clone(),
882            files: HashMap::new(),
883            flushed_entry_id: 0,
884            flushed_sequence: 0,
885            committed_sequence: None,
886            manifest_version: 0,
887            truncated_entry_id: None,
888            compaction_time_window: None,
889            removed_files: RemovedFilesRecord {
890                removed_files: vec![RemovedFiles {
891                    removed_at: 0,
892                    files: HashSet::from([RemovedFile::File(
893                        FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
894                        None,
895                    )]),
896                }],
897            },
898            sst_format: FormatType::PrimaryKey,
899        };
900
901        let json = serde_json::to_string(&manifest).unwrap();
902        let new: RegionManifest = serde_json::from_str(&json).unwrap();
903
904        assert_eq!(manifest, new);
905    }
906
907    /// Test if old version can still be deserialized then serialized to the new version.
908    #[test]
909    fn test_old_region_manifest_compat() {
910        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
911        pub struct RegionManifestV1 {
912            /// Metadata of the region.
913            pub metadata: RegionMetadataRef,
914            /// SST files.
915            pub files: HashMap<FileId, FileMeta>,
916            /// Last WAL entry id of flushed data.
917            pub flushed_entry_id: EntryId,
918            /// Last sequence of flushed data.
919            pub flushed_sequence: SequenceNumber,
920            /// Current manifest version.
921            pub manifest_version: ManifestVersion,
922            /// Last WAL entry id of truncated data.
923            pub truncated_entry_id: Option<EntryId>,
924            /// Inferred compaction time window.
925            #[serde(with = "humantime_serde")]
926            pub compaction_time_window: Option<Duration>,
927        }
928
929        let region_metadata = r#"{
930                "column_metadatas": [
931                    {
932                        "column_schema": {
933                            "name": "a",
934                            "data_type": {"Int64": {}},
935                            "is_nullable": false,
936                            "is_time_index": false,
937                            "default_constraint": null,
938                            "metadata": {}
939                        },
940                        "semantic_type": "Tag",
941                        "column_id": 1
942                    },
943                    {
944                        "column_schema": {
945                            "name": "b",
946                            "data_type": {"Float64": {}},
947                            "is_nullable": false,
948                            "is_time_index": false,
949                            "default_constraint": null,
950                            "metadata": {}
951                        },
952                        "semantic_type": "Field",
953                        "column_id": 2
954                    },
955                    {
956                        "column_schema": {
957                            "name": "c",
958                            "data_type": {"Timestamp": {"Millisecond": null}},
959                            "is_nullable": false,
960                            "is_time_index": false,
961                            "default_constraint": null,
962                            "metadata": {}
963                        },
964                        "semantic_type": "Timestamp",
965                        "column_id": 3
966                    }
967                ],
968                "primary_key": [1],
969                "region_id": 4402341478400,
970                "schema_version": 0
971            }"#;
972
973        let metadata: RegionMetadataRef =
974            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
975
976        // first test v1 empty to new
977        let v1 = RegionManifestV1 {
978            metadata: metadata.clone(),
979            files: HashMap::new(),
980            flushed_entry_id: 0,
981            flushed_sequence: 0,
982            manifest_version: 0,
983            truncated_entry_id: None,
984            compaction_time_window: None,
985        };
986        let json = serde_json::to_string(&v1).unwrap();
987        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
988        assert_eq!(
989            new_from_old,
990            RegionManifest {
991                metadata: metadata.clone(),
992                files: HashMap::new(),
993                removed_files: Default::default(),
994                flushed_entry_id: 0,
995                flushed_sequence: 0,
996                committed_sequence: None,
997                manifest_version: 0,
998                truncated_entry_id: None,
999                compaction_time_window: None,
1000                sst_format: FormatType::PrimaryKey,
1001            }
1002        );
1003
1004        let new_manifest = RegionManifest {
1005            metadata: metadata.clone(),
1006            files: HashMap::new(),
1007            removed_files: Default::default(),
1008            flushed_entry_id: 0,
1009            flushed_sequence: 0,
1010            committed_sequence: None,
1011            manifest_version: 0,
1012            truncated_entry_id: None,
1013            compaction_time_window: None,
1014            sst_format: FormatType::PrimaryKey,
1015        };
1016        let json = serde_json::to_string(&new_manifest).unwrap();
1017        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
1018        assert_eq!(
1019            old_from_new,
1020            RegionManifestV1 {
1021                metadata: metadata.clone(),
1022                files: HashMap::new(),
1023                flushed_entry_id: 0,
1024                flushed_sequence: 0,
1025                manifest_version: 0,
1026                truncated_entry_id: None,
1027                compaction_time_window: None,
1028            }
1029        );
1030
1031        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
1032        pub struct RegionEditV1 {
1033            pub files_to_add: Vec<FileMeta>,
1034            pub files_to_remove: Vec<FileMeta>,
1035            #[serde(with = "humantime_serde")]
1036            pub compaction_time_window: Option<Duration>,
1037            pub flushed_entry_id: Option<EntryId>,
1038            pub flushed_sequence: Option<SequenceNumber>,
1039        }
1040
1041        let json = serde_json::to_string(&RegionEditV1 {
1042            files_to_add: vec![],
1043            files_to_remove: vec![],
1044            compaction_time_window: None,
1045            flushed_entry_id: None,
1046            flushed_sequence: None,
1047        })
1048        .unwrap();
1049        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
1050        assert_eq!(
1051            RegionEdit {
1052                files_to_add: vec![],
1053                files_to_remove: vec![],
1054                timestamp_ms: None,
1055                compaction_time_window: None,
1056                flushed_entry_id: None,
1057                flushed_sequence: None,
1058                committed_sequence: None,
1059            },
1060            new_from_old
1061        );
1062
1063        // test new version with timestamp_ms set can deserialize to old version
1064        let new = RegionEdit {
1065            files_to_add: vec![],
1066            files_to_remove: vec![],
1067            timestamp_ms: Some(42),
1068            compaction_time_window: None,
1069            flushed_entry_id: None,
1070            flushed_sequence: None,
1071            committed_sequence: None,
1072        };
1073
1074        let new_json = serde_json::to_string(&new).unwrap();
1075
1076        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
1077        assert_eq!(
1078            RegionEditV1 {
1079                files_to_add: vec![],
1080                files_to_remove: vec![],
1081                compaction_time_window: None,
1082                flushed_entry_id: None,
1083                flushed_sequence: None,
1084            },
1085            old_from_new
1086        );
1087    }
1088
1089    #[test]
1090    fn test_region_change_backward_compatibility() {
1091        // Test that we can deserialize a RegionChange without sst_format
1092        let region_change_json = r#"{
1093            "metadata": {
1094                "column_metadatas": [
1095                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
1096                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
1097                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
1098                ],
1099                "primary_key": [
1100                    1
1101                ],
1102                "region_id": 42,
1103                "schema_version": 0
1104            }
1105        }"#;
1106
1107        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
1108        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
1109
1110        // Test serialization and deserialization with sst_format
1111        let region_change = RegionChange {
1112            metadata: region_change.metadata.clone(),
1113            sst_format: FormatType::Flat,
1114        };
1115
1116        let serialized = serde_json::to_string(&region_change).unwrap();
1117        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
1118        assert_eq!(deserialized.sst_format, FormatType::Flat);
1119    }
1120
1121    #[test]
1122    fn test_removed_file_compatibility() {
1123        let file_id = FileId::random();
1124        // Case 1: Deserialize from FileId string (Legacy format)
1125        let json_str = format!("\"{}\"", file_id);
1126        let removed_file: RemovedFile = serde_json::from_str(&json_str).unwrap();
1127        assert_eq!(removed_file, RemovedFile::File(file_id, None));
1128
1129        // Case 2: Deserialize from new format (File)
1130        let removed_file_v2 = RemovedFile::File(file_id, Some(10));
1131        let json_v2 = serde_json::to_string(&removed_file_v2).unwrap();
1132        let deserialized_v2: RemovedFile = serde_json::from_str(&json_v2).unwrap();
1133        assert_eq!(removed_file_v2, deserialized_v2);
1134
1135        // Case 3: Deserialize from new format (Index)
1136        let removed_index = RemovedFile::Index(file_id, 20);
1137        let json_index = serde_json::to_string(&removed_index).unwrap();
1138        let deserialized_index: RemovedFile = serde_json::from_str(&json_index).unwrap();
1139        assert_eq!(removed_index, deserialized_index);
1140
1141        // Case 4: Round-trip serialization/deserialization of new enum format with None as index version
1142        let removed_file = RemovedFile::File(file_id, None);
1143        let json = serde_json::to_string(&removed_file).unwrap();
1144        let deserialized: RemovedFile = serde_json::from_str(&json).unwrap();
1145        assert_eq!(removed_file, deserialized);
1146
1147        // Case 5: Deserialize mixed set in RemovedFilesRecord
1148        // This simulates a Set<RemovedFile> which might contain old strings or new objects if manually constructed or from old versions.
1149        // Actually, if it was HashSet<FileId>, the JSON is ["id1", "id2"].
1150        // If it is HashSet<RemovedFile>, the JSON is [{"File":...}, "id2"] if mixed (which shouldn't happen usually but good to test).
1151
1152        let json_set = format!("[\"{}\"]", file_id);
1153        let removed_files_set: HashSet<RemovedFile> = serde_json::from_str(&json_set).unwrap();
1154        assert!(removed_files_set.contains(&RemovedFile::File(file_id, None)));
1155    }
1156
1157    /// It is intentionally acceptable to ignore the legacy `file_ids` field when
1158    /// deserializing [`RemovedFiles`].
1159    ///
1160    /// In older manifests, `file_ids` recorded the set of SSTable files that were
1161    /// candidates for garbage collection at a given `removed_at` timestamp. The
1162    /// newer format stores this information in the `files` field instead. When we
1163    /// deserialize an old manifest entry into the new struct, we *drop* the
1164    /// `file_ids` field instead of trying to recover or merge it.
1165    ///
1166    /// Dropping `file_ids` does **not** risk deleting live data: a file is only
1167    /// physically removed when it is both (a) no longer referenced by any region
1168    /// metadata and (b) selected by the GC worker as safe to delete. Losing the
1169    /// historical list of candidate `file_ids` merely means some obsolete files
1170    /// may stay on disk longer than strictly necessary.
1171    ///
1172    /// The GC worker periodically scans storage (e.g. by walking the data
1173    /// directories and/or consulting the latest manifest) to discover files that
1174    /// are no longer referenced anywhere. Any files that were only referenced via
1175    /// the dropped `file_ids` field will be rediscovered during these scans and
1176    /// eventually deleted. Thus the system converges to a correct, fully-collected
1177    /// state without relying on `file_ids`, and the only potential impact of
1178    /// ignoring it is temporary disk space overhead, not data loss.
1179    #[test]
1180    fn test_removed_files_backward_compatibility() {
1181        // Define the old version struct with file_ids field
1182        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
1183        struct OldRemovedFiles {
1184            pub removed_at: i64,
1185            pub file_ids: HashSet<FileId>,
1186        }
1187
1188        // Create an old version instance
1189        let mut file_ids = HashSet::new();
1190        file_ids.insert(FileId::random());
1191        file_ids.insert(FileId::random());
1192
1193        let old_removed_files = OldRemovedFiles {
1194            removed_at: 1234567890,
1195            file_ids,
1196        };
1197
1198        // Serialize the old version
1199        let old_json = serde_json::to_string(&old_removed_files).unwrap();
1200
1201        // Try to deserialize into new version - file_ids should be ignored
1202        let result: Result<RemovedFiles, _> = serde_json::from_str(&old_json);
1203
1204        // This should succeed and create a default RemovedFiles (empty files set)
1205        assert!(result.is_ok(), "{:?}", result);
1206        let removed_files = result.unwrap();
1207        assert_eq!(removed_files.removed_at, 1234567890);
1208        assert!(removed_files.files.is_empty());
1209
1210        // Test that new format still works
1211        let file_id = FileId::random();
1212        let new_json = format!(
1213            r#"{{
1214            "removed_at": 1234567890,
1215            "files": ["{}"]
1216        }}"#,
1217            file_id
1218        );
1219
1220        let result: Result<RemovedFiles, _> = serde_json::from_str(&new_json);
1221        assert!(result.is_ok());
1222        let removed_files = result.unwrap();
1223        assert_eq!(removed_files.removed_at, 1234567890);
1224        assert_eq!(removed_files.files.len(), 1);
1225        assert!(
1226            removed_files
1227                .files
1228                .contains(&RemovedFile::File(file_id, None))
1229        );
1230    }
1231}