mito2/manifest/
action.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
16
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::{RegionId, SequenceNumber};
25use store_api::ManifestVersion;
26use strum::Display;
27
28use crate::error::{
29    DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
30};
31use crate::manifest::manager::RemoveFileOptions;
32use crate::sst::file::{FileId, FileMeta};
33use crate::wal::EntryId;
34
/// Actions that can be applied to region manifest.
///
/// Uses serde's default (externally tagged) enum representation: the variant name becomes
/// the JSON key wrapping the payload, e.g. `{"Edit": {...}}`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
pub enum RegionMetaAction {
    /// Change region's metadata for request like ALTER.
    Change(RegionChange),
    /// Edit region's state for changing options or file list.
    Edit(RegionEdit),
    /// Remove the region.
    Remove(RegionRemove),
    /// Truncate the region.
    Truncate(RegionTruncate),
}
47
/// Action that replaces the region's metadata.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
    /// The metadata after changed.
    pub metadata: RegionMetadataRef,
}
53
/// Action that edits the region's SST file list and flush/compaction state.
///
/// NOTE: field order is also the JSON key order produced by serialization.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
    /// SST files added to the region.
    pub files_to_add: Vec<FileMeta>,
    /// SST files removed from the region.
    pub files_to_remove: Vec<FileMeta>,
    /// Event unix timestamp in milliseconds, helps to determine file deletion time.
    ///
    /// `#[serde(default)]` lets edits serialized before this field existed deserialize as `None`.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
    /// Compaction time window, (de)serialized as a human-readable duration by `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// When set, the flushed WAL entry id of the region advances to at least this value.
    pub flushed_entry_id: Option<EntryId>,
    /// When set, the flushed sequence of the region advances to at least this value.
    pub flushed_sequence: Option<SequenceNumber>,
}
66
/// Action that removes the region.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
    /// Id of the removed region.
    pub region_id: RegionId,
}
71
/// Last data truncated in the region.
///
/// `kind` is flattened, so its fields appear at the top level of the serialized object next to
/// `region_id` (this keeps the older `{region_id, truncated_entry_id, truncated_sequence}`
/// layout readable).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
    /// Id of the truncated region.
    pub region_id: RegionId,
    /// What this truncate removes.
    #[serde(flatten)]
    pub kind: TruncateKind,
    /// Event unix timestamp in milliseconds, helps to determine file deletion time.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
}
83
/// The kind of truncate operation.
///
/// The enum is `untagged`: on deserialization serde tries the variants in declaration order and
/// uses the first whose fields match, so variant order is significant — do not reorder.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum TruncateKind {
    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
    All {
        /// Last WAL entry id of truncated data.
        truncated_entry_id: EntryId,
        /// Last sequence number of truncated data.
        truncated_sequence: SequenceNumber,
    },
    /// Only remove certain files (`files_to_remove`) in the region.
    Partial { files_to_remove: Vec<FileMeta> },
}
98
/// The region manifest data.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[cfg_attr(test, derive(Eq))]
pub struct RegionManifest {
    /// Metadata of the region.
    pub metadata: RegionMetadataRef,
    /// SST files, keyed by file id.
    pub files: HashMap<FileId, FileMeta>,
    /// Removed files, which are not in the current manifest but may still be kept for a while.
    /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
    ///
    /// Using same checkpoint files and action files, the recovered manifest may differ in this
    /// `removed_files` field, because the checkpointer may evict some removed files using
    /// current machine time. This is acceptable because the removed files are not used in normal
    /// read/write path.
    ///
    /// Defaults to an empty record when absent, so manifests written before this field existed
    /// still deserialize.
    #[serde(default)]
    pub removed_files: RemovedFilesRecord,
    /// Last WAL entry id of flushed data.
    pub flushed_entry_id: EntryId,
    /// Last sequence of flushed data.
    pub flushed_sequence: SequenceNumber,
    /// Current manifest version.
    pub manifest_version: ManifestVersion,
    /// Last WAL entry id of truncated data.
    pub truncated_entry_id: Option<EntryId>,
    /// Inferred compaction time window.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
}
130
#[cfg(test)]
impl PartialEq for RegionManifest {
    /// Field-wise equality that deliberately skips `removed_files`, since two recoveries of the
    /// same manifest may evict different removed files depending on the wall clock.
    ///
    /// The exhaustive destructuring makes adding a field a compile error here, forcing a
    /// decision on whether the new field takes part in equality.
    fn eq(&self, other: &Self) -> bool {
        let Self {
            metadata,
            files,
            removed_files: _,
            flushed_entry_id,
            flushed_sequence,
            manifest_version,
            truncated_entry_id,
            compaction_time_window,
        } = self;

        metadata == &other.metadata
            && files == &other.files
            && flushed_entry_id == &other.flushed_entry_id
            && flushed_sequence == &other.flushed_sequence
            && manifest_version == &other.manifest_version
            && truncated_entry_id == &other.truncated_entry_id
            && compaction_time_window == &other.compaction_time_window
    }
}
143
/// Builds a [RegionManifest] by applying manifest actions on top of an optional checkpoint.
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
    /// Region metadata; `None` until a checkpoint or a [RegionChange] provides it.
    metadata: Option<RegionMetadataRef>,
    /// Current SST files, keyed by file id.
    files: HashMap<FileId, FileMeta>,
    /// Files removed by applied actions, grouped with their removal timestamps.
    pub removed_files: RemovedFilesRecord,
    /// Last WAL entry id of flushed data.
    flushed_entry_id: EntryId,
    /// Last sequence of flushed data.
    flushed_sequence: SequenceNumber,
    /// Version of the last applied action (or of the checkpoint).
    manifest_version: ManifestVersion,
    /// Last WAL entry id of truncated data (set by `TruncateKind::All`).
    truncated_entry_id: Option<EntryId>,
    /// Latest compaction time window carried by an applied edit.
    compaction_time_window: Option<Duration>,
}
155
156impl RegionManifestBuilder {
157    /// Start with a checkpoint.
158    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
159        if let Some(s) = checkpoint {
160            Self {
161                metadata: Some(s.metadata),
162                files: s.files,
163                removed_files: s.removed_files,
164                flushed_entry_id: s.flushed_entry_id,
165                manifest_version: s.manifest_version,
166                flushed_sequence: s.flushed_sequence,
167                truncated_entry_id: s.truncated_entry_id,
168                compaction_time_window: s.compaction_time_window,
169            }
170        } else {
171            Default::default()
172        }
173    }
174
175    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
176        self.metadata = Some(change.metadata);
177        self.manifest_version = manifest_version;
178    }
179
180    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
181        self.manifest_version = manifest_version;
182        for file in edit.files_to_add {
183            self.files.insert(file.file_id, file);
184        }
185        self.removed_files.add_removed_files(
186            edit.files_to_remove
187                .iter()
188                .map(|meta| meta.file_id)
189                .collect(),
190            edit.timestamp_ms
191                .unwrap_or_else(|| Utc::now().timestamp_millis()),
192        );
193        for file in edit.files_to_remove {
194            self.files.remove(&file.file_id);
195        }
196        if let Some(flushed_entry_id) = edit.flushed_entry_id {
197            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
198        }
199        if let Some(flushed_sequence) = edit.flushed_sequence {
200            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
201        }
202        if let Some(window) = edit.compaction_time_window {
203            self.compaction_time_window = Some(window);
204        }
205    }
206
207    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
208        self.manifest_version = manifest_version;
209        match truncate.kind {
210            TruncateKind::All {
211                truncated_entry_id,
212                truncated_sequence,
213            } => {
214                self.flushed_entry_id = truncated_entry_id;
215                self.flushed_sequence = truncated_sequence;
216                self.truncated_entry_id = Some(truncated_entry_id);
217                self.files.clear();
218                self.removed_files.add_removed_files(
219                    self.files.values().map(|meta| meta.file_id).collect(),
220                    truncate
221                        .timestamp_ms
222                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
223                );
224            }
225            TruncateKind::Partial { files_to_remove } => {
226                self.removed_files.add_removed_files(
227                    files_to_remove.iter().map(|meta| meta.file_id).collect(),
228                    truncate
229                        .timestamp_ms
230                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
231                );
232                for file in files_to_remove {
233                    self.files.remove(&file.file_id);
234                }
235            }
236        }
237    }
238
239    pub fn files(&self) -> &HashMap<FileId, FileMeta> {
240        &self.files
241    }
242
243    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
244    pub fn contains_metadata(&self) -> bool {
245        self.metadata.is_some()
246    }
247
248    pub fn try_build(self) -> Result<RegionManifest> {
249        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
250        Ok(RegionManifest {
251            metadata,
252            files: self.files,
253            removed_files: self.removed_files,
254            flushed_entry_id: self.flushed_entry_id,
255            flushed_sequence: self.flushed_sequence,
256            manifest_version: self.manifest_version,
257            truncated_entry_id: self.truncated_entry_id,
258            compaction_time_window: self.compaction_time_window,
259        })
260    }
261}
262
/// A record of removed files in the region manifest.
/// This is used to keep track of files that have been removed from the manifest but may still
/// be kept for a while.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFilesRecord {
    /// A list of `(file ids, timestamp)` groups, where the timestamp is the time when
    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
    pub removed_files: Vec<RemovedFiles>,
}
272
/// A group of files removed from the manifest at the same moment.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
    /// The time when the files are removed from manifest, in milliseconds since unix epoch.
    pub removed_at: i64,
    /// The set of file ids that are removed.
    pub file_ids: HashSet<FileId>,
}
281
282impl RemovedFilesRecord {
283    /// Add a record of removed files with the current timestamp.
284    pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
285        self.removed_files.push(RemovedFiles {
286            removed_at: at,
287            file_ids,
288        });
289    }
290
291    pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
292        let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum();
293        if opt.keep_count > 0 && total_removed_files <= opt.keep_count {
294            return Ok(());
295        }
296
297        let mut cur_file_cnt = total_removed_files;
298
299        let can_evict_until = chrono::Utc::now()
300            - chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu {
301                input: opt.keep_ttl,
302            })?;
303
304        self.removed_files.sort_unstable_by_key(|f| f.removed_at);
305        let updated = std::mem::take(&mut self.removed_files)
306            .into_iter()
307            .filter_map(|f| {
308                if f.removed_at < can_evict_until.timestamp_millis()
309                    && (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count)
310                {
311                    // can evict all files
312                    // TODO(discord9): maybe only evict to below keep_count? Maybe not, or the update might be too frequent.
313                    cur_file_cnt -= f.file_ids.len();
314                    None
315                } else {
316                    Some(f)
317                }
318            })
319            .collect();
320        self.removed_files = updated;
321
322        Ok(())
323    }
324}
325
/// The checkpoint of region manifest, generated by checkpointer.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RegionCheckpoint {
    /// The last manifest version that this checkpoint compacts(inclusive).
    pub last_version: ManifestVersion,
    /// The number of manifest actions that this checkpoint compacts.
    pub compacted_actions: usize,
    /// The checkpoint data.
    pub checkpoint: Option<RegionManifest>,
}
337
338impl RegionCheckpoint {
339    pub fn last_version(&self) -> ManifestVersion {
340        self.last_version
341    }
342
343    pub fn encode(&self) -> Result<Vec<u8>> {
344        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
345
346        Ok(json.into_bytes())
347    }
348
349    pub fn decode(bytes: &[u8]) -> Result<Self> {
350        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
351
352        serde_json::from_str(data).context(SerdeJsonSnafu)
353    }
354}
355
/// An ordered batch of [RegionMetaAction]s persisted together as one manifest entry.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
    /// The actions, in the order they were added.
    pub actions: Vec<RegionMetaAction>,
}
360
361impl RegionMetaActionList {
362    pub fn with_action(action: RegionMetaAction) -> Self {
363        Self {
364            actions: vec![action],
365        }
366    }
367
368    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
369        Self { actions }
370    }
371}
372
373impl RegionMetaActionList {
374    /// Encode self into json in the form of string lines.
375    pub fn encode(&self) -> Result<Vec<u8>> {
376        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
377
378        Ok(json.into_bytes())
379    }
380
381    pub fn decode(bytes: &[u8]) -> Result<Self> {
382        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
383
384        serde_json::from_str(data).context(SerdeJsonSnafu)
385    }
386}
387
#[cfg(test)]
mod tests {

