mito2/manifest/
action.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
16
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use common_telemetry::warn;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt};
24use store_api::ManifestVersion;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber};
27use strum::Display;
28
29use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
30use crate::manifest::manager::RemoveFileOptions;
31use crate::region::ManifestStats;
32use crate::sst::FormatType;
33use crate::sst::file::FileMeta;
34use crate::wal::EntryId;
35
36/// Actions that can be applied to region manifest.
37#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
38pub enum RegionMetaAction {
39    /// Change region's metadata for request like ALTER
40    Change(RegionChange),
41    /// Change only region partition expression metadata.
42    PartitionExprChange(RegionPartitionExprChange),
43    /// Edit region's state for changing options or file list.
44    Edit(RegionEdit),
45    /// Remove the region.
46    Remove(RegionRemove),
47    /// Truncate the region.
48    Truncate(RegionTruncate),
49}
50
51#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
52pub struct RegionPartitionExprChange {
53    /// Partition expression serialized as JSON.
54    pub partition_expr: Option<String>,
55}
56
57impl RegionMetaAction {
58    /// Returns true if the action is a change action.
59    pub fn is_change(&self) -> bool {
60        matches!(self, RegionMetaAction::Change(_))
61    }
62
63    /// Returns true if the action is an edit action.
64    pub fn is_edit(&self) -> bool {
65        matches!(self, RegionMetaAction::Edit(_))
66    }
67
68    /// Returns true if the action is a partition expr change action.
69    pub fn is_partition_expr_change(&self) -> bool {
70        matches!(self, RegionMetaAction::PartitionExprChange(_))
71    }
72}
73
74#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
75pub struct RegionChange {
76    /// The metadata after changed.
77    pub metadata: RegionMetadataRef,
78    /// Format of the SST.
79    #[serde(default)]
80    pub sst_format: FormatType,
81    /// Whether the region is in append mode.
82    #[serde(default)]
83    pub append_mode: Option<bool>,
84}
85
86#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
87pub struct RegionEdit {
88    pub files_to_add: Vec<FileMeta>,
89    pub files_to_remove: Vec<FileMeta>,
90    /// event unix timestamp in milliseconds, help to determine file deletion time.
91    #[serde(default)]
92    pub timestamp_ms: Option<i64>,
93    #[serde(with = "humantime_serde")]
94    pub compaction_time_window: Option<Duration>,
95    pub flushed_entry_id: Option<EntryId>,
96    pub flushed_sequence: Option<SequenceNumber>,
97    pub committed_sequence: Option<SequenceNumber>,
98}
99
100#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
101pub struct RegionRemove {
102    pub region_id: RegionId,
103}
104
105/// Last data truncated in the region.
106///
107#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
108pub struct RegionTruncate {
109    pub region_id: RegionId,
110    #[serde(flatten)]
111    pub kind: TruncateKind,
112    /// event unix timestamp in milliseconds, help to determine file deletion time.
113    #[serde(default)]
114    pub timestamp_ms: Option<i64>,
115}
116
117/// The kind of truncate operation.
118#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
119#[serde(untagged)]
120pub enum TruncateKind {
121    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
122    All {
123        /// Last WAL entry id of truncated data.
124        truncated_entry_id: EntryId,
125        // Last sequence number of truncated data.
126        truncated_sequence: SequenceNumber,
127    },
128    /// Only remove certain files in the region.
129    Partial { files_to_remove: Vec<FileMeta> },
130}
131
132/// The region manifest data.
133#[derive(Serialize, Deserialize, Clone, Debug)]
134#[cfg_attr(test, derive(Eq))]
135pub struct RegionManifest {
136    /// Metadata of the region.
137    pub metadata: RegionMetadataRef,
138    /// SST files.
139    pub files: HashMap<FileId, FileMeta>,
140    /// Removed files, which are not in the current manifest but may still be kept for a while.
141    /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
142    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
143    ///
144    /// Using same checkpoint files and action files, the recovered manifest may differ in this
145    /// `removed_files` field, because the checkpointer may evict some removed files using
146    /// current machine time. This is acceptable because the removed files are not used in normal
147    /// read/write path.
148    ///
149    #[serde(default)]
150    pub removed_files: RemovedFilesRecord,
151    /// Last WAL entry id of flushed data.
152    pub flushed_entry_id: EntryId,
153    /// Last sequence of flushed data.
154    pub flushed_sequence: SequenceNumber,
155    pub committed_sequence: Option<SequenceNumber>,
156    /// Current manifest version.
157    pub manifest_version: ManifestVersion,
158    /// Last WAL entry id of truncated data.
159    pub truncated_entry_id: Option<EntryId>,
160    /// Inferred compaction time window.
161    #[serde(with = "humantime_serde")]
162    pub compaction_time_window: Option<Duration>,
163    /// Format of the SST file.
164    #[serde(default)]
165    pub sst_format: FormatType,
166    /// Whether the region is in append mode.
167    #[serde(default)]
168    pub append_mode: Option<bool>,
169}
170
/// Test-only equality for [RegionManifest].
///
/// `removed_files` is deliberately not compared: manifests recovered from the same checkpoint
/// and action files may legitimately differ in that field. Every other field is compared,
/// including `sst_format` and `append_mode`, so round-trip tests cover them too.
#[cfg(test)]
impl PartialEq for RegionManifest {
    fn eq(&self, other: &Self) -> bool {
        self.metadata == other.metadata
            && self.files == other.files
            && self.flushed_entry_id == other.flushed_entry_id
            && self.flushed_sequence == other.flushed_sequence
            && self.committed_sequence == other.committed_sequence
            && self.manifest_version == other.manifest_version
            && self.truncated_entry_id == other.truncated_entry_id
            && self.compaction_time_window == other.compaction_time_window
            && self.sst_format == other.sst_format
            && self.append_mode == other.append_mode
    }
}
184
185#[derive(Debug, Default)]
186pub struct RegionManifestBuilder {
187    metadata: Option<RegionMetadataRef>,
188    files: HashMap<FileId, FileMeta>,
189    pub removed_files: RemovedFilesRecord,
190    flushed_entry_id: EntryId,
191    flushed_sequence: SequenceNumber,
192    manifest_version: ManifestVersion,
193    truncated_entry_id: Option<EntryId>,
194    compaction_time_window: Option<Duration>,
195    committed_sequence: Option<SequenceNumber>,
196    sst_format: FormatType,
197    append_mode: Option<bool>,
198}
199
200impl RegionManifestBuilder {
201    /// Start with a checkpoint.
202    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
203        if let Some(s) = checkpoint {
204            Self {
205                metadata: Some(s.metadata),
206                files: s.files,
207                removed_files: s.removed_files,
208                flushed_entry_id: s.flushed_entry_id,
209                manifest_version: s.manifest_version,
210                flushed_sequence: s.flushed_sequence,
211                truncated_entry_id: s.truncated_entry_id,
212                compaction_time_window: s.compaction_time_window,
213                committed_sequence: s.committed_sequence,
214                sst_format: s.sst_format,
215                append_mode: s.append_mode,
216            }
217        } else {
218            Default::default()
219        }
220    }
221
222    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
223        self.metadata = Some(change.metadata);
224        self.manifest_version = manifest_version;
225        self.sst_format = change.sst_format;
226        // Only update append_mode if the change specifies a value.
227        self.append_mode = change.append_mode.or(self.append_mode);
228    }
229
230    /// Applies a partition-expression-only metadata change.
231    ///
232    /// This path updates only `partition_expr` (and its derived
233    /// `partition_rule_version`) on current metadata and does not touch
234    /// `sst_format`.
235    pub fn apply_partition_expr_change(
236        &mut self,
237        manifest_version: ManifestVersion,
238        change: RegionPartitionExprChange,
239    ) {
240        if let Some(metadata) = &self.metadata {
241            let mut metadata = metadata.as_ref().clone();
242            metadata.set_partition_expr(change.partition_expr);
243            self.metadata = Some(metadata.into());
244            self.manifest_version = manifest_version;
245        } else {
246            warn!(
247                "metadata is not set in region manifest builder, ignore partition expr change: {:?}",
248                change
249            );
250        }
251    }
252
253    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
254        self.manifest_version = manifest_version;
255
256        let mut removed_files = vec![];
257        for file in edit.files_to_add {
258            if let Some(old_file) = self.files.insert(file.file_id, file.clone())
259                && let Some(old_index) = old_file.index_version()
260                && !old_file.is_index_up_to_date(&file)
261            {
262                // The old file has an index that is now outdated.
263                removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
264            }
265        }
266        removed_files.extend(
267            edit.files_to_remove
268                .iter()
269                .map(|f| RemovedFile::File(f.file_id, f.index_version())),
270        );
271        let at = edit
272            .timestamp_ms
273            .unwrap_or_else(|| Utc::now().timestamp_millis());
274        self.removed_files.add_removed_files(removed_files, at);
275
276        for file in edit.files_to_remove {
277            self.files.remove(&file.file_id);
278        }
279        if let Some(flushed_entry_id) = edit.flushed_entry_id {
280            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
281        }
282        if let Some(flushed_sequence) = edit.flushed_sequence {
283            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
284        }
285
286        if let Some(committed_sequence) = edit.committed_sequence {
287            self.committed_sequence = Some(
288                self.committed_sequence
289                    .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
290            );
291        }
292        if let Some(window) = edit.compaction_time_window {
293            self.compaction_time_window = Some(window);
294        }
295    }
296
297    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
298        self.manifest_version = manifest_version;
299        match truncate.kind {
300            TruncateKind::All {
301                truncated_entry_id,
302                truncated_sequence,
303            } => {
304                self.flushed_entry_id = truncated_entry_id;
305                self.flushed_sequence = truncated_sequence;
306                self.truncated_entry_id = Some(truncated_entry_id);
307                self.removed_files.add_removed_files(
308                    self.files
309                        .values()
310                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
311                        .collect(),
312                    truncate
313                        .timestamp_ms
314                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
315                );
316                self.files.clear();
317            }
318            TruncateKind::Partial { files_to_remove } => {
319                self.removed_files.add_removed_files(
320                    files_to_remove
321                        .iter()
322                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
323                        .collect(),
324                    truncate
325                        .timestamp_ms
326                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
327                );
328                for file in files_to_remove {
329                    self.files.remove(&file.file_id);
330                }
331            }
332        }
333    }
334
335    pub fn files(&self) -> &HashMap<FileId, FileMeta> {
336        &self.files
337    }
338
339    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
340    pub fn contains_metadata(&self) -> bool {
341        self.metadata.is_some()
342    }
343
344    pub fn try_build(self) -> Result<RegionManifest> {
345        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
346        Ok(RegionManifest {
347            metadata,
348            files: self.files,
349            removed_files: self.removed_files,
350            flushed_entry_id: self.flushed_entry_id,
351            flushed_sequence: self.flushed_sequence,
352            committed_sequence: self.committed_sequence,
353            manifest_version: self.manifest_version,
354            truncated_entry_id: self.truncated_entry_id,
355            compaction_time_window: self.compaction_time_window,
356            sst_format: self.sst_format,
357            append_mode: self.append_mode,
358        })
359    }
360}
361
362/// A record of removed files in the region manifest.
363/// This is used to keep track of files that have been removed from the manifest but may still
364/// be kept for a while
365#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
366pub struct RemovedFilesRecord {
367    /// a list of `(FileIds, timestamp)` pairs, where the timestamp is the time when
368    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
369    pub removed_files: Vec<RemovedFiles>,
370}
371
372impl RemovedFilesRecord {
373    /// Clear the actually deleted files from the list of removed files
374    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
375        let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
376        for files in self.removed_files.iter_mut() {
377            files
378                .files
379                .retain(|removed| !deleted_file_set.contains(removed));
380        }
381
382        self.removed_files.retain(|fs| !fs.files.is_empty());
383    }
384
385    pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
386        let cnt = self
387            .removed_files
388            .iter()
389            .map(|r| r.files.len() as u64)
390            .sum();
391        stats
392            .file_removed_cnt
393            .store(cnt, std::sync::atomic::Ordering::Relaxed);
394    }
395}
396
397#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
398pub struct RemovedFiles {
399    /// The timestamp is the time when
400    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
401    pub removed_at: i64,
402    /// The set of file ids that are removed.
403    #[serde(default)]
404    pub files: HashSet<RemovedFile>,
405}
406
407/// A removed file, which can be a data file(optional paired with a index file) or an outdated index file.
408#[derive(Serialize, Hash, Clone, Debug, PartialEq, Eq)]
409pub enum RemovedFile {
410    File(FileId, Option<IndexVersion>),
411    Index(FileId, IndexVersion),
412}
413
414/// Support deserialize from old format(just FileId as string) for backward compatibility
415/// into current format(RemovedFile enum).
416/// This is needed just in case there are old manifests with removed files recorded.
417impl<'de> Deserialize<'de> for RemovedFile {
418    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
419    where
420        D: serde::Deserializer<'de>,
421    {
422        #[derive(Deserialize)]
423        #[serde(untagged)]
424        enum CompatRemovedFile {
425            Enum(RemovedFileEnum),
426            FileId(FileId),
427        }
428
429        #[derive(Deserialize)]
430        enum RemovedFileEnum {
431            File(FileId, Option<IndexVersion>),
432            Index(FileId, IndexVersion),
433        }
434
435        let compat = CompatRemovedFile::deserialize(deserializer)?;
436        match compat {
437            CompatRemovedFile::FileId(file_id) => Ok(RemovedFile::File(file_id, None)),
438            CompatRemovedFile::Enum(e) => match e {
439                RemovedFileEnum::File(file_id, version) => Ok(RemovedFile::File(file_id, version)),
440                RemovedFileEnum::Index(file_id, version) => {
441                    Ok(RemovedFile::Index(file_id, version))
442                }
443            },
444        }
445    }
446}
447
448impl RemovedFile {
449    pub fn file_id(&self) -> FileId {
450        match self {
451            RemovedFile::File(file_id, _) => *file_id,
452            RemovedFile::Index(file_id, _) => *file_id,
453        }
454    }
455
456    pub fn index_version(&self) -> Option<IndexVersion> {
457        match self {
458            RemovedFile::File(_, index_version) => *index_version,
459            RemovedFile::Index(_, index_version) => Some(*index_version),
460        }
461    }
462}
463
464impl RemovedFilesRecord {
465    /// Add a record of removed files with the current timestamp.
466    pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
467        if removed.is_empty() {
468            return;
469        }
470        let files = removed.into_iter().collect();
471        self.removed_files.push(RemovedFiles {
472            removed_at: at,
473            files,
474        });
475    }
476
477    pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
478        if !opt.enable_gc {
479            // If GC is not enabled, always keep removed files empty.
480            self.removed_files.clear();
481            return Ok(());
482        }
483
484        // if GC is enabled, rely on gc worker to delete files, and evict removed files based on options.
485
486        Ok(())
487    }
488}
489
490// The checkpoint of region manifest, generated by checkpointer.
491#[derive(Serialize, Deserialize, Debug, Clone)]
492#[cfg_attr(test, derive(PartialEq, Eq))]
493pub struct RegionCheckpoint {
494    /// The last manifest version that this checkpoint compacts(inclusive).
495    pub last_version: ManifestVersion,
496    // The number of manifest actions that this checkpoint compacts.
497    pub compacted_actions: usize,
498    // The checkpoint data
499    pub checkpoint: Option<RegionManifest>,
500}
501
502impl RegionCheckpoint {
503    pub fn last_version(&self) -> ManifestVersion {
504        self.last_version
505    }
506
507    pub fn encode(&self) -> Result<Vec<u8>> {
508        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
509
510        Ok(json.into_bytes())
511    }
512
513    pub fn decode(bytes: &[u8]) -> Result<Self> {
514        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
515
516        serde_json::from_str(data).context(SerdeJsonSnafu)
517    }
518}
519
520#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
521pub struct RegionMetaActionList {
522    pub actions: Vec<RegionMetaAction>,
523}
524
525impl RegionMetaActionList {
526    pub fn with_action(action: RegionMetaAction) -> Self {
527        Self {
528            actions: vec![action],
529        }
530    }
531
532    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
533        Self { actions }
534    }
535
536    /// Split the actions into a partition expr change, a region change and an edit.
537    pub fn split_region_change_and_edit(
538        self,
539    ) -> (
540        Option<RegionPartitionExprChange>,
541        Option<RegionChange>,
542        RegionEdit,
543    ) {
544        let mut edit = RegionEdit {
545            files_to_add: Vec::new(),
546            files_to_remove: Vec::new(),
547            timestamp_ms: None,
548            compaction_time_window: None,
549            flushed_entry_id: None,
550            flushed_sequence: None,
551            committed_sequence: None,
552        };
553        let mut partition_expr_change = None;
554        let mut region_change = None;
555        for action in self.actions {
556            match action {
557                RegionMetaAction::PartitionExprChange(change) => {
558                    partition_expr_change = Some(change);
559                }
560                RegionMetaAction::Change(change) => {
561                    region_change = Some(change);
562                }
563                RegionMetaAction::Edit(region_edit) => {
564                    // Merge file adds/removes
565                    edit.files_to_add.extend(region_edit.files_to_add);
566                    edit.files_to_remove.extend(region_edit.files_to_remove);
567                    // Max of flushed entry id / sequence
568                    if let Some(eid) = region_edit.flushed_entry_id {
569                        edit.flushed_entry_id =
570                            Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
571                    }
572                    if let Some(seq) = region_edit.flushed_sequence {
573                        edit.flushed_sequence =
574                            Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
575                    }
576                    if let Some(seq) = region_edit.committed_sequence {
577                        edit.committed_sequence =
578                            Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
579                    }
580                    // Prefer the latest non-none time window
581                    if region_edit.compaction_time_window.is_some() {
582                        edit.compaction_time_window = region_edit.compaction_time_window;
583                    }
584                }
585                _ => {}
586            }
587        }
588
589        (partition_expr_change, region_change, edit)
590    }
591}
592
593impl RegionMetaActionList {
594    /// Encode self into json in the form of string lines.
595    pub fn encode(&self) -> Result<Vec<u8>> {
596        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
597
598        Ok(json.into_bytes())
599    }
600
601    pub fn decode(bytes: &[u8]) -> Result<Self> {
602        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
603
604        serde_json::from_str(data).context(SerdeJsonSnafu)
605    }
606}
607
608#[cfg(test)]
609mod tests {
610
611    use common_time::Timestamp;
612
613    use super::*;
614
615    // These tests are used to ensure backward compatibility of manifest files.
616    // DO NOT modify the serialized string when they fail, check if your
617    // modification to manifest-related structs is compatible with older manifests.
618    #[test]
619    fn test_region_action_compatibility() {
620        let region_edit = r#"{
621            "flushed_entry_id":null,
622            "compaction_time_window":null,
623            "files_to_add":[
624            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
625            ],
626            "files_to_remove":[
627            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
628            ]
629        }"#;
630        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
631
632        let region_edit = r#"{
633            "flushed_entry_id":10,
634            "flushed_sequence":10,
635            "compaction_time_window":null,
636            "files_to_add":[
637            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
638            ],
639            "files_to_remove":[
640            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
641            ]
642        }"#;
643        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
644
645        // Note: For backward compatibility, the test accepts a RegionChange without sst_format
646        let region_change = r#" {
647            "metadata":{
648                "column_metadatas":[
649                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
650                ],
651                "primary_key":[1],
652                "region_id":5299989648942,
653                "schema_version":0
654            }
655            }"#;
656        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
657
658        let region_remove = r#"{"region_id":42}"#;
659        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
660
661        let region_partition_expr_change = r#"{
662            "partition_expr": "{\"expr\":\"x < 100\"}"
663        }"#;
664        let _ = serde_json::from_str::<RegionPartitionExprChange>(region_partition_expr_change)
665            .unwrap();
666    }
667
668    #[test]
669    fn test_region_manifest_builder() {
670        // TODO(ruihang): port this test case
671    }
672
673    #[test]
674    fn test_encode_decode_region_checkpoint() {
675        // TODO(ruihang): port this test case
676    }
677
678    #[test]
679    fn test_region_manifest_compatibility() {
680        // Test deserializing RegionManifest from old schema where FileId is a UUID string
681        let region_manifest_json = r#"{
682            "metadata": {
683                "column_metadatas": [
684                    {
685                        "column_schema": {
686                            "name": "a",
687                            "data_type": {"Int64": {}},
688                            "is_nullable": false,
689                            "is_time_index": false,
690                            "default_constraint": null,
691                            "metadata": {}
692                        },
693                        "semantic_type": "Tag",
694                        "column_id": 1
695                    },
696                    {
697                        "column_schema": {
698                            "name": "b",
699                            "data_type": {"Float64": {}},
700                            "is_nullable": false,
701                            "is_time_index": false,
702                            "default_constraint": null,
703                            "metadata": {}
704                        },
705                        "semantic_type": "Field",
706                        "column_id": 2
707                    },
708                    {
709                        "column_schema": {
710                            "name": "c",
711                            "data_type": {"Timestamp": {"Millisecond": null}},
712                            "is_nullable": false,
713                            "is_time_index": false,
714                            "default_constraint": null,
715                            "metadata": {}
716                        },
717                        "semantic_type": "Timestamp",
718                        "column_id": 3
719                    }
720                ],
721                "primary_key": [1],
722                "region_id": 4402341478400,
723                "schema_version": 0
724            },
725            "files": {
726                "4b220a70-2b03-4641-9687-b65d94641208": {
727                    "region_id": 4402341478400,
728                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
729                    "time_range": [
730                        {"value": 1451609210000, "unit": "Millisecond"},
731                        {"value": 1451609520000, "unit": "Millisecond"}
732                    ],
733                    "level": 1,
734                    "file_size": 100
735                },
736                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
737                    "region_id": 4402341478400,
738                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
739                    "time_range": [
740                        {"value": 1451609210000, "unit": "Millisecond"},
741                        {"value": 1451609520000, "unit": "Millisecond"}
742                    ],
743                    "level": 0,
744                    "file_size": 100
745                }
746            },
747            "flushed_entry_id": 10,
748            "flushed_sequence": 20,
749            "manifest_version": 1,
750            "truncated_entry_id": null,
751            "compaction_time_window": null
752        }"#;
753
754        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();
755
756        // Verify that the files were correctly deserialized
757        assert_eq!(manifest.files.len(), 2);
758        assert_eq!(manifest.flushed_entry_id, 10);
759        assert_eq!(manifest.flushed_sequence, 20);
760        assert_eq!(manifest.manifest_version, 1);
761
762        // Verify that FileIds were correctly parsed from UUID strings
763        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
764        file_ids.sort_unstable();
765        assert_eq!(
766            file_ids,
767            vec![
768                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
769                "4b220a70-2b03-4641-9687-b65d94641208",
770            ]
771        );
772
773        // Roundtrip test with current FileId format
774        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
775        let deserialized_manifest: RegionManifest =
776            serde_json::from_str(&serialized_manifest).unwrap();
777        assert_eq!(manifest, deserialized_manifest);
778        assert_ne!(serialized_manifest, region_manifest_json);
779    }
780
781    #[test]
782    fn test_region_truncate_compat() {
783        // Test deserializing RegionTruncate from old schema
784        let region_truncate_json = r#"{
785            "region_id": 4402341478400,
786            "truncated_entry_id": 10,
787            "truncated_sequence": 20
788        }"#;
789
790        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
791        assert_eq!(truncate_v1.region_id, 4402341478400);
792        assert_eq!(
793            truncate_v1.kind,
794            TruncateKind::All {
795                truncated_entry_id: 10,
796                truncated_sequence: 20,
797            }
798        );
799
800        // Test deserializing RegionTruncate from new schema
801        let region_truncate_v2_json = r#"{
802    "region_id": 4402341478400,
803    "files_to_remove": [
804        {
805            "region_id": 4402341478400,
806            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
807            "time_range": [
808                {
809                    "value": 1451609210000,
810                    "unit": "Millisecond"
811                },
812                {
813                    "value": 1451609520000,
814                    "unit": "Millisecond"
815                }
816            ],
817            "level": 1,
818            "file_size": 100
819        }
820    ]
821}"#;
822
823        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
824        assert_eq!(truncate_v2.region_id, 4402341478400);
825        assert_eq!(
826            truncate_v2.kind,
827            TruncateKind::Partial {
828                files_to_remove: vec![FileMeta {
829                    region_id: RegionId::from_u64(4402341478400),
830                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
831                    time_range: (
832                        Timestamp::new_millisecond(1451609210000),
833                        Timestamp::new_millisecond(1451609520000)
834                    ),
835                    level: 1,
836                    file_size: 100,
837                    ..Default::default()
838                }]
839            }
840        );
841    }
842
843    #[test]
844    fn test_region_manifest_removed_files() {
845        let region_metadata = r#"{
846                "column_metadatas": [
847                    {
848                        "column_schema": {
849                            "name": "a",
850                            "data_type": {"Int64": {}},
851                            "is_nullable": false,
852                            "is_time_index": false,
853                            "default_constraint": null,
854                            "metadata": {}
855                        },
856                        "semantic_type": "Tag",
857                        "column_id": 1
858                    },
859                    {
860                        "column_schema": {
861                            "name": "b",
862                            "data_type": {"Float64": {}},
863                            "is_nullable": false,
864                            "is_time_index": false,
865                            "default_constraint": null,
866                            "metadata": {}
867                        },
868                        "semantic_type": "Field",
869                        "column_id": 2
870                    },
871                    {
872                        "column_schema": {
873                            "name": "c",
874                            "data_type": {"Timestamp": {"Millisecond": null}},
875                            "is_nullable": false,
876                            "is_time_index": false,
877                            "default_constraint": null,
878                            "metadata": {}
879                        },
880                        "semantic_type": "Timestamp",
881                        "column_id": 3
882                    }
883                ],
884                "primary_key": [1],
885                "region_id": 4402341478400,
886                "schema_version": 0
887            }"#;
888
889        let metadata: RegionMetadataRef =
890            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
891        let manifest = RegionManifest {
892            metadata: metadata.clone(),
893            files: HashMap::new(),
894            flushed_entry_id: 0,
895            flushed_sequence: 0,
896            committed_sequence: None,
897            manifest_version: 0,
898            truncated_entry_id: None,
899            compaction_time_window: None,
900            removed_files: RemovedFilesRecord {
901                removed_files: vec![RemovedFiles {
902                    removed_at: 0,
903                    files: HashSet::from([RemovedFile::File(
904                        FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
905                        None,
906                    )]),
907                }],
908            },
909            sst_format: FormatType::PrimaryKey,
910            append_mode: None,
911        };
912
913        let json = serde_json::to_string(&manifest).unwrap();
914        let new: RegionManifest = serde_json::from_str(&json).unwrap();
915
916        assert_eq!(manifest, new);
917    }
918
919    /// Test if old version can still be deserialized then serialized to the new version.
920    #[test]
921    fn test_old_region_manifest_compat() {
922        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
923        pub struct RegionManifestV1 {
924            /// Metadata of the region.
925            pub metadata: RegionMetadataRef,
926            /// SST files.
927            pub files: HashMap<FileId, FileMeta>,
928            /// Last WAL entry id of flushed data.
929            pub flushed_entry_id: EntryId,
930            /// Last sequence of flushed data.
931            pub flushed_sequence: SequenceNumber,
932            /// Current manifest version.
933            pub manifest_version: ManifestVersion,
934            /// Last WAL entry id of truncated data.
935            pub truncated_entry_id: Option<EntryId>,
936            /// Inferred compaction time window.
937            #[serde(with = "humantime_serde")]
938            pub compaction_time_window: Option<Duration>,
939        }
940
941        let region_metadata = r#"{
942                "column_metadatas": [
943                    {
944                        "column_schema": {
945                            "name": "a",
946                            "data_type": {"Int64": {}},
947                            "is_nullable": false,
948                            "is_time_index": false,
949                            "default_constraint": null,
950                            "metadata": {}
951                        },
952                        "semantic_type": "Tag",
953                        "column_id": 1
954                    },
955                    {
956                        "column_schema": {
957                            "name": "b",
958                            "data_type": {"Float64": {}},
959                            "is_nullable": false,
960                            "is_time_index": false,
961                            "default_constraint": null,
962                            "metadata": {}
963                        },
964                        "semantic_type": "Field",
965                        "column_id": 2
966                    },
967                    {
968                        "column_schema": {
969                            "name": "c",
970                            "data_type": {"Timestamp": {"Millisecond": null}},
971                            "is_nullable": false,
972                            "is_time_index": false,
973                            "default_constraint": null,
974                            "metadata": {}
975                        },
976                        "semantic_type": "Timestamp",
977                        "column_id": 3
978                    }
979                ],
980                "primary_key": [1],
981                "region_id": 4402341478400,
982                "schema_version": 0
983            }"#;
984
985        let metadata: RegionMetadataRef =
986            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
987
988        // first test v1 empty to new
989        let v1 = RegionManifestV1 {
990            metadata: metadata.clone(),
991            files: HashMap::new(),
992            flushed_entry_id: 0,
993            flushed_sequence: 0,
994            manifest_version: 0,
995            truncated_entry_id: None,
996            compaction_time_window: None,
997        };
998        let json = serde_json::to_string(&v1).unwrap();
999        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
1000        assert_eq!(
1001            new_from_old,
1002            RegionManifest {
1003                metadata: metadata.clone(),
1004                files: HashMap::new(),
1005                removed_files: Default::default(),
1006                flushed_entry_id: 0,
1007                flushed_sequence: 0,
1008                committed_sequence: None,
1009                manifest_version: 0,
1010                truncated_entry_id: None,
1011                compaction_time_window: None,
1012                sst_format: FormatType::PrimaryKey,
1013                append_mode: None,
1014            }
1015        );
1016
1017        let new_manifest = RegionManifest {
1018            metadata: metadata.clone(),
1019            files: HashMap::new(),
1020            removed_files: Default::default(),
1021            flushed_entry_id: 0,
1022            flushed_sequence: 0,
1023            committed_sequence: None,
1024            manifest_version: 0,
1025            truncated_entry_id: None,
1026            compaction_time_window: None,
1027            sst_format: FormatType::PrimaryKey,
1028            append_mode: None,
1029        };
1030        let json = serde_json::to_string(&new_manifest).unwrap();
1031        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
1032        assert_eq!(
1033            old_from_new,
1034            RegionManifestV1 {
1035                metadata: metadata.clone(),
1036                files: HashMap::new(),
1037                flushed_entry_id: 0,
1038                flushed_sequence: 0,
1039                manifest_version: 0,
1040                truncated_entry_id: None,
1041                compaction_time_window: None,
1042            }
1043        );
1044
1045        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
1046        pub struct RegionEditV1 {
1047            pub files_to_add: Vec<FileMeta>,
1048            pub files_to_remove: Vec<FileMeta>,
1049            #[serde(with = "humantime_serde")]
1050            pub compaction_time_window: Option<Duration>,
1051            pub flushed_entry_id: Option<EntryId>,
1052            pub flushed_sequence: Option<SequenceNumber>,
1053        }
1054
1055        let json = serde_json::to_string(&RegionEditV1 {
1056            files_to_add: vec![],
1057            files_to_remove: vec![],
1058            compaction_time_window: None,
1059            flushed_entry_id: None,
1060            flushed_sequence: None,
1061        })
1062        .unwrap();
1063        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
1064        assert_eq!(
1065            RegionEdit {
1066                files_to_add: vec![],
1067                files_to_remove: vec![],
1068                timestamp_ms: None,
1069                compaction_time_window: None,
1070                flushed_entry_id: None,
1071                flushed_sequence: None,
1072                committed_sequence: None,
1073            },
1074            new_from_old
1075        );
1076
1077        // test new version with timestamp_ms set can deserialize to old version
1078        let new = RegionEdit {
1079            files_to_add: vec![],
1080            files_to_remove: vec![],
1081            timestamp_ms: Some(42),
1082            compaction_time_window: None,
1083            flushed_entry_id: None,
1084            flushed_sequence: None,
1085            committed_sequence: None,
1086        };
1087
1088        let new_json = serde_json::to_string(&new).unwrap();
1089
1090        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
1091        assert_eq!(
1092            RegionEditV1 {
1093                files_to_add: vec![],
1094                files_to_remove: vec![],
1095                compaction_time_window: None,
1096                flushed_entry_id: None,
1097                flushed_sequence: None,
1098            },
1099            old_from_new
1100        );
1101    }
1102
1103    #[test]
1104    fn test_region_change_backward_compatibility() {
1105        // Test that we can deserialize a RegionChange without sst_format
1106        let region_change_json = r#"{
1107            "metadata": {
1108                "column_metadatas": [
1109                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
1110                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
1111                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
1112                ],
1113                "primary_key": [
1114                    1
1115                ],
1116                "region_id": 42,
1117                "schema_version": 0
1118            }
1119        }"#;
1120
1121        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
1122        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
1123
1124        // Test serialization and deserialization with sst_format
1125        let region_change = RegionChange {
1126            metadata: region_change.metadata.clone(),
1127            sst_format: FormatType::Flat,
1128            append_mode: None,
1129        };
1130
1131        let serialized = serde_json::to_string(&region_change).unwrap();
1132        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
1133        assert_eq!(deserialized.sst_format, FormatType::Flat);
1134    }
1135
1136    #[test]
1137    fn test_removed_file_compatibility() {
1138        let file_id = FileId::random();
1139        // Case 1: Deserialize from FileId string (Legacy format)
1140        let json_str = format!("\"{}\"", file_id);
1141        let removed_file: RemovedFile = serde_json::from_str(&json_str).unwrap();
1142        assert_eq!(removed_file, RemovedFile::File(file_id, None));
1143
1144        // Case 2: Deserialize from new format (File)
1145        let removed_file_v2 = RemovedFile::File(file_id, Some(10));
1146        let json_v2 = serde_json::to_string(&removed_file_v2).unwrap();
1147        let deserialized_v2: RemovedFile = serde_json::from_str(&json_v2).unwrap();
1148        assert_eq!(removed_file_v2, deserialized_v2);
1149
1150        // Case 3: Deserialize from new format (Index)
1151        let removed_index = RemovedFile::Index(file_id, 20);
1152        let json_index = serde_json::to_string(&removed_index).unwrap();
1153        let deserialized_index: RemovedFile = serde_json::from_str(&json_index).unwrap();
1154        assert_eq!(removed_index, deserialized_index);
1155
1156        // Case 4: Round-trip serialization/deserialization of new enum format with None as index version
1157        let removed_file = RemovedFile::File(file_id, None);
1158        let json = serde_json::to_string(&removed_file).unwrap();
1159        let deserialized: RemovedFile = serde_json::from_str(&json).unwrap();
1160        assert_eq!(removed_file, deserialized);
1161
1162        // Case 5: Deserialize mixed set in RemovedFilesRecord
1163        // This simulates a Set<RemovedFile> which might contain old strings or new objects if manually constructed or from old versions.
1164        // Actually, if it was HashSet<FileId>, the JSON is ["id1", "id2"].
1165        // If it is HashSet<RemovedFile>, the JSON is [{"File":...}, "id2"] if mixed (which shouldn't happen usually but good to test).
1166
1167        let json_set = format!("[\"{}\"]", file_id);
1168        let removed_files_set: HashSet<RemovedFile> = serde_json::from_str(&json_set).unwrap();
1169        assert!(removed_files_set.contains(&RemovedFile::File(file_id, None)));
1170    }
1171
1172    /// It is intentionally acceptable to ignore the legacy `file_ids` field when
1173    /// deserializing [`RemovedFiles`].
1174    ///
1175    /// In older manifests, `file_ids` recorded the set of SSTable files that were
1176    /// candidates for garbage collection at a given `removed_at` timestamp. The
1177    /// newer format stores this information in the `files` field instead. When we
1178    /// deserialize an old manifest entry into the new struct, we *drop* the
1179    /// `file_ids` field instead of trying to recover or merge it.
1180    ///
1181    /// Dropping `file_ids` does **not** risk deleting live data: a file is only
1182    /// physically removed when it is both (a) no longer referenced by any region
1183    /// metadata and (b) selected by the GC worker as safe to delete. Losing the
1184    /// historical list of candidate `file_ids` merely means some obsolete files
1185    /// may stay on disk longer than strictly necessary.
1186    ///
1187    /// The GC worker periodically scans storage (e.g. by walking the data
1188    /// directories and/or consulting the latest manifest) to discover files that
1189    /// are no longer referenced anywhere. Any files that were only referenced via
1190    /// the dropped `file_ids` field will be rediscovered during these scans and
1191    /// eventually deleted. Thus the system converges to a correct, fully-collected
1192    /// state without relying on `file_ids`, and the only potential impact of
1193    /// ignoring it is temporary disk space overhead, not data loss.
1194    #[test]
1195    fn test_removed_files_backward_compatibility() {
1196        // Define the old version struct with file_ids field
1197        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
1198        struct OldRemovedFiles {
1199            pub removed_at: i64,
1200            pub file_ids: HashSet<FileId>,
1201        }
1202
1203        // Create an old version instance
1204        let mut file_ids = HashSet::new();
1205        file_ids.insert(FileId::random());
1206        file_ids.insert(FileId::random());
1207
1208        let old_removed_files = OldRemovedFiles {
1209            removed_at: 1234567890,
1210            file_ids,
1211        };
1212
1213        // Serialize the old version
1214        let old_json = serde_json::to_string(&old_removed_files).unwrap();
1215
1216        // Try to deserialize into new version - file_ids should be ignored
1217        let result: Result<RemovedFiles, _> = serde_json::from_str(&old_json);
1218
1219        // This should succeed and create a default RemovedFiles (empty files set)
1220        assert!(result.is_ok(), "{:?}", result);
1221        let removed_files = result.unwrap();
1222        assert_eq!(removed_files.removed_at, 1234567890);
1223        assert!(removed_files.files.is_empty());
1224
1225        // Test that new format still works
1226        let file_id = FileId::random();
1227        let new_json = format!(
1228            r#"{{
1229            "removed_at": 1234567890,
1230            "files": ["{}"]
1231        }}"#,
1232            file_id
1233        );
1234
1235        let result: Result<RemovedFiles, _> = serde_json::from_str(&new_json);
1236        assert!(result.is_ok());
1237        let removed_files = result.unwrap();
1238        assert_eq!(removed_files.removed_at, 1234567890);
1239        assert_eq!(removed_files.files.len(), 1);
1240        assert!(
1241            removed_files
1242                .files
1243                .contains(&RemovedFile::File(file_id, None))
1244        );
1245    }
1246}