mito2/manifest/
action.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
16
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
29use crate::manifest::manager::RemoveFileOptions;
30use crate::region::ManifestStats;
31use crate::sst::FormatType;
32use crate::sst::file::FileMeta;
33use crate::wal::EntryId;
34
35/// Actions that can be applied to region manifest.
36#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
37pub enum RegionMetaAction {
38    /// Change region's metadata for request like ALTER
39    Change(RegionChange),
40    /// Edit region's state for changing options or file list.
41    Edit(RegionEdit),
42    /// Remove the region.
43    Remove(RegionRemove),
44    /// Truncate the region.
45    Truncate(RegionTruncate),
46}
47
48impl RegionMetaAction {
49    /// Returns true if the action is a change action.
50    pub fn is_change(&self) -> bool {
51        matches!(self, RegionMetaAction::Change(_))
52    }
53
54    /// Returns true if the action is an edit action.
55    pub fn is_edit(&self) -> bool {
56        matches!(self, RegionMetaAction::Edit(_))
57    }
58}
59
60#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
61pub struct RegionChange {
62    /// The metadata after changed.
63    pub metadata: RegionMetadataRef,
64    /// Format of the SST.
65    #[serde(default)]
66    pub sst_format: FormatType,
67}
68
69#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
70pub struct RegionEdit {
71    pub files_to_add: Vec<FileMeta>,
72    pub files_to_remove: Vec<FileMeta>,
73    /// event unix timestamp in milliseconds, help to determine file deletion time.
74    #[serde(default)]
75    pub timestamp_ms: Option<i64>,
76    #[serde(with = "humantime_serde")]
77    pub compaction_time_window: Option<Duration>,
78    pub flushed_entry_id: Option<EntryId>,
79    pub flushed_sequence: Option<SequenceNumber>,
80    pub committed_sequence: Option<SequenceNumber>,
81}
82
83#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
84pub struct RegionRemove {
85    pub region_id: RegionId,
86}
87
88/// Last data truncated in the region.
89///
90#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
91pub struct RegionTruncate {
92    pub region_id: RegionId,
93    #[serde(flatten)]
94    pub kind: TruncateKind,
95    /// event unix timestamp in milliseconds, help to determine file deletion time.
96    #[serde(default)]
97    pub timestamp_ms: Option<i64>,
98}
99
100/// The kind of truncate operation.
101#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
102#[serde(untagged)]
103pub enum TruncateKind {
104    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
105    All {
106        /// Last WAL entry id of truncated data.
107        truncated_entry_id: EntryId,
108        // Last sequence number of truncated data.
109        truncated_sequence: SequenceNumber,
110    },
111    /// Only remove certain files in the region.
112    Partial { files_to_remove: Vec<FileMeta> },
113}
114
115/// The region manifest data.
116#[derive(Serialize, Deserialize, Clone, Debug)]
117#[cfg_attr(test, derive(Eq))]
118pub struct RegionManifest {
119    /// Metadata of the region.
120    pub metadata: RegionMetadataRef,
121    /// SST files.
122    pub files: HashMap<FileId, FileMeta>,
123    /// Removed files, which are not in the current manifest but may still be kept for a while.
124    /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
125    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
126    ///
127    /// Using same checkpoint files and action files, the recovered manifest may differ in this
128    /// `removed_files` field, because the checkpointer may evict some removed files using
129    /// current machine time. This is acceptable because the removed files are not used in normal
130    /// read/write path.
131    ///
132    #[serde(default)]
133    pub removed_files: RemovedFilesRecord,
134    /// Last WAL entry id of flushed data.
135    pub flushed_entry_id: EntryId,
136    /// Last sequence of flushed data.
137    pub flushed_sequence: SequenceNumber,
138    pub committed_sequence: Option<SequenceNumber>,
139    /// Current manifest version.
140    pub manifest_version: ManifestVersion,
141    /// Last WAL entry id of truncated data.
142    pub truncated_entry_id: Option<EntryId>,
143    /// Inferred compaction time window.
144    #[serde(with = "humantime_serde")]
145    pub compaction_time_window: Option<Duration>,
146    /// Format of the SST file.
147    #[serde(default)]
148    pub sst_format: FormatType,
149}
150
/// Test-only equality for [`RegionManifest`].
///
/// `removed_files` and `sst_format` do not take part in the comparison.
// NOTE(review): skipping `removed_files` matches the field's documentation
// (it may legitimately differ after recovery); confirm that skipping
// `sst_format` is intentional as well.
#[cfg(test)]
impl PartialEq for RegionManifest {
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            &self.metadata,
            &self.files,
            &self.flushed_entry_id,
            &self.flushed_sequence,
            &self.manifest_version,
            &self.truncated_entry_id,
            &self.compaction_time_window,
            &self.committed_sequence,
        );
        let rhs = (
            &other.metadata,
            &other.files,
            &other.flushed_entry_id,
            &other.flushed_sequence,
            &other.manifest_version,
            &other.truncated_entry_id,
            &other.compaction_time_window,
            &other.committed_sequence,
        );
        lhs == rhs
    }
}
164
165#[derive(Debug, Default)]
166pub struct RegionManifestBuilder {
167    metadata: Option<RegionMetadataRef>,
168    files: HashMap<FileId, FileMeta>,
169    pub removed_files: RemovedFilesRecord,
170    flushed_entry_id: EntryId,
171    flushed_sequence: SequenceNumber,
172    manifest_version: ManifestVersion,
173    truncated_entry_id: Option<EntryId>,
174    compaction_time_window: Option<Duration>,
175    committed_sequence: Option<SequenceNumber>,
176    sst_format: FormatType,
177}
178
179impl RegionManifestBuilder {
180    /// Start with a checkpoint.
181    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
182        if let Some(s) = checkpoint {
183            Self {
184                metadata: Some(s.metadata),
185                files: s.files,
186                removed_files: s.removed_files,
187                flushed_entry_id: s.flushed_entry_id,
188                manifest_version: s.manifest_version,
189                flushed_sequence: s.flushed_sequence,
190                truncated_entry_id: s.truncated_entry_id,
191                compaction_time_window: s.compaction_time_window,
192                committed_sequence: s.committed_sequence,
193                sst_format: s.sst_format,
194            }
195        } else {
196            Default::default()
197        }
198    }
199
200    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
201        self.metadata = Some(change.metadata);
202        self.manifest_version = manifest_version;
203        self.sst_format = change.sst_format;
204    }
205
206    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
207        self.manifest_version = manifest_version;
208
209        let mut removed_files = vec![];
210        for file in edit.files_to_add {
211            if let Some(old_file) = self.files.insert(file.file_id, file.clone())
212                && let Some(old_index) = old_file.index_version()
213                && !old_file.is_index_up_to_date(&file)
214            {
215                // The old file has an index that is now outdated.
216                removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
217            }
218        }
219        removed_files.extend(
220            edit.files_to_remove
221                .iter()
222                .map(|f| RemovedFile::File(f.file_id, f.index_version())),
223        );
224        let at = edit
225            .timestamp_ms
226            .unwrap_or_else(|| Utc::now().timestamp_millis());
227        self.removed_files.add_removed_files(removed_files, at);
228
229        for file in edit.files_to_remove {
230            self.files.remove(&file.file_id);
231        }
232        if let Some(flushed_entry_id) = edit.flushed_entry_id {
233            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
234        }
235        if let Some(flushed_sequence) = edit.flushed_sequence {
236            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
237        }
238
239        if let Some(committed_sequence) = edit.committed_sequence {
240            self.committed_sequence = Some(
241                self.committed_sequence
242                    .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
243            );
244        }
245        if let Some(window) = edit.compaction_time_window {
246            self.compaction_time_window = Some(window);
247        }
248    }
249
250    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
251        self.manifest_version = manifest_version;
252        match truncate.kind {
253            TruncateKind::All {
254                truncated_entry_id,
255                truncated_sequence,
256            } => {
257                self.flushed_entry_id = truncated_entry_id;
258                self.flushed_sequence = truncated_sequence;
259                self.truncated_entry_id = Some(truncated_entry_id);
260                self.removed_files.add_removed_files(
261                    self.files
262                        .values()
263                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
264                        .collect(),
265                    truncate
266                        .timestamp_ms
267                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
268                );
269                self.files.clear();
270            }
271            TruncateKind::Partial { files_to_remove } => {
272                self.removed_files.add_removed_files(
273                    files_to_remove
274                        .iter()
275                        .map(|f| RemovedFile::File(f.file_id, f.index_version()))
276                        .collect(),
277                    truncate
278                        .timestamp_ms
279                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
280                );
281                for file in files_to_remove {
282                    self.files.remove(&file.file_id);
283                }
284            }
285        }
286    }
287
288    pub fn files(&self) -> &HashMap<FileId, FileMeta> {
289        &self.files
290    }
291
292    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
293    pub fn contains_metadata(&self) -> bool {
294        self.metadata.is_some()
295    }
296
297    pub fn try_build(self) -> Result<RegionManifest> {
298        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
299        Ok(RegionManifest {
300            metadata,
301            files: self.files,
302            removed_files: self.removed_files,
303            flushed_entry_id: self.flushed_entry_id,
304            flushed_sequence: self.flushed_sequence,
305            committed_sequence: self.committed_sequence,
306            manifest_version: self.manifest_version,
307            truncated_entry_id: self.truncated_entry_id,
308            compaction_time_window: self.compaction_time_window,
309            sst_format: self.sst_format,
310        })
311    }
312}
313
314/// A record of removed files in the region manifest.
315/// This is used to keep track of files that have been removed from the manifest but may still
316/// be kept for a while
317#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
318pub struct RemovedFilesRecord {
319    /// a list of `(FileIds, timestamp)` pairs, where the timestamp is the time when
320    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
321    pub removed_files: Vec<RemovedFiles>,
322}
323
324impl RemovedFilesRecord {
325    /// Clear the actually deleted files from the list of removed files
326    pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
327        let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
328        for files in self.removed_files.iter_mut() {
329            files
330                .files
331                .retain(|removed| !deleted_file_set.contains(removed));
332        }
333
334        self.removed_files.retain(|fs| !fs.files.is_empty());
335    }
336
337    pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
338        let cnt = self
339            .removed_files
340            .iter()
341            .map(|r| r.files.len() as u64)
342            .sum();
343        stats
344            .file_removed_cnt
345            .store(cnt, std::sync::atomic::Ordering::Relaxed);
346    }
347}
348
349#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
350pub struct RemovedFiles {
351    /// The timestamp is the time when
352    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
353    pub removed_at: i64,
354    /// The set of file ids that are removed.
355    #[serde(default)]
356    pub files: HashSet<RemovedFile>,
357}
358
359/// A removed file, which can be a data file(optional paired with a index file) or an outdated index file.
360#[derive(Serialize, Hash, Clone, Debug, PartialEq, Eq)]
361pub enum RemovedFile {
362    File(FileId, Option<IndexVersion>),
363    Index(FileId, IndexVersion),
364}
365
366/// Support deserialize from old format(just FileId as string) for backward compatibility
367/// into current format(RemovedFile enum).
368/// This is needed just in case there are old manifests with removed files recorded.
369impl<'de> Deserialize<'de> for RemovedFile {
370    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
371    where
372        D: serde::Deserializer<'de>,
373    {
374        #[derive(Deserialize)]
375        #[serde(untagged)]
376        enum CompatRemovedFile {
377            Enum(RemovedFileEnum),
378            FileId(FileId),
379        }
380
381        #[derive(Deserialize)]
382        enum RemovedFileEnum {
383            File(FileId, Option<IndexVersion>),
384            Index(FileId, IndexVersion),
385        }
386
387        let compat = CompatRemovedFile::deserialize(deserializer)?;
388        match compat {
389            CompatRemovedFile::FileId(file_id) => Ok(RemovedFile::File(file_id, None)),
390            CompatRemovedFile::Enum(e) => match e {
391                RemovedFileEnum::File(file_id, version) => Ok(RemovedFile::File(file_id, version)),
392                RemovedFileEnum::Index(file_id, version) => {
393                    Ok(RemovedFile::Index(file_id, version))
394                }
395            },
396        }
397    }
398}
399
400impl RemovedFile {
401    pub fn file_id(&self) -> FileId {
402        match self {
403            RemovedFile::File(file_id, _) => *file_id,
404            RemovedFile::Index(file_id, _) => *file_id,
405        }
406    }
407
408    pub fn index_version(&self) -> Option<IndexVersion> {
409        match self {
410            RemovedFile::File(_, index_version) => *index_version,
411            RemovedFile::Index(_, index_version) => Some(*index_version),
412        }
413    }
414}
415
416impl RemovedFilesRecord {
417    /// Add a record of removed files with the current timestamp.
418    pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
419        if removed.is_empty() {
420            return;
421        }
422        let files = removed.into_iter().collect();
423        self.removed_files.push(RemovedFiles {
424            removed_at: at,
425            files,
426        });
427    }
428
429    pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
430        if !opt.enable_gc {
431            // If GC is not enabled, always keep removed files empty.
432            self.removed_files.clear();
433            return Ok(());
434        }
435
436        // if GC is enabled, rely on gc worker to delete files, and evict removed files based on options.
437
438        Ok(())
439    }
440}
441
442// The checkpoint of region manifest, generated by checkpointer.
443#[derive(Serialize, Deserialize, Debug, Clone)]
444#[cfg_attr(test, derive(PartialEq, Eq))]
445pub struct RegionCheckpoint {
446    /// The last manifest version that this checkpoint compacts(inclusive).
447    pub last_version: ManifestVersion,
448    // The number of manifest actions that this checkpoint compacts.
449    pub compacted_actions: usize,
450    // The checkpoint data
451    pub checkpoint: Option<RegionManifest>,
452}
453
454impl RegionCheckpoint {
455    pub fn last_version(&self) -> ManifestVersion {
456        self.last_version
457    }
458
459    pub fn encode(&self) -> Result<Vec<u8>> {
460        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
461
462        Ok(json.into_bytes())
463    }
464
465    pub fn decode(bytes: &[u8]) -> Result<Self> {
466        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
467
468        serde_json::from_str(data).context(SerdeJsonSnafu)
469    }
470}
471
472#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
473pub struct RegionMetaActionList {
474    pub actions: Vec<RegionMetaAction>,
475}
476
477impl RegionMetaActionList {
478    pub fn with_action(action: RegionMetaAction) -> Self {
479        Self {
480            actions: vec![action],
481        }
482    }
483
484    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
485        Self { actions }
486    }
487
488    /// Split the actions into a region change and an edit.
489    pub fn split_region_change_and_edit(self) -> (Option<RegionChange>, RegionEdit) {
490        let mut edit = RegionEdit {
491            files_to_add: Vec::new(),
492            files_to_remove: Vec::new(),
493            timestamp_ms: None,
494            compaction_time_window: None,
495            flushed_entry_id: None,
496            flushed_sequence: None,
497            committed_sequence: None,
498        };
499        let mut region_change = None;
500        for action in self.actions {
501            match action {
502                RegionMetaAction::Change(change) => {
503                    region_change = Some(change);
504                }
505                RegionMetaAction::Edit(region_edit) => {
506                    // Merge file adds/removes
507                    edit.files_to_add.extend(region_edit.files_to_add);
508                    edit.files_to_remove.extend(region_edit.files_to_remove);
509                    // Max of flushed entry id / sequence
510                    if let Some(eid) = region_edit.flushed_entry_id {
511                        edit.flushed_entry_id =
512                            Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
513                    }
514                    if let Some(seq) = region_edit.flushed_sequence {
515                        edit.flushed_sequence =
516                            Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
517                    }
518                    if let Some(seq) = region_edit.committed_sequence {
519                        edit.committed_sequence =
520                            Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
521                    }
522                    // Prefer the latest non-none time window
523                    if region_edit.compaction_time_window.is_some() {
524                        edit.compaction_time_window = region_edit.compaction_time_window;
525                    }
526                }
527                _ => {}
528            }
529        }
530
531        (region_change, edit)
532    }
533}
534
535impl RegionMetaActionList {
536    /// Encode self into json in the form of string lines.
537    pub fn encode(&self) -> Result<Vec<u8>> {
538        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
539
540        Ok(json.into_bytes())
541    }
542
543    pub fn decode(bytes: &[u8]) -> Result<Self> {
544        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
545
546        serde_json::from_str(data).context(SerdeJsonSnafu)
547    }
548}
549
550#[cfg(test)]
551mod tests {
552
553    use common_time::Timestamp;
554
555    use super::*;
556
557    // These tests are used to ensure backward compatibility of manifest files.
558    // DO NOT modify the serialized string when they fail, check if your
559    // modification to manifest-related structs is compatible with older manifests.
560    #[test]
561    fn test_region_action_compatibility() {
562        let region_edit = r#"{
563            "flushed_entry_id":null,
564            "compaction_time_window":null,
565            "files_to_add":[
566            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
567            ],
568            "files_to_remove":[
569            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
570            ]
571        }"#;
572        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
573
574        let region_edit = r#"{
575            "flushed_entry_id":10,
576            "flushed_sequence":10,
577            "compaction_time_window":null,
578            "files_to_add":[
579            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
580            ],
581            "files_to_remove":[
582            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
583            ]
584        }"#;
585        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
586
587        // Note: For backward compatibility, the test accepts a RegionChange without sst_format
588        let region_change = r#" {
589            "metadata":{
590                "column_metadatas":[
591                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
592                ],
593                "primary_key":[1],
594                "region_id":5299989648942,
595                "schema_version":0
596            }
597            }"#;
598        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
599
600        let region_remove = r#"{"region_id":42}"#;
601        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
602    }
603
604    #[test]
605    fn test_region_manifest_builder() {
606        // TODO(ruihang): port this test case
607    }
608
609    #[test]
610    fn test_encode_decode_region_checkpoint() {
611        // TODO(ruihang): port this test case
612    }
613
614    #[test]
615    fn test_region_manifest_compatibility() {
616        // Test deserializing RegionManifest from old schema where FileId is a UUID string
617        let region_manifest_json = r#"{
618            "metadata": {
619                "column_metadatas": [
620                    {
621                        "column_schema": {
622                            "name": "a",
623                            "data_type": {"Int64": {}},
624                            "is_nullable": false,
625                            "is_time_index": false,
626                            "default_constraint": null,
627                            "metadata": {}
628                        },
629                        "semantic_type": "Tag",
630                        "column_id": 1
631                    },
632                    {
633                        "column_schema": {
634                            "name": "b",
635                            "data_type": {"Float64": {}},
636                            "is_nullable": false,
637                            "is_time_index": false,
638                            "default_constraint": null,
639                            "metadata": {}
640                        },
641                        "semantic_type": "Field",
642                        "column_id": 2
643                    },
644                    {
645                        "column_schema": {
646                            "name": "c",
647                            "data_type": {"Timestamp": {"Millisecond": null}},
648                            "is_nullable": false,
649                            "is_time_index": false,
650                            "default_constraint": null,
651                            "metadata": {}
652                        },
653                        "semantic_type": "Timestamp",
654                        "column_id": 3
655                    }
656                ],
657                "primary_key": [1],
658                "region_id": 4402341478400,
659                "schema_version": 0
660            },
661            "files": {
662                "4b220a70-2b03-4641-9687-b65d94641208": {
663                    "region_id": 4402341478400,
664                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
665                    "time_range": [
666                        {"value": 1451609210000, "unit": "Millisecond"},
667                        {"value": 1451609520000, "unit": "Millisecond"}
668                    ],
669                    "level": 1,
670                    "file_size": 100
671                },
672                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
673                    "region_id": 4402341478400,
674                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
675                    "time_range": [
676                        {"value": 1451609210000, "unit": "Millisecond"},
677                        {"value": 1451609520000, "unit": "Millisecond"}
678                    ],
679                    "level": 0,
680                    "file_size": 100
681                }
682            },
683            "flushed_entry_id": 10,
684            "flushed_sequence": 20,
685            "manifest_version": 1,
686            "truncated_entry_id": null,
687            "compaction_time_window": null
688        }"#;
689
690        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();
691
692        // Verify that the files were correctly deserialized
693        assert_eq!(manifest.files.len(), 2);
694        assert_eq!(manifest.flushed_entry_id, 10);
695        assert_eq!(manifest.flushed_sequence, 20);
696        assert_eq!(manifest.manifest_version, 1);
697
698        // Verify that FileIds were correctly parsed from UUID strings
699        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
700        file_ids.sort_unstable();
701        assert_eq!(
702            file_ids,
703            vec![
704                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
705                "4b220a70-2b03-4641-9687-b65d94641208",
706            ]
707        );
708
709        // Roundtrip test with current FileId format
710        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
711        let deserialized_manifest: RegionManifest =
712            serde_json::from_str(&serialized_manifest).unwrap();
713        assert_eq!(manifest, deserialized_manifest);
714        assert_ne!(serialized_manifest, region_manifest_json);
715    }
716
717    #[test]
718    fn test_region_truncate_compat() {
719        // Test deserializing RegionTruncate from old schema
720        let region_truncate_json = r#"{
721            "region_id": 4402341478400,
722            "truncated_entry_id": 10,
723            "truncated_sequence": 20
724        }"#;
725
726        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
727        assert_eq!(truncate_v1.region_id, 4402341478400);
728        assert_eq!(
729            truncate_v1.kind,
730            TruncateKind::All {
731                truncated_entry_id: 10,
732                truncated_sequence: 20,
733            }
734        );
735
736        // Test deserializing RegionTruncate from new schema
737        let region_truncate_v2_json = r#"{
738    "region_id": 4402341478400,
739    "files_to_remove": [
740        {
741            "region_id": 4402341478400,
742            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
743            "time_range": [
744                {
745                    "value": 1451609210000,
746                    "unit": "Millisecond"
747                },
748                {
749                    "value": 1451609520000,
750                    "unit": "Millisecond"
751                }
752            ],
753            "level": 1,
754            "file_size": 100
755        }
756    ]
757}"#;
758
759        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
760        assert_eq!(truncate_v2.region_id, 4402341478400);
761        assert_eq!(
762            truncate_v2.kind,
763            TruncateKind::Partial {
764                files_to_remove: vec![FileMeta {
765                    region_id: RegionId::from_u64(4402341478400),
766                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
767                    time_range: (
768                        Timestamp::new_millisecond(1451609210000),
769                        Timestamp::new_millisecond(1451609520000)
770                    ),
771                    level: 1,
772                    file_size: 100,
773                    ..Default::default()
774                }]
775            }
776        );
777    }
778
779    #[test]
780    fn test_region_manifest_removed_files() {
781        let region_metadata = r#"{
782                "column_metadatas": [
783                    {
784                        "column_schema": {
785                            "name": "a",
786                            "data_type": {"Int64": {}},
787                            "is_nullable": false,
788                            "is_time_index": false,
789                            "default_constraint": null,
790                            "metadata": {}
791                        },
792                        "semantic_type": "Tag",
793                        "column_id": 1
794                    },
795                    {
796                        "column_schema": {
797                            "name": "b",
798                            "data_type": {"Float64": {}},
799                            "is_nullable": false,
800                            "is_time_index": false,
801                            "default_constraint": null,
802                            "metadata": {}
803                        },
804                        "semantic_type": "Field",
805                        "column_id": 2
806                    },
807                    {
808                        "column_schema": {
809                            "name": "c",
810                            "data_type": {"Timestamp": {"Millisecond": null}},
811                            "is_nullable": false,
812                            "is_time_index": false,
813                            "default_constraint": null,
814                            "metadata": {}
815                        },
816                        "semantic_type": "Timestamp",
817                        "column_id": 3
818                    }
819                ],
820                "primary_key": [1],
821                "region_id": 4402341478400,
822                "schema_version": 0
823            }"#;
824
825        let metadata: RegionMetadataRef =
826            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
827        let manifest = RegionManifest {
828            metadata: metadata.clone(),
829            files: HashMap::new(),
830            flushed_entry_id: 0,
831            flushed_sequence: 0,
832            committed_sequence: None,
833            manifest_version: 0,
834            truncated_entry_id: None,
835            compaction_time_window: None,
836            removed_files: RemovedFilesRecord {
837                removed_files: vec![RemovedFiles {
838                    removed_at: 0,
839                    files: HashSet::from([RemovedFile::File(
840                        FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
841                        None,
842                    )]),
843                }],
844            },
845            sst_format: FormatType::PrimaryKey,
846        };
847
848        let json = serde_json::to_string(&manifest).unwrap();
849        let new: RegionManifest = serde_json::from_str(&json).unwrap();
850
851        assert_eq!(manifest, new);
852    }
853
854    /// Test if old version can still be deserialized then serialized to the new version.
855    #[test]
856    fn test_old_region_manifest_compat() {
857        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
858        pub struct RegionManifestV1 {
859            /// Metadata of the region.
860            pub metadata: RegionMetadataRef,
861            /// SST files.
862            pub files: HashMap<FileId, FileMeta>,
863            /// Last WAL entry id of flushed data.
864            pub flushed_entry_id: EntryId,
865            /// Last sequence of flushed data.
866            pub flushed_sequence: SequenceNumber,
867            /// Current manifest version.
868            pub manifest_version: ManifestVersion,
869            /// Last WAL entry id of truncated data.
870            pub truncated_entry_id: Option<EntryId>,
871            /// Inferred compaction time window.
872            #[serde(with = "humantime_serde")]
873            pub compaction_time_window: Option<Duration>,
874        }
875
876        let region_metadata = r#"{
877                "column_metadatas": [
878                    {
879                        "column_schema": {
880                            "name": "a",
881                            "data_type": {"Int64": {}},
882                            "is_nullable": false,
883                            "is_time_index": false,
884                            "default_constraint": null,
885                            "metadata": {}
886                        },
887                        "semantic_type": "Tag",
888                        "column_id": 1
889                    },
890                    {
891                        "column_schema": {
892                            "name": "b",
893                            "data_type": {"Float64": {}},
894                            "is_nullable": false,
895                            "is_time_index": false,
896                            "default_constraint": null,
897                            "metadata": {}
898                        },
899                        "semantic_type": "Field",
900                        "column_id": 2
901                    },
902                    {
903                        "column_schema": {
904                            "name": "c",
905                            "data_type": {"Timestamp": {"Millisecond": null}},
906                            "is_nullable": false,
907                            "is_time_index": false,
908                            "default_constraint": null,
909                            "metadata": {}
910                        },
911                        "semantic_type": "Timestamp",
912                        "column_id": 3
913                    }
914                ],
915                "primary_key": [1],
916                "region_id": 4402341478400,
917                "schema_version": 0
918            }"#;
919
920        let metadata: RegionMetadataRef =
921            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
922
923        // first test v1 empty to new
924        let v1 = RegionManifestV1 {
925            metadata: metadata.clone(),
926            files: HashMap::new(),
927            flushed_entry_id: 0,
928            flushed_sequence: 0,
929            manifest_version: 0,
930            truncated_entry_id: None,
931            compaction_time_window: None,
932        };
933        let json = serde_json::to_string(&v1).unwrap();
934        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
935        assert_eq!(
936            new_from_old,
937            RegionManifest {
938                metadata: metadata.clone(),
939                files: HashMap::new(),
940                removed_files: Default::default(),
941                flushed_entry_id: 0,
942                flushed_sequence: 0,
943                committed_sequence: None,
944                manifest_version: 0,
945                truncated_entry_id: None,
946                compaction_time_window: None,
947                sst_format: FormatType::PrimaryKey,
948            }
949        );
950
951        let new_manifest = RegionManifest {
952            metadata: metadata.clone(),
953            files: HashMap::new(),
954            removed_files: Default::default(),
955            flushed_entry_id: 0,
956            flushed_sequence: 0,
957            committed_sequence: None,
958            manifest_version: 0,
959            truncated_entry_id: None,
960            compaction_time_window: None,
961            sst_format: FormatType::PrimaryKey,
962        };
963        let json = serde_json::to_string(&new_manifest).unwrap();
964        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
965        assert_eq!(
966            old_from_new,
967            RegionManifestV1 {
968                metadata: metadata.clone(),
969                files: HashMap::new(),
970                flushed_entry_id: 0,
971                flushed_sequence: 0,
972                manifest_version: 0,
973                truncated_entry_id: None,
974                compaction_time_window: None,
975            }
976        );
977
978        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
979        pub struct RegionEditV1 {
980            pub files_to_add: Vec<FileMeta>,
981            pub files_to_remove: Vec<FileMeta>,
982            #[serde(with = "humantime_serde")]
983            pub compaction_time_window: Option<Duration>,
984            pub flushed_entry_id: Option<EntryId>,
985            pub flushed_sequence: Option<SequenceNumber>,
986        }
987
988        let json = serde_json::to_string(&RegionEditV1 {
989            files_to_add: vec![],
990            files_to_remove: vec![],
991            compaction_time_window: None,
992            flushed_entry_id: None,
993            flushed_sequence: None,
994        })
995        .unwrap();
996        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
997        assert_eq!(
998            RegionEdit {
999                files_to_add: vec![],
1000                files_to_remove: vec![],
1001                timestamp_ms: None,
1002                compaction_time_window: None,
1003                flushed_entry_id: None,
1004                flushed_sequence: None,
1005                committed_sequence: None,
1006            },
1007            new_from_old
1008        );
1009
1010        // test new version with timestamp_ms set can deserialize to old version
1011        let new = RegionEdit {
1012            files_to_add: vec![],
1013            files_to_remove: vec![],
1014            timestamp_ms: Some(42),
1015            compaction_time_window: None,
1016            flushed_entry_id: None,
1017            flushed_sequence: None,
1018            committed_sequence: None,
1019        };
1020
1021        let new_json = serde_json::to_string(&new).unwrap();
1022
1023        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
1024        assert_eq!(
1025            RegionEditV1 {
1026                files_to_add: vec![],
1027                files_to_remove: vec![],
1028                compaction_time_window: None,
1029                flushed_entry_id: None,
1030                flushed_sequence: None,
1031            },
1032            old_from_new
1033        );
1034    }
1035
1036    #[test]
1037    fn test_region_change_backward_compatibility() {
1038        // Test that we can deserialize a RegionChange without sst_format
1039        let region_change_json = r#"{
1040            "metadata": {
1041                "column_metadatas": [
1042                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
1043                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
1044                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
1045                ],
1046                "primary_key": [
1047                    1
1048                ],
1049                "region_id": 42,
1050                "schema_version": 0
1051            }
1052        }"#;
1053
1054        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
1055        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
1056
1057        // Test serialization and deserialization with sst_format
1058        let region_change = RegionChange {
1059            metadata: region_change.metadata.clone(),
1060            sst_format: FormatType::Flat,
1061        };
1062
1063        let serialized = serde_json::to_string(&region_change).unwrap();
1064        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
1065        assert_eq!(deserialized.sst_format, FormatType::Flat);
1066    }
1067
1068    #[test]
1069    fn test_removed_file_compatibility() {
1070        let file_id = FileId::random();
1071        // Case 1: Deserialize from FileId string (Legacy format)
1072        let json_str = format!("\"{}\"", file_id);
1073        let removed_file: RemovedFile = serde_json::from_str(&json_str).unwrap();
1074        assert_eq!(removed_file, RemovedFile::File(file_id, None));
1075
1076        // Case 2: Deserialize from new format (File)
1077        let removed_file_v2 = RemovedFile::File(file_id, Some(10));
1078        let json_v2 = serde_json::to_string(&removed_file_v2).unwrap();
1079        let deserialized_v2: RemovedFile = serde_json::from_str(&json_v2).unwrap();
1080        assert_eq!(removed_file_v2, deserialized_v2);
1081
1082        // Case 3: Deserialize from new format (Index)
1083        let removed_index = RemovedFile::Index(file_id, 20);
1084        let json_index = serde_json::to_string(&removed_index).unwrap();
1085        let deserialized_index: RemovedFile = serde_json::from_str(&json_index).unwrap();
1086        assert_eq!(removed_index, deserialized_index);
1087
1088        // Case 4: Round-trip serialization/deserialization of new enum format with None as index version
1089        let removed_file = RemovedFile::File(file_id, None);
1090        let json = serde_json::to_string(&removed_file).unwrap();
1091        let deserialized: RemovedFile = serde_json::from_str(&json).unwrap();
1092        assert_eq!(removed_file, deserialized);
1093
1094        // Case 5: Deserialize mixed set in RemovedFilesRecord
1095        // This simulates a Set<RemovedFile> which might contain old strings or new objects if manually constructed or from old versions.
1096        // Actually, if it was HashSet<FileId>, the JSON is ["id1", "id2"].
1097        // If it is HashSet<RemovedFile>, the JSON is [{"File":...}, "id2"] if mixed (which shouldn't happen usually but good to test).
1098
1099        let json_set = format!("[\"{}\"]", file_id);
1100        let removed_files_set: HashSet<RemovedFile> = serde_json::from_str(&json_set).unwrap();
1101        assert!(removed_files_set.contains(&RemovedFile::File(file_id, None)));
1102    }
1103
1104    /// It is intentionally acceptable to ignore the legacy `file_ids` field when
1105    /// deserializing [`RemovedFiles`].
1106    ///
1107    /// In older manifests, `file_ids` recorded the set of SSTable files that were
1108    /// candidates for garbage collection at a given `removed_at` timestamp. The
1109    /// newer format stores this information in the `files` field instead. When we
1110    /// deserialize an old manifest entry into the new struct, we *drop* the
1111    /// `file_ids` field instead of trying to recover or merge it.
1112    ///
1113    /// Dropping `file_ids` does **not** risk deleting live data: a file is only
1114    /// physically removed when it is both (a) no longer referenced by any region
1115    /// metadata and (b) selected by the GC worker as safe to delete. Losing the
1116    /// historical list of candidate `file_ids` merely means some obsolete files
1117    /// may stay on disk longer than strictly necessary.
1118    ///
1119    /// The GC worker periodically scans storage (e.g. by walking the data
1120    /// directories and/or consulting the latest manifest) to discover files that
1121    /// are no longer referenced anywhere. Any files that were only referenced via
1122    /// the dropped `file_ids` field will be rediscovered during these scans and
1123    /// eventually deleted. Thus the system converges to a correct, fully-collected
1124    /// state without relying on `file_ids`, and the only potential impact of
1125    /// ignoring it is temporary disk space overhead, not data loss.
1126    #[test]
1127    fn test_removed_files_backward_compatibility() {
1128        // Define the old version struct with file_ids field
1129        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
1130        struct OldRemovedFiles {
1131            pub removed_at: i64,
1132            pub file_ids: HashSet<FileId>,
1133        }
1134
1135        // Create an old version instance
1136        let mut file_ids = HashSet::new();
1137        file_ids.insert(FileId::random());
1138        file_ids.insert(FileId::random());
1139
1140        let old_removed_files = OldRemovedFiles {
1141            removed_at: 1234567890,
1142            file_ids,
1143        };
1144
1145        // Serialize the old version
1146        let old_json = serde_json::to_string(&old_removed_files).unwrap();
1147
1148        // Try to deserialize into new version - file_ids should be ignored
1149        let result: Result<RemovedFiles, _> = serde_json::from_str(&old_json);
1150
1151        // This should succeed and create a default RemovedFiles (empty files set)
1152        assert!(result.is_ok(), "{:?}", result);
1153        let removed_files = result.unwrap();
1154        assert_eq!(removed_files.removed_at, 1234567890);
1155        assert!(removed_files.files.is_empty());
1156
1157        // Test that new format still works
1158        let file_id = FileId::random();
1159        let new_json = format!(
1160            r#"{{
1161            "removed_at": 1234567890,
1162            "files": ["{}"]
1163        }}"#,
1164            file_id
1165        );
1166
1167        let result: Result<RemovedFiles, _> = serde_json::from_str(&new_json);
1168        assert!(result.is_ok());
1169        let removed_files = result.unwrap();
1170        assert_eq!(removed_files.removed_at, 1234567890);
1171        assert_eq!(removed_files.files.len(), 1);
1172        assert!(
1173            removed_files
1174                .files
1175                .contains(&RemovedFile::File(file_id, None))
1176        );
1177    }
1178}