    use common_time::Timestamp;

    use super::*;

    // These tests are used to ensure backward compatibility of manifest files.
    // DO NOT modify the serialized string when they fail, check if your
    // modification to manifest-related structs is compatible with older manifests.
    #[test]
    fn test_region_action_compatibility() {
        let region_edit = r#"{
            "flushed_entry_id":null,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        let region_edit = r#"{
            "flushed_entry_id":10,
            "flushed_sequence":10,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        let region_change = r#" {
            "metadata":{
                "column_metadatas":[
                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key":[1],
                "region_id":5299989648942,
                "schema_version":0
            }
            }"#;
        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

        let region_remove = r#"{"region_id":42}"#;
        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
    }

    #[test]
    fn test_region_manifest_builder() {
        // TODO(ruihang): port this test case
    }

    #[test]
    fn test_encode_decode_region_checkpoint() {
        // TODO(ruihang): port the full test case (a checkpoint carrying manifest data).
        // Roundtrip a checkpoint without manifest data through encode/decode.
        let checkpoint = RegionCheckpoint {
            last_version: 42,
            compacted_actions: 10,
            checkpoint: None,
        };
        let bytes = checkpoint.encode().unwrap();
        let decoded = RegionCheckpoint::decode(&bytes).unwrap();
        assert_eq!(checkpoint, decoded);

        // Bytes that are not valid UTF-8 must be rejected rather than panic.
        assert!(RegionCheckpoint::decode(&[0xff, 0xfe]).is_err());
    }

