// mito2/manifest/action.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Defines [RegionMetaAction]-related structs and [RegionCheckpoint].
16
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
29use crate::manifest::manager::RemoveFileOptions;
30use crate::region::ManifestStats;
31use crate::sst::FormatType;
32use crate::sst::file::FileMeta;
33use crate::wal::EntryId;
34
/// Actions that can be applied to region manifest.
///
/// No `#[serde(tag = ...)]`/`untagged` attribute is set, so serde uses its default
/// externally tagged representation: the variant name is the JSON key wrapping the payload.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
pub enum RegionMetaAction {
    /// Change region's metadata for request like ALTER
    Change(RegionChange),
    /// Edit region's state for changing options or file list.
    Edit(RegionEdit),
    /// Remove the region.
    Remove(RegionRemove),
    /// Truncate the region.
    Truncate(RegionTruncate),
}
47
/// Records a change of the region's metadata (e.g. for a request like ALTER).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
    /// The metadata after changed.
    pub metadata: RegionMetadataRef,
    /// Format of the SST.
    ///
    /// `#[serde(default)]` keeps changes written before this field existed decodable.
    #[serde(default)]
    pub sst_format: FormatType,
}
56
57#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
58pub struct RegionEdit {
59    pub files_to_add: Vec<FileMeta>,
60    pub files_to_remove: Vec<FileMeta>,
61    /// event unix timestamp in milliseconds, help to determine file deletion time.
62    #[serde(default)]
63    pub timestamp_ms: Option<i64>,
64    #[serde(with = "humantime_serde")]
65    pub compaction_time_window: Option<Duration>,
66    pub flushed_entry_id: Option<EntryId>,
67    pub flushed_sequence: Option<SequenceNumber>,
68    pub committed_sequence: Option<SequenceNumber>,
69}
70
/// Action that removes the region identified by `region_id`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
    /// Id of the region to remove.
    pub region_id: RegionId,
}
75
/// Last data truncated in the region.
///
/// `kind` is flattened, so its fields sit at the top level of the serialized object next to
/// `region_id`. This keeps the older `{region_id, truncated_entry_id, truncated_sequence}`
/// layout decodable as [TruncateKind::All].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
    /// Id of the truncated region.
    pub region_id: RegionId,
    /// What was truncated: all data up to an entry id/sequence, or specific files.
    #[serde(flatten)]
    pub kind: TruncateKind,
    /// event unix timestamp in milliseconds, help to determine file deletion time.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
}
87
/// The kind of truncate operation.
///
/// The enum is `#[serde(untagged)]`, so serde decodes it by trying each variant in
/// declaration order. Inside [RegionTruncate] (where it is flattened), `All` matches when
/// `truncated_entry_id` and `truncated_sequence` are present; otherwise `Partial` is tried.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum TruncateKind {
    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
    All {
        /// Last WAL entry id of truncated data.
        truncated_entry_id: EntryId,
        /// Last sequence number of truncated data.
        truncated_sequence: SequenceNumber,
    },
    /// Only remove certain files in the region.
    Partial { files_to_remove: Vec<FileMeta> },
}
102
103/// The region manifest data.
104#[derive(Serialize, Deserialize, Clone, Debug)]
105#[cfg_attr(test, derive(Eq))]
106pub struct RegionManifest {
107    /// Metadata of the region.
108    pub metadata: RegionMetadataRef,
109    /// SST files.
110    pub files: HashMap<FileId, FileMeta>,
111    /// Removed files, which are not in the current manifest but may still be kept for a while.
112    /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
113    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
114    ///
115    /// Using same checkpoint files and action files, the recovered manifest may differ in this
116    /// `removed_files` field, because the checkpointer may evict some removed files using
117    /// current machine time. This is acceptable because the removed files are not used in normal
118    /// read/write path.
119    ///
120    #[serde(default)]
121    pub removed_files: RemovedFilesRecord,
122    /// Last WAL entry id of flushed data.
123    pub flushed_entry_id: EntryId,
124    /// Last sequence of flushed data.
125    pub flushed_sequence: SequenceNumber,
126    pub committed_sequence: Option<SequenceNumber>,
127    /// Current manifest version.
128    pub manifest_version: ManifestVersion,
129    /// Last WAL entry id of truncated data.
130    pub truncated_entry_id: Option<EntryId>,
131    /// Inferred compaction time window.
132    #[serde(with = "humantime_serde")]
133    pub compaction_time_window: Option<Duration>,
134    /// Format of the SST file.
135    #[serde(default)]
136    pub sst_format: FormatType,
137}
138
// Test-only equality for `RegionManifest`.
//
// `removed_files` is intentionally NOT compared: two recoveries from the same checkpoint and
// action files may legitimately differ there, because the checkpointer evicts entries based
// on the current machine time. Every other field, including `sst_format`, is compared.
#[cfg(test)]
impl PartialEq for RegionManifest {
    fn eq(&self, other: &Self) -> bool {
        self.metadata == other.metadata
            && self.files == other.files
            && self.flushed_entry_id == other.flushed_entry_id
            && self.flushed_sequence == other.flushed_sequence
            && self.committed_sequence == other.committed_sequence
            && self.manifest_version == other.manifest_version
            && self.truncated_entry_id == other.truncated_entry_id
            && self.compaction_time_window == other.compaction_time_window
            && self.sst_format == other.sst_format
    }
}
152
/// Builds a [RegionManifest] by applying manifest actions on top of an optional checkpoint.
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
    /// Region metadata. It stays `None` until a checkpoint or a [RegionChange] provides it,
    /// and `try_build` fails while it is `None`.
    metadata: Option<RegionMetadataRef>,
    /// SST files currently tracked, keyed by file id.
    files: HashMap<FileId, FileMeta>,
    /// Files dropped from `files` that may still be kept for a while.
    pub removed_files: RemovedFilesRecord,
    /// Last WAL entry id of flushed data.
    flushed_entry_id: EntryId,
    /// Last sequence of flushed data.
    flushed_sequence: SequenceNumber,
    /// Manifest version of the checkpoint or of the most recently applied action.
    manifest_version: ManifestVersion,
    /// Last WAL entry id of truncated data.
    truncated_entry_id: Option<EntryId>,
    /// Inferred compaction time window.
    compaction_time_window: Option<Duration>,
    /// Largest committed sequence seen so far.
    committed_sequence: Option<SequenceNumber>,
    /// Format of the SST files.
    sst_format: FormatType,
}
166
167impl RegionManifestBuilder {
168    /// Start with a checkpoint.
169    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
170        if let Some(s) = checkpoint {
171            Self {
172                metadata: Some(s.metadata),
173                files: s.files,
174                removed_files: s.removed_files,
175                flushed_entry_id: s.flushed_entry_id,
176                manifest_version: s.manifest_version,
177                flushed_sequence: s.flushed_sequence,
178                truncated_entry_id: s.truncated_entry_id,
179                compaction_time_window: s.compaction_time_window,
180                committed_sequence: s.committed_sequence,
181                sst_format: s.sst_format,
182            }
183        } else {
184            Default::default()
185        }
186    }
187
188    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
189        self.metadata = Some(change.metadata);
190        self.manifest_version = manifest_version;
191        self.sst_format = change.sst_format;
192    }
193
194    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
195        self.manifest_version = manifest_version;
196        for file in edit.files_to_add {
197            self.files.insert(file.file_id, file);
198        }
199        self.removed_files.add_removed_files(
200            edit.files_to_remove
201                .iter()
202                .map(|meta| meta.file_id)
203                .collect(),
204            edit.timestamp_ms
205                .unwrap_or_else(|| Utc::now().timestamp_millis()),
206        );
207        for file in edit.files_to_remove {
208            self.files.remove(&file.file_id);
209        }
210        if let Some(flushed_entry_id) = edit.flushed_entry_id {
211            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
212        }
213        if let Some(flushed_sequence) = edit.flushed_sequence {
214            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
215        }
216
217        if let Some(committed_sequence) = edit.committed_sequence {
218            self.committed_sequence = Some(
219                self.committed_sequence
220                    .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
221            );
222        }
223        if let Some(window) = edit.compaction_time_window {
224            self.compaction_time_window = Some(window);
225        }
226    }
227
228    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
229        self.manifest_version = manifest_version;
230        match truncate.kind {
231            TruncateKind::All {
232                truncated_entry_id,
233                truncated_sequence,
234            } => {
235                self.flushed_entry_id = truncated_entry_id;
236                self.flushed_sequence = truncated_sequence;
237                self.truncated_entry_id = Some(truncated_entry_id);
238                self.removed_files.add_removed_files(
239                    self.files.values().map(|meta| meta.file_id).collect(),
240                    truncate
241                        .timestamp_ms
242                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
243                );
244                self.files.clear();
245            }
246            TruncateKind::Partial { files_to_remove } => {
247                self.removed_files.add_removed_files(
248                    files_to_remove.iter().map(|meta| meta.file_id).collect(),
249                    truncate
250                        .timestamp_ms
251                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
252                );
253                for file in files_to_remove {
254                    self.files.remove(&file.file_id);
255                }
256            }
257        }
258    }
259
260    pub fn files(&self) -> &HashMap<FileId, FileMeta> {
261        &self.files
262    }
263
264    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
265    pub fn contains_metadata(&self) -> bool {
266        self.metadata.is_some()
267    }
268
269    pub fn try_build(self) -> Result<RegionManifest> {
270        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
271        Ok(RegionManifest {
272            metadata,
273            files: self.files,
274            removed_files: self.removed_files,
275            flushed_entry_id: self.flushed_entry_id,
276            flushed_sequence: self.flushed_sequence,
277            committed_sequence: self.committed_sequence,
278            manifest_version: self.manifest_version,
279            truncated_entry_id: self.truncated_entry_id,
280            compaction_time_window: self.compaction_time_window,
281            sst_format: self.sst_format,
282        })
283    }
284}
285
/// A record of removed files in the region manifest.
/// This is used to keep track of files that have been removed from the manifest but may still
/// be kept for a while.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFilesRecord {
    /// A list of [RemovedFiles] entries. Each entry pairs a set of file ids with the time they
    /// were removed from the manifest, in milliseconds since the unix epoch.
    pub removed_files: Vec<RemovedFiles>,
}
295
296impl RemovedFilesRecord {
297    /// Clear the actually deleted files from the list of removed files
298    pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
299        let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
300        for files in self.removed_files.iter_mut() {
301            files.file_ids.retain(|fid| !deleted_file_set.contains(fid));
302        }
303
304        self.removed_files.retain(|fs| !fs.file_ids.is_empty());
305    }
306
307    pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
308        let cnt = self
309            .removed_files
310            .iter()
311            .map(|r| r.file_ids.len() as u64)
312            .sum();
313        stats
314            .file_removed_cnt
315            .store(cnt, std::sync::atomic::Ordering::Relaxed);
316    }
317}
318
/// One entry of a [RemovedFilesRecord]: a set of files removed from the manifest together.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
    /// The time when the files were removed from the manifest, in milliseconds since the
    /// unix epoch.
    pub removed_at: i64,
    /// The set of file ids that are removed.
    pub file_ids: HashSet<FileId>,
}
327
328impl RemovedFilesRecord {
329    /// Add a record of removed files with the current timestamp.
330    pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
331        if file_ids.is_empty() {
332            return;
333        }
334        self.removed_files.push(RemovedFiles {
335            removed_at: at,
336            file_ids,
337        });
338    }
339
340    pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
341        if !opt.enable_gc {
342            // If GC is not enabled, always keep removed files empty.
343            self.removed_files.clear();
344            return Ok(());
345        }
346
347        // if GC is enabled, rely on gc worker to delete files, and evict removed files based on options.
348
349        Ok(())
350    }
351}
352
/// The checkpoint of region manifest, generated by checkpointer.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RegionCheckpoint {
    /// The last manifest version that this checkpoint compacts(inclusive).
    pub last_version: ManifestVersion,
    /// The number of manifest actions that this checkpoint compacts.
    pub compacted_actions: usize,
    /// The checkpoint data: the compacted manifest state, if any.
    pub checkpoint: Option<RegionManifest>,
}
364
365impl RegionCheckpoint {
366    pub fn last_version(&self) -> ManifestVersion {
367        self.last_version
368    }
369
370    pub fn encode(&self) -> Result<Vec<u8>> {
371        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
372
373        Ok(json.into_bytes())
374    }
375
376    pub fn decode(bytes: &[u8]) -> Result<Self> {
377        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
378
379        serde_json::from_str(data).context(SerdeJsonSnafu)
380    }
381}
382
/// An ordered list of [RegionMetaAction]s, encoded and decoded as one unit.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
    /// The actions, in order.
    pub actions: Vec<RegionMetaAction>,
}
387
388impl RegionMetaActionList {
389    pub fn with_action(action: RegionMetaAction) -> Self {
390        Self {
391            actions: vec![action],
392        }
393    }
394
395    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
396        Self { actions }
397    }
398
399    pub fn into_region_edit(self) -> RegionEdit {
400        let mut edit = RegionEdit {
401            files_to_add: Vec::new(),
402            files_to_remove: Vec::new(),
403            timestamp_ms: None,
404            compaction_time_window: None,
405            flushed_entry_id: None,
406            flushed_sequence: None,
407            committed_sequence: None,
408        };
409
410        for action in self.actions {
411            if let RegionMetaAction::Edit(region_edit) = action {
412                // Merge file adds/removes
413                edit.files_to_add.extend(region_edit.files_to_add);
414                edit.files_to_remove.extend(region_edit.files_to_remove);
415                // Max of flushed entry id / sequence
416                if let Some(eid) = region_edit.flushed_entry_id {
417                    edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
418                }
419                if let Some(seq) = region_edit.flushed_sequence {
420                    edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
421                }
422                if let Some(seq) = region_edit.committed_sequence {
423                    edit.committed_sequence =
424                        Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
425                }
426                // Prefer the latest non-none time window
427                if region_edit.compaction_time_window.is_some() {
428                    edit.compaction_time_window = region_edit.compaction_time_window;
429                }
430            }
431        }
432
433        edit
434    }
435}
436
437impl RegionMetaActionList {
438    /// Encode self into json in the form of string lines.
439    pub fn encode(&self) -> Result<Vec<u8>> {
440        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
441
442        Ok(json.into_bytes())
443    }
444
445    pub fn decode(bytes: &[u8]) -> Result<Self> {
446        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
447
448        serde_json::from_str(data).context(SerdeJsonSnafu)
449    }
450}
451
452#[cfg(test)]
453mod tests {
454
455    use common_time::Timestamp;
456
457    use super::*;
458
459    // These tests are used to ensure backward compatibility of manifest files.
460    // DO NOT modify the serialized string when they fail, check if your
461    // modification to manifest-related structs is compatible with older manifests.
462    #[test]
463    fn test_region_action_compatibility() {
464        let region_edit = r#"{
465            "flushed_entry_id":null,
466            "compaction_time_window":null,
467            "files_to_add":[
468            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
469            ],
470            "files_to_remove":[
471            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
472            ]
473        }"#;
474        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
475
476        let region_edit = r#"{
477            "flushed_entry_id":10,
478            "flushed_sequence":10,
479            "compaction_time_window":null,
480            "files_to_add":[
481            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
482            ],
483            "files_to_remove":[
484            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
485            ]
486        }"#;
487        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
488
489        // Note: For backward compatibility, the test accepts a RegionChange without sst_format
490        let region_change = r#" {
491            "metadata":{
492                "column_metadatas":[
493                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
494                ],
495                "primary_key":[1],
496                "region_id":5299989648942,
497                "schema_version":0
498            }
499            }"#;
500        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
501
502        let region_remove = r#"{"region_id":42}"#;
503        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
504    }
505
506    #[test]
507    fn test_region_manifest_builder() {
508        // TODO(ruihang): port this test case
509    }
510
511    #[test]
512    fn test_encode_decode_region_checkpoint() {
513        // TODO(ruihang): port this test case
514    }
515
516    #[test]
517    fn test_region_manifest_compatibility() {
518        // Test deserializing RegionManifest from old schema where FileId is a UUID string
519        let region_manifest_json = r#"{
520            "metadata": {
521                "column_metadatas": [
522                    {
523                        "column_schema": {
524                            "name": "a",
525                            "data_type": {"Int64": {}},
526                            "is_nullable": false,
527                            "is_time_index": false,
528                            "default_constraint": null,
529                            "metadata": {}
530                        },
531                        "semantic_type": "Tag",
532                        "column_id": 1
533                    },
534                    {
535                        "column_schema": {
536                            "name": "b",
537                            "data_type": {"Float64": {}},
538                            "is_nullable": false,
539                            "is_time_index": false,
540                            "default_constraint": null,
541                            "metadata": {}
542                        },
543                        "semantic_type": "Field",
544                        "column_id": 2
545                    },
546                    {
547                        "column_schema": {
548                            "name": "c",
549                            "data_type": {"Timestamp": {"Millisecond": null}},
550                            "is_nullable": false,
551                            "is_time_index": false,
552                            "default_constraint": null,
553                            "metadata": {}
554                        },
555                        "semantic_type": "Timestamp",
556                        "column_id": 3
557                    }
558                ],
559                "primary_key": [1],
560                "region_id": 4402341478400,
561                "schema_version": 0
562            },
563            "files": {
564                "4b220a70-2b03-4641-9687-b65d94641208": {
565                    "region_id": 4402341478400,
566                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
567                    "time_range": [
568                        {"value": 1451609210000, "unit": "Millisecond"},
569                        {"value": 1451609520000, "unit": "Millisecond"}
570                    ],
571                    "level": 1,
572                    "file_size": 100
573                },
574                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
575                    "region_id": 4402341478400,
576                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
577                    "time_range": [
578                        {"value": 1451609210000, "unit": "Millisecond"},
579                        {"value": 1451609520000, "unit": "Millisecond"}
580                    ],
581                    "level": 0,
582                    "file_size": 100
583                }
584            },
585            "flushed_entry_id": 10,
586            "flushed_sequence": 20,
587            "manifest_version": 1,
588            "truncated_entry_id": null,
589            "compaction_time_window": null
590        }"#;
591
592        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();
593
594        // Verify that the files were correctly deserialized
595        assert_eq!(manifest.files.len(), 2);
596        assert_eq!(manifest.flushed_entry_id, 10);
597        assert_eq!(manifest.flushed_sequence, 20);
598        assert_eq!(manifest.manifest_version, 1);
599
600        // Verify that FileIds were correctly parsed from UUID strings
601        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
602        file_ids.sort_unstable();
603        assert_eq!(
604            file_ids,
605            vec![
606                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
607                "4b220a70-2b03-4641-9687-b65d94641208",
608            ]
609        );
610
611        // Roundtrip test with current FileId format
612        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
613        let deserialized_manifest: RegionManifest =
614            serde_json::from_str(&serialized_manifest).unwrap();
615        assert_eq!(manifest, deserialized_manifest);
616        assert_ne!(serialized_manifest, region_manifest_json);
617    }
618
619    #[test]
620    fn test_region_truncate_compat() {
621        // Test deserializing RegionTruncate from old schema
622        let region_truncate_json = r#"{
623            "region_id": 4402341478400,
624            "truncated_entry_id": 10,
625            "truncated_sequence": 20
626        }"#;
627
628        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
629        assert_eq!(truncate_v1.region_id, 4402341478400);
630        assert_eq!(
631            truncate_v1.kind,
632            TruncateKind::All {
633                truncated_entry_id: 10,
634                truncated_sequence: 20,
635            }
636        );
637
638        // Test deserializing RegionTruncate from new schema
639        let region_truncate_v2_json = r#"{
640    "region_id": 4402341478400,
641    "files_to_remove": [
642        {
643            "region_id": 4402341478400,
644            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
645            "time_range": [
646                {
647                    "value": 1451609210000,
648                    "unit": "Millisecond"
649                },
650                {
651                    "value": 1451609520000,
652                    "unit": "Millisecond"
653                }
654            ],
655            "level": 1,
656            "file_size": 100
657        }
658    ]
659}"#;
660
661        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
662        assert_eq!(truncate_v2.region_id, 4402341478400);
663        assert_eq!(
664            truncate_v2.kind,
665            TruncateKind::Partial {
666                files_to_remove: vec![FileMeta {
667                    region_id: RegionId::from_u64(4402341478400),
668                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
669                    time_range: (
670                        Timestamp::new_millisecond(1451609210000),
671                        Timestamp::new_millisecond(1451609520000)
672                    ),
673                    level: 1,
674                    file_size: 100,
675                    ..Default::default()
676                }]
677            }
678        );
679    }
680
681    #[test]
682    fn test_region_manifest_removed_files() {
683        let region_metadata = r#"{
684                "column_metadatas": [
685                    {
686                        "column_schema": {
687                            "name": "a",
688                            "data_type": {"Int64": {}},
689                            "is_nullable": false,
690                            "is_time_index": false,
691                            "default_constraint": null,
692                            "metadata": {}
693                        },
694                        "semantic_type": "Tag",
695                        "column_id": 1
696                    },
697                    {
698                        "column_schema": {
699                            "name": "b",
700                            "data_type": {"Float64": {}},
701                            "is_nullable": false,
702                            "is_time_index": false,
703                            "default_constraint": null,
704                            "metadata": {}
705                        },
706                        "semantic_type": "Field",
707                        "column_id": 2
708                    },
709                    {
710                        "column_schema": {
711                            "name": "c",
712                            "data_type": {"Timestamp": {"Millisecond": null}},
713                            "is_nullable": false,
714                            "is_time_index": false,
715                            "default_constraint": null,
716                            "metadata": {}
717                        },
718                        "semantic_type": "Timestamp",
719                        "column_id": 3
720                    }
721                ],
722                "primary_key": [1],
723                "region_id": 4402341478400,
724                "schema_version": 0
725            }"#;
726
727        let metadata: RegionMetadataRef =
728            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
729        let manifest = RegionManifest {
730            metadata: metadata.clone(),
731            files: HashMap::new(),
732            flushed_entry_id: 0,
733            flushed_sequence: 0,
734            committed_sequence: None,
735            manifest_version: 0,
736            truncated_entry_id: None,
737            compaction_time_window: None,
738            removed_files: RemovedFilesRecord {
739                removed_files: vec![RemovedFiles {
740                    removed_at: 0,
741                    file_ids: HashSet::from([FileId::parse_str(
742                        "4b220a70-2b03-4641-9687-b65d94641208",
743                    )
744                    .unwrap()]),
745                }],
746            },
747            sst_format: FormatType::PrimaryKey,
748        };
749
750        let json = serde_json::to_string(&manifest).unwrap();
751        let new: RegionManifest = serde_json::from_str(&json).unwrap();
752
753        assert_eq!(manifest, new);
754    }
755
756    /// Test if old version can still be deserialized then serialized to the new version.
757    #[test]
758    fn test_old_region_manifest_compat() {
759        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
760        pub struct RegionManifestV1 {
761            /// Metadata of the region.
762            pub metadata: RegionMetadataRef,
763            /// SST files.
764            pub files: HashMap<FileId, FileMeta>,
765            /// Last WAL entry id of flushed data.
766            pub flushed_entry_id: EntryId,
767            /// Last sequence of flushed data.
768            pub flushed_sequence: SequenceNumber,
769            /// Current manifest version.
770            pub manifest_version: ManifestVersion,
771            /// Last WAL entry id of truncated data.
772            pub truncated_entry_id: Option<EntryId>,
773            /// Inferred compaction time window.
774            #[serde(with = "humantime_serde")]
775            pub compaction_time_window: Option<Duration>,
776        }
777
778        let region_metadata = r#"{
779                "column_metadatas": [
780                    {
781                        "column_schema": {
782                            "name": "a",
783                            "data_type": {"Int64": {}},
784                            "is_nullable": false,
785                            "is_time_index": false,
786                            "default_constraint": null,
787                            "metadata": {}
788                        },
789                        "semantic_type": "Tag",
790                        "column_id": 1
791                    },
792                    {
793                        "column_schema": {
794                            "name": "b",
795                            "data_type": {"Float64": {}},
796                            "is_nullable": false,
797                            "is_time_index": false,
798                            "default_constraint": null,
799                            "metadata": {}
800                        },
801                        "semantic_type": "Field",
802                        "column_id": 2
803                    },
804                    {
805                        "column_schema": {
806                            "name": "c",
807                            "data_type": {"Timestamp": {"Millisecond": null}},
808                            "is_nullable": false,
809                            "is_time_index": false,
810                            "default_constraint": null,
811                            "metadata": {}
812                        },
813                        "semantic_type": "Timestamp",
814                        "column_id": 3
815                    }
816                ],
817                "primary_key": [1],
818                "region_id": 4402341478400,
819                "schema_version": 0
820            }"#;
821
822        let metadata: RegionMetadataRef =
823            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
824
825        // first test v1 empty to new
826        let v1 = RegionManifestV1 {
827            metadata: metadata.clone(),
828            files: HashMap::new(),
829            flushed_entry_id: 0,
830            flushed_sequence: 0,
831            manifest_version: 0,
832            truncated_entry_id: None,
833            compaction_time_window: None,
834        };
835        let json = serde_json::to_string(&v1).unwrap();
836        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
837        assert_eq!(
838            new_from_old,
839            RegionManifest {
840                metadata: metadata.clone(),
841                files: HashMap::new(),
842                removed_files: Default::default(),
843                flushed_entry_id: 0,
844                flushed_sequence: 0,
845                committed_sequence: None,
846                manifest_version: 0,
847                truncated_entry_id: None,
848                compaction_time_window: None,
849                sst_format: FormatType::PrimaryKey,
850            }
851        );
852
853        let new_manifest = RegionManifest {
854            metadata: metadata.clone(),
855            files: HashMap::new(),
856            removed_files: Default::default(),
857            flushed_entry_id: 0,
858            flushed_sequence: 0,
859            committed_sequence: None,
860            manifest_version: 0,
861            truncated_entry_id: None,
862            compaction_time_window: None,
863            sst_format: FormatType::PrimaryKey,
864        };
865        let json = serde_json::to_string(&new_manifest).unwrap();
866        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
867        assert_eq!(
868            old_from_new,
869            RegionManifestV1 {
870                metadata: metadata.clone(),
871                files: HashMap::new(),
872                flushed_entry_id: 0,
873                flushed_sequence: 0,
874                manifest_version: 0,
875                truncated_entry_id: None,
876                compaction_time_window: None,
877            }
878        );
879
880        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
881        pub struct RegionEditV1 {
882            pub files_to_add: Vec<FileMeta>,
883            pub files_to_remove: Vec<FileMeta>,
884            #[serde(with = "humantime_serde")]
885            pub compaction_time_window: Option<Duration>,
886            pub flushed_entry_id: Option<EntryId>,
887            pub flushed_sequence: Option<SequenceNumber>,
888        }
889
890        let json = serde_json::to_string(&RegionEditV1 {
891            files_to_add: vec![],
892            files_to_remove: vec![],
893            compaction_time_window: None,
894            flushed_entry_id: None,
895            flushed_sequence: None,
896        })
897        .unwrap();
898        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
899        assert_eq!(
900            RegionEdit {
901                files_to_add: vec![],
902                files_to_remove: vec![],
903                timestamp_ms: None,
904                compaction_time_window: None,
905                flushed_entry_id: None,
906                flushed_sequence: None,
907                committed_sequence: None,
908            },
909            new_from_old
910        );
911
912        // test new version with timestamp_ms set can deserialize to old version
913        let new = RegionEdit {
914            files_to_add: vec![],
915            files_to_remove: vec![],
916            timestamp_ms: Some(42),
917            compaction_time_window: None,
918            flushed_entry_id: None,
919            flushed_sequence: None,
920            committed_sequence: None,
921        };
922
923        let new_json = serde_json::to_string(&new).unwrap();
924
925        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
926        assert_eq!(
927            RegionEditV1 {
928                files_to_add: vec![],
929                files_to_remove: vec![],
930                compaction_time_window: None,
931                flushed_entry_id: None,
932                flushed_sequence: None,
933            },
934            old_from_new
935        );
936    }
937
938    #[test]
939    fn test_region_change_backward_compatibility() {
940        // Test that we can deserialize a RegionChange without sst_format
941        let region_change_json = r#"{
942            "metadata": {
943                "column_metadatas": [
944                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
945                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
946                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
947                ],
948                "primary_key": [
949                    1
950                ],
951                "region_id": 42,
952                "schema_version": 0
953            }
954        }"#;
955
956        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
957        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
958
959        // Test serialization and deserialization with sst_format
960        let region_change = RegionChange {
961            metadata: region_change.metadata.clone(),
962            sst_format: FormatType::Flat,
963        };
964
965        let serialized = serde_json::to_string(&region_change).unwrap();
966        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
967        assert_eq!(deserialized.sst_format, FormatType::Flat);
968    }
969}