    #[test]
    fn test_region_manifest_compatibility() {
        // Test deserializing RegionManifest from old schema where FileId is a UUID string
        let region_manifest_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            },
            "files": {
                "4b220a70-2b03-4641-9687-b65d94641208": {
                    "region_id": 4402341478400,
                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 1,
                    "file_size": 100
                },
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
                    "region_id": 4402341478400,
                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 0,
                    "file_size": 100
                }
            },
            "flushed_entry_id": 10,
            "flushed_sequence": 20,
            "manifest_version": 1,
            "truncated_entry_id": null,
            "compaction_time_window": null
        }"#;

        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();

        // Verify that the files were correctly deserialized
        assert_eq!(manifest.files.len(), 2);
        assert_eq!(manifest.flushed_entry_id, 10);
        assert_eq!(manifest.flushed_sequence, 20);
        assert_eq!(manifest.manifest_version, 1);

        // Verify that FileIds were correctly parsed from UUID strings
        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
        file_ids.sort_unstable();
        assert_eq!(
            file_ids,
            vec![
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                "4b220a70-2b03-4641-9687-b65d94641208",
            ]
        );

        // Roundtrip test with current FileId format
        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
        let deserialized_manifest: RegionManifest =
            serde_json::from_str(&serialized_manifest).unwrap();
        assert_eq!(manifest, deserialized_manifest);
        assert_ne!(serialized_manifest, region_manifest_json);
    }

    #[test]
    fn test_region_truncate_compat() {
        // Test deserializing RegionTruncate from old schema
        let region_truncate_json = r#"{
            "region_id": 4402341478400,
            "truncated_entry_id": 10,
            "truncated_sequence": 20
        }"#;

        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
        assert_eq!(truncate_v1.region_id, 4402341478400);
        assert_eq!(
            truncate_v1.kind,
            TruncateKind::All {
                truncated_entry_id: 10,
                truncated_sequence: 20,
            }
        );

        // Test deserializing RegionTruncate from new schema
        let region_truncate_v2_json = r#"{
    "region_id": 4402341478400,
    "files_to_remove": [
        {
            "region_id": 4402341478400,
            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
            "time_range": [
                {
                    "value": 1451609210000,
                    "unit": "Millisecond"
                },
                {
                    "value": 1451609520000,
                    "unit": "Millisecond"
                }
            ],
            "level": 1,
            "file_size": 100
        }
    ]
}"#;

        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
        assert_eq!(truncate_v2.region_id, 4402341478400);
        assert_eq!(
            truncate_v2.kind,
            TruncateKind::Partial {
                files_to_remove: vec![FileMeta {
                    region_id: RegionId::from_u64(4402341478400),
                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                    time_range: (
                        Timestamp::new_millisecond(1451609210000),
                        Timestamp::new_millisecond(1451609520000)
                    ),
                    level: 1,
                    file_size: 100,
                    ..Default::default()
                }]
            }
        );
    }

    #[test]
    fn test_region_manifest_removed_files() {
        let region_metadata = r#"{
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
        let manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            removed_files: RemovedFilesRecord {
                removed_files: vec![RemovedFiles {
                    removed_at: 0,
                    file_ids: HashSet::from([FileId::parse_str(
                        "4b220a70-2b03-4641-9687-b65d94641208",
                    )
                    .unwrap()]),
                }],
            },
        };

        let json = serde_json::to_string(&manifest).unwrap();
        let new: RegionManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(manifest, new);
        // The test-only `PartialEq` for `RegionManifest` ignores `removed_files`, so the
        // roundtrip of that field has to be checked explicitly.
        assert_eq!(manifest.removed_files, new.removed_files);
    }

    /// Test if old version can still be deserialized then serialized to the new version.
    #[test]
    fn test_old_region_manifest_compat() {
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionManifestV1 {
            /// Metadata of the region.
            pub metadata: RegionMetadataRef,
            /// SST files.
            pub files: HashMap<FileId, FileMeta>,
            /// Last WAL entry id of flushed data.
            pub flushed_entry_id: EntryId,
            /// Last sequence of flushed data.
            pub flushed_sequence: SequenceNumber,
            /// Current manifest version.
            pub manifest_version: ManifestVersion,
            /// Last WAL entry id of truncated data.
            pub truncated_entry_id: Option<EntryId>,
            /// Inferred compaction time window.
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
        }

        let region_metadata = r#"{
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");

        // first test v1 empty to new
        let v1 = RegionManifestV1 {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
        };
        let json = serde_json::to_string(&v1).unwrap();
        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(
            new_from_old,
            RegionManifest {
                metadata: metadata.clone(),
                files: HashMap::new(),
                removed_files: Default::default(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
            }
        );

        let new_manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            removed_files: Default::default(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
        };
        let json = serde_json::to_string(&new_manifest).unwrap();
        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
        assert_eq!(
            old_from_new,
            RegionManifestV1 {
                metadata: metadata.clone(),
                files: HashMap::new(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
            }
        );

        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionEditV1 {
            pub files_to_add: Vec<FileMeta>,
            pub files_to_remove: Vec<FileMeta>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
            pub flushed_entry_id: Option<EntryId>,
            pub flushed_sequence: Option<SequenceNumber>,
        }

        /// Last data truncated in the region.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionTruncateV1 {
            pub region_id: RegionId,
            pub kind: TruncateKind,
        }

        let json = serde_json::to_string(&RegionEditV1 {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        })
        .unwrap();
        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
        assert_eq!(
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![],
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            new_from_old
        );

        // test new version with timestamp_ms set can deserialize to old version
        let new = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: Some(42),
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };

        let new_json = serde_json::to_string(&new).unwrap();

        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
        assert_eq!(
            RegionEditV1 {
                files_to_add: vec![],
                files_to_remove: vec![],
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            old_from_new
        );
    }
}