mito2/manifest/
action.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
16
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{
29    DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
30};
31use crate::manifest::manager::RemoveFileOptions;
32use crate::sst::FormatType;
33use crate::sst::file::FileMeta;
34use crate::wal::EntryId;
35
/// Actions that can be applied to region manifest.
///
/// Each variant wraps the payload struct that describes the action.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
pub enum RegionMetaAction {
    /// Change region's metadata for request like ALTER
    Change(RegionChange),
    /// Edit region's state for changing options or file list.
    Edit(RegionEdit),
    /// Remove the region.
    Remove(RegionRemove),
    /// Truncate the region.
    Truncate(RegionTruncate),
}
48
/// Payload of [RegionMetaAction::Change]: the new region metadata and SST format.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
    /// The metadata after changed.
    pub metadata: RegionMetadataRef,
    /// Format of the SST.
    ///
    /// Falls back to `FormatType::default()` when the field is missing from the
    /// serialized input, so manifests written without it still deserialize.
    #[serde(default)]
    pub sst_format: FormatType,
}
57
/// Payload of [RegionMetaAction::Edit]: changes to the file list plus optional
/// flush / commit / compaction-window updates.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
    /// Files to insert into the manifest's file map.
    pub files_to_add: Vec<FileMeta>,
    /// Files to drop from the manifest's file map; their ids are recorded as removed files.
    pub files_to_remove: Vec<FileMeta>,
    /// event unix timestamp in milliseconds, help to determine file deletion time.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
    /// New compaction time window; `None` leaves the builder's current window untouched.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// WAL entry id carried by this edit; `None` leaves the builder's value untouched.
    pub flushed_entry_id: Option<EntryId>,
    /// Flushed sequence carried by this edit; `None` leaves the builder's value untouched.
    pub flushed_sequence: Option<SequenceNumber>,
    /// Committed sequence carried by this edit; `None` leaves the builder's value untouched.
    pub committed_sequence: Option<SequenceNumber>,
}
71
/// Payload of [RegionMetaAction::Remove].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
    /// Id of the region to remove.
    pub region_id: RegionId,
}
76
/// Last data truncated in the region.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
    /// Id of the region being truncated.
    pub region_id: RegionId,
    /// What to truncate. Flattened, so the variant's fields are serialized as
    /// siblings of `region_id` rather than under a nested `kind` key.
    #[serde(flatten)]
    pub kind: TruncateKind,
    /// event unix timestamp in milliseconds, help to determine file deletion time.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
}
88
/// The kind of truncate operation.
///
/// The enum is `untagged`: when deserializing, the variant is picked from the
/// fields that are present, not from an explicit tag.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum TruncateKind {
    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
    All {
        /// Last WAL entry id of truncated data.
        truncated_entry_id: EntryId,
        /// Last sequence number of truncated data.
        truncated_sequence: SequenceNumber,
    },
    /// Only remove certain files in the region.
    Partial {
        /// Files to drop from the manifest.
        files_to_remove: Vec<FileMeta>,
    },
}
103
/// The region manifest data.
///
/// In test builds a hand-written `PartialEq` is used instead of the derived one,
/// so equality there does not cover every field.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[cfg_attr(test, derive(Eq))]
pub struct RegionManifest {
    /// Metadata of the region.
    pub metadata: RegionMetadataRef,
    /// SST files.
    pub files: HashMap<FileId, FileMeta>,
    /// Removed files, which are not in the current manifest but may still be kept for a while.
    /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
    ///
    /// Using same checkpoint files and action files, the recovered manifest may differ in this
    /// `removed_files` field, because the checkpointer may evict some removed files using
    /// current machine time. This is acceptable because the removed files are not used in normal
    /// read/write path.
    ///
    /// Defaults to an empty record when absent from the serialized input.
    #[serde(default)]
    pub removed_files: RemovedFilesRecord,
    /// Last WAL entry id of flushed data.
    pub flushed_entry_id: EntryId,
    /// Last sequence of flushed data.
    pub flushed_sequence: SequenceNumber,
    /// Largest committed sequence carried by the applied edits, if any edit provided one.
    pub committed_sequence: Option<SequenceNumber>,
    /// Current manifest version.
    pub manifest_version: ManifestVersion,
    /// Last WAL entry id of truncated data.
    pub truncated_entry_id: Option<EntryId>,
    /// Inferred compaction time window.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// Format of the SST file.
    ///
    /// Defaults to `FormatType::default()` when absent from the serialized input.
    #[serde(default)]
    pub sst_format: FormatType,
}
139
#[cfg(test)]
impl PartialEq for RegionManifest {
    /// Test-only equality.
    ///
    /// `removed_files` is not compared: recovering from the same checkpoint and
    /// action files may legitimately yield different removed-file records.
    /// NOTE(review): `sst_format` is not compared either — confirm this is intended.
    fn eq(&self, other: &Self) -> bool {
        // Destructure so every field is named here and the skipped ones are explicit.
        let Self {
            metadata,
            files,
            removed_files: _,
            flushed_entry_id,
            flushed_sequence,
            committed_sequence,
            manifest_version,
            truncated_entry_id,
            compaction_time_window,
            sst_format: _,
        } = self;

        *metadata == other.metadata
            && *files == other.files
            && *flushed_entry_id == other.flushed_entry_id
            && *flushed_sequence == other.flushed_sequence
            && *manifest_version == other.manifest_version
            && *truncated_entry_id == other.truncated_entry_id
            && *compaction_time_window == other.compaction_time_window
            && *committed_sequence == other.committed_sequence
    }
}
153
/// Builder that accumulates manifest state by applying actions, optionally on
/// top of a checkpoint, and finally produces a [RegionManifest].
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
    /// Region metadata; `None` until a checkpoint or a change action provides it.
    metadata: Option<RegionMetadataRef>,
    /// SST files currently in the manifest, keyed by file id.
    files: HashMap<FileId, FileMeta>,
    /// Record of files removed by the applied edits and truncations.
    pub removed_files: RemovedFilesRecord,
    /// Last WAL entry id of flushed data.
    flushed_entry_id: EntryId,
    /// Last sequence of flushed data.
    flushed_sequence: SequenceNumber,
    /// Version of the last applied action (or of the checkpoint).
    manifest_version: ManifestVersion,
    /// Last WAL entry id of truncated data, set by a truncate-all action.
    truncated_entry_id: Option<EntryId>,
    /// Latest compaction time window provided by an edit.
    compaction_time_window: Option<Duration>,
    /// Largest committed sequence provided by the applied edits.
    committed_sequence: Option<SequenceNumber>,
    /// SST format from the checkpoint or the last change action.
    sst_format: FormatType,
}
167
168impl RegionManifestBuilder {
169    /// Start with a checkpoint.
170    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
171        if let Some(s) = checkpoint {
172            Self {
173                metadata: Some(s.metadata),
174                files: s.files,
175                removed_files: s.removed_files,
176                flushed_entry_id: s.flushed_entry_id,
177                manifest_version: s.manifest_version,
178                flushed_sequence: s.flushed_sequence,
179                truncated_entry_id: s.truncated_entry_id,
180                compaction_time_window: s.compaction_time_window,
181                committed_sequence: s.committed_sequence,
182                sst_format: s.sst_format,
183            }
184        } else {
185            Default::default()
186        }
187    }
188
189    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
190        self.metadata = Some(change.metadata);
191        self.manifest_version = manifest_version;
192        self.sst_format = change.sst_format;
193    }
194
195    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
196        self.manifest_version = manifest_version;
197        for file in edit.files_to_add {
198            self.files.insert(file.file_id, file);
199        }
200        self.removed_files.add_removed_files(
201            edit.files_to_remove
202                .iter()
203                .map(|meta| meta.file_id)
204                .collect(),
205            edit.timestamp_ms
206                .unwrap_or_else(|| Utc::now().timestamp_millis()),
207        );
208        for file in edit.files_to_remove {
209            self.files.remove(&file.file_id);
210        }
211        if let Some(flushed_entry_id) = edit.flushed_entry_id {
212            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
213        }
214        if let Some(flushed_sequence) = edit.flushed_sequence {
215            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
216        }
217
218        if let Some(committed_sequence) = edit.committed_sequence {
219            self.committed_sequence = Some(
220                self.committed_sequence
221                    .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
222            );
223        }
224        if let Some(window) = edit.compaction_time_window {
225            self.compaction_time_window = Some(window);
226        }
227    }
228
229    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
230        self.manifest_version = manifest_version;
231        match truncate.kind {
232            TruncateKind::All {
233                truncated_entry_id,
234                truncated_sequence,
235            } => {
236                self.flushed_entry_id = truncated_entry_id;
237                self.flushed_sequence = truncated_sequence;
238                self.truncated_entry_id = Some(truncated_entry_id);
239                self.files.clear();
240                self.removed_files.add_removed_files(
241                    self.files.values().map(|meta| meta.file_id).collect(),
242                    truncate
243                        .timestamp_ms
244                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
245                );
246            }
247            TruncateKind::Partial { files_to_remove } => {
248                self.removed_files.add_removed_files(
249                    files_to_remove.iter().map(|meta| meta.file_id).collect(),
250                    truncate
251                        .timestamp_ms
252                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
253                );
254                for file in files_to_remove {
255                    self.files.remove(&file.file_id);
256                }
257            }
258        }
259    }
260
261    pub fn files(&self) -> &HashMap<FileId, FileMeta> {
262        &self.files
263    }
264
265    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
266    pub fn contains_metadata(&self) -> bool {
267        self.metadata.is_some()
268    }
269
270    pub fn try_build(self) -> Result<RegionManifest> {
271        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
272        Ok(RegionManifest {
273            metadata,
274            files: self.files,
275            removed_files: self.removed_files,
276            flushed_entry_id: self.flushed_entry_id,
277            flushed_sequence: self.flushed_sequence,
278            committed_sequence: self.committed_sequence,
279            manifest_version: self.manifest_version,
280            truncated_entry_id: self.truncated_entry_id,
281            compaction_time_window: self.compaction_time_window,
282            sst_format: self.sst_format,
283        })
284    }
285}
286
/// A record of removed files in the region manifest.
/// This is used to keep track of files that have been removed from the manifest but may still
/// be kept for a while.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFilesRecord {
    /// a list of `(FileIds, timestamp)` pairs, where the timestamp is the time when
    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
    pub removed_files: Vec<RemovedFiles>,
}
296
/// One batch of files removed from the manifest at the same time.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
    /// The timestamp is the time when
    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
    pub removed_at: i64,
    /// The set of file ids that are removed.
    pub file_ids: HashSet<FileId>,
}
305
306impl RemovedFilesRecord {
307    /// Add a record of removed files with the current timestamp.
308    pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
309        self.removed_files.push(RemovedFiles {
310            removed_at: at,
311            file_ids,
312        });
313    }
314
315    pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
316        let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum();
317        if opt.keep_count > 0 && total_removed_files <= opt.keep_count {
318            return Ok(());
319        }
320
321        let mut cur_file_cnt = total_removed_files;
322
323        let can_evict_until = chrono::Utc::now()
324            - chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu {
325                input: opt.keep_ttl,
326            })?;
327
328        self.removed_files.sort_unstable_by_key(|f| f.removed_at);
329        let updated = std::mem::take(&mut self.removed_files)
330            .into_iter()
331            .filter_map(|f| {
332                if f.removed_at < can_evict_until.timestamp_millis()
333                    && (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count)
334                {
335                    // can evict all files
336                    // TODO(discord9): maybe only evict to below keep_count? Maybe not, or the update might be too frequent.
337                    cur_file_cnt -= f.file_ids.len();
338                    None
339                } else {
340                    Some(f)
341                }
342            })
343            .collect();
344        self.removed_files = updated;
345
346        Ok(())
347    }
348}
349
/// The checkpoint of region manifest, generated by checkpointer.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RegionCheckpoint {
    /// The last manifest version that this checkpoint compacts(inclusive).
    pub last_version: ManifestVersion,
    /// The number of manifest actions that this checkpoint compacts.
    pub compacted_actions: usize,
    /// The checkpoint data
    pub checkpoint: Option<RegionManifest>,
}
361
362impl RegionCheckpoint {
363    pub fn last_version(&self) -> ManifestVersion {
364        self.last_version
365    }
366
367    pub fn encode(&self) -> Result<Vec<u8>> {
368        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
369
370        Ok(json.into_bytes())
371    }
372
373    pub fn decode(bytes: &[u8]) -> Result<Self> {
374        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
375
376        serde_json::from_str(data).context(SerdeJsonSnafu)
377    }
378}
379
/// An ordered list of [RegionMetaAction]s.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
    /// The actions, in the order they were supplied.
    pub actions: Vec<RegionMetaAction>,
}
384
385impl RegionMetaActionList {
386    pub fn with_action(action: RegionMetaAction) -> Self {
387        Self {
388            actions: vec![action],
389        }
390    }
391
392    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
393        Self { actions }
394    }
395
396    pub fn into_region_edit(self) -> RegionEdit {
397        let mut edit = RegionEdit {
398            files_to_add: Vec::new(),
399            files_to_remove: Vec::new(),
400            timestamp_ms: None,
401            compaction_time_window: None,
402            flushed_entry_id: None,
403            flushed_sequence: None,
404            committed_sequence: None,
405        };
406
407        for action in self.actions {
408            if let RegionMetaAction::Edit(region_edit) = action {
409                // Merge file adds/removes
410                edit.files_to_add.extend(region_edit.files_to_add);
411                edit.files_to_remove.extend(region_edit.files_to_remove);
412                // Max of flushed entry id / sequence
413                if let Some(eid) = region_edit.flushed_entry_id {
414                    edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
415                }
416                if let Some(seq) = region_edit.flushed_sequence {
417                    edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
418                }
419                if let Some(seq) = region_edit.committed_sequence {
420                    edit.committed_sequence =
421                        Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
422                }
423                // Prefer the latest non-none time window
424                if region_edit.compaction_time_window.is_some() {
425                    edit.compaction_time_window = region_edit.compaction_time_window;
426                }
427            }
428        }
429
430        edit
431    }
432}
433
434impl RegionMetaActionList {
435    /// Encode self into json in the form of string lines.
436    pub fn encode(&self) -> Result<Vec<u8>> {
437        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
438
439        Ok(json.into_bytes())
440    }
441
442    pub fn decode(bytes: &[u8]) -> Result<Self> {
443        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
444
445        serde_json::from_str(data).context(SerdeJsonSnafu)
446    }
447}
448
449#[cfg(test)]
450mod tests {
451
452    use common_time::Timestamp;
453
454    use super::*;
455
    // These tests are used to ensure backward compatibility of manifest files.
    // DO NOT modify the serialized string when they fail, check if your
    // modification to manifest-related structs is compatible with older manifests.
    #[test]
    fn test_region_action_compatibility() {
        // A `RegionEdit` without `flushed_sequence`, `committed_sequence` and
        // `timestamp_ms` must still deserialize.
        let region_edit = r#"{
            "flushed_entry_id":null,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // Same shape, now with concrete flushed entry id and sequence.
        let region_edit = r#"{
            "flushed_entry_id":10,
            "flushed_sequence":10,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // Note: For backward compatibility, the test accepts a RegionChange without sst_format
        let region_change = r#" {
            "metadata":{
                "column_metadatas":[
                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key":[1],
                "region_id":5299989648942,
                "schema_version":0
            }
            }"#;
        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

        // `RegionRemove` only carries the region id.
        let region_remove = r#"{"region_id":42}"#;
        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
    }
502
    // Placeholder: the body is empty, so this test currently asserts nothing.
    #[test]
    fn test_region_manifest_builder() {
        // TODO(ruihang): port this test case
    }
507
    // Placeholder: the body is empty, so this test currently asserts nothing.
    #[test]
    fn test_encode_decode_region_checkpoint() {
        // TODO(ruihang): port this test case
    }
512
    /// An old-schema manifest (no `removed_files`, `committed_sequence` or
    /// `sst_format`) must deserialize and survive a serialize/deserialize roundtrip.
    #[test]
    fn test_region_manifest_compatibility() {
        // Test deserializing RegionManifest from old schema where FileId is a UUID string
        let region_manifest_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            },
            "files": {
                "4b220a70-2b03-4641-9687-b65d94641208": {
                    "region_id": 4402341478400,
                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 1,
                    "file_size": 100
                },
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
                    "region_id": 4402341478400,
                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 0,
                    "file_size": 100
                }
            },
            "flushed_entry_id": 10,
            "flushed_sequence": 20,
            "manifest_version": 1,
            "truncated_entry_id": null,
            "compaction_time_window": null
        }"#;

        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();

        // Verify that the files were correctly deserialized
        assert_eq!(manifest.files.len(), 2);
        assert_eq!(manifest.flushed_entry_id, 10);
        assert_eq!(manifest.flushed_sequence, 20);
        assert_eq!(manifest.manifest_version, 1);

        // Verify that FileIds were correctly parsed from UUID strings
        // (sorted first, because `HashMap` iteration order is unspecified).
        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
        file_ids.sort_unstable();
        assert_eq!(
            file_ids,
            vec![
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                "4b220a70-2b03-4641-9687-b65d94641208",
            ]
        );

        // Roundtrip test with current FileId format
        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
        let deserialized_manifest: RegionManifest =
            serde_json::from_str(&serialized_manifest).unwrap();
        assert_eq!(manifest, deserialized_manifest);
        // The re-serialized text is expected to differ from the old-schema input.
        assert_ne!(serialized_manifest, region_manifest_json);
    }
615
    /// Both the old (truncate-all) and the new (partial) `RegionTruncate` JSON shapes
    /// must deserialize into the matching `TruncateKind` variant.
    #[test]
    fn test_region_truncate_compat() {
        // Test deserializing RegionTruncate from old schema
        let region_truncate_json = r#"{
            "region_id": 4402341478400,
            "truncated_entry_id": 10,
            "truncated_sequence": 20
        }"#;

        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
        assert_eq!(truncate_v1.region_id, 4402341478400);
        assert_eq!(
            truncate_v1.kind,
            TruncateKind::All {
                truncated_entry_id: 10,
                truncated_sequence: 20,
            }
        );

        // Test deserializing RegionTruncate from new schema
        let region_truncate_v2_json = r#"{
    "region_id": 4402341478400,
    "files_to_remove": [
        {
            "region_id": 4402341478400,
            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
            "time_range": [
                {
                    "value": 1451609210000,
                    "unit": "Millisecond"
                },
                {
                    "value": 1451609520000,
                    "unit": "Millisecond"
                }
            ],
            "level": 1,
            "file_size": 100
        }
    ]
}"#;

        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
        assert_eq!(truncate_v2.region_id, 4402341478400);
        assert_eq!(
            truncate_v2.kind,
            TruncateKind::Partial {
                files_to_remove: vec![FileMeta {
                    region_id: RegionId::from_u64(4402341478400),
                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                    time_range: (
                        Timestamp::new_millisecond(1451609210000),
                        Timestamp::new_millisecond(1451609520000)
                    ),
                    level: 1,
                    file_size: 100,
                    // Fields absent from the JSON are expected to equal their defaults.
                    ..Default::default()
                }]
            }
        );
    }
677
    /// A manifest carrying a non-empty `removed_files` record must survive a JSON
    /// serialize/deserialize roundtrip.
    ///
    /// NOTE(review): the test-only `PartialEq` of `RegionManifest` does not compare
    /// `removed_files`, so the final assertion does not check that field itself.
    #[test]
    fn test_region_manifest_removed_files() {
        let region_metadata = r#"{
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
        // A manifest with no live files and a single removed-file batch.
        let manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            removed_files: RemovedFilesRecord {
                removed_files: vec![RemovedFiles {
                    removed_at: 0,
                    file_ids: HashSet::from([FileId::parse_str(
                        "4b220a70-2b03-4641-9687-b65d94641208",
                    )
                    .unwrap()]),
                }],
            },
            sst_format: FormatType::PrimaryKey,
        };

        let json = serde_json::to_string(&manifest).unwrap();
        let new: RegionManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(manifest, new);
    }
752
753    /// Test if old version can still be deserialized then serialized to the new version.
754    #[test]
755    fn test_old_region_manifest_compat() {
756        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
757        pub struct RegionManifestV1 {
758            /// Metadata of the region.
759            pub metadata: RegionMetadataRef,
760            /// SST files.
761            pub files: HashMap<FileId, FileMeta>,
762            /// Last WAL entry id of flushed data.
763            pub flushed_entry_id: EntryId,
764            /// Last sequence of flushed data.
765            pub flushed_sequence: SequenceNumber,
766            /// Current manifest version.
767            pub manifest_version: ManifestVersion,
768            /// Last WAL entry id of truncated data.
769            pub truncated_entry_id: Option<EntryId>,
770            /// Inferred compaction time window.
771            #[serde(with = "humantime_serde")]
772            pub compaction_time_window: Option<Duration>,
773        }
774
775        let region_metadata = r#"{
776                "column_metadatas": [
777                    {
778                        "column_schema": {
779                            "name": "a",
780                            "data_type": {"Int64": {}},
781                            "is_nullable": false,
782                            "is_time_index": false,
783                            "default_constraint": null,
784                            "metadata": {}
785                        },
786                        "semantic_type": "Tag",
787                        "column_id": 1
788                    },
789                    {
790                        "column_schema": {
791                            "name": "b",
792                            "data_type": {"Float64": {}},
793                            "is_nullable": false,
794                            "is_time_index": false,
795                            "default_constraint": null,
796                            "metadata": {}
797                        },
798                        "semantic_type": "Field",
799                        "column_id": 2
800                    },
801                    {
802                        "column_schema": {
803                            "name": "c",
804                            "data_type": {"Timestamp": {"Millisecond": null}},
805                            "is_nullable": false,
806                            "is_time_index": false,
807                            "default_constraint": null,
808                            "metadata": {}
809                        },
810                        "semantic_type": "Timestamp",
811                        "column_id": 3
812                    }
813                ],
814                "primary_key": [1],
815                "region_id": 4402341478400,
816                "schema_version": 0
817            }"#;
818
819        let metadata: RegionMetadataRef =
820            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
821
822        // first test v1 empty to new
823        let v1 = RegionManifestV1 {
824            metadata: metadata.clone(),
825            files: HashMap::new(),
826            flushed_entry_id: 0,
827            flushed_sequence: 0,
828            manifest_version: 0,
829            truncated_entry_id: None,
830            compaction_time_window: None,
831        };
832        let json = serde_json::to_string(&v1).unwrap();
833        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
834        assert_eq!(
835            new_from_old,
836            RegionManifest {
837                metadata: metadata.clone(),
838                files: HashMap::new(),
839                removed_files: Default::default(),
840                flushed_entry_id: 0,
841                flushed_sequence: 0,
842                committed_sequence: None,
843                manifest_version: 0,
844                truncated_entry_id: None,
845                compaction_time_window: None,
846                sst_format: FormatType::PrimaryKey,
847            }
848        );
849
850        let new_manifest = RegionManifest {
851            metadata: metadata.clone(),
852            files: HashMap::new(),
853            removed_files: Default::default(),
854            flushed_entry_id: 0,
855            flushed_sequence: 0,
856            committed_sequence: None,
857            manifest_version: 0,
858            truncated_entry_id: None,
859            compaction_time_window: None,
860            sst_format: FormatType::PrimaryKey,
861        };
862        let json = serde_json::to_string(&new_manifest).unwrap();
863        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
864        assert_eq!(
865            old_from_new,
866            RegionManifestV1 {
867                metadata: metadata.clone(),
868                files: HashMap::new(),
869                flushed_entry_id: 0,
870                flushed_sequence: 0,
871                manifest_version: 0,
872                truncated_entry_id: None,
873                compaction_time_window: None,
874            }
875        );
876
877        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
878        pub struct RegionEditV1 {
879            pub files_to_add: Vec<FileMeta>,
880            pub files_to_remove: Vec<FileMeta>,
881            #[serde(with = "humantime_serde")]
882            pub compaction_time_window: Option<Duration>,
883            pub flushed_entry_id: Option<EntryId>,
884            pub flushed_sequence: Option<SequenceNumber>,
885        }
886
887        let json = serde_json::to_string(&RegionEditV1 {
888            files_to_add: vec![],
889            files_to_remove: vec![],
890            compaction_time_window: None,
891            flushed_entry_id: None,
892            flushed_sequence: None,
893        })
894        .unwrap();
895        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
896        assert_eq!(
897            RegionEdit {
898                files_to_add: vec![],
899                files_to_remove: vec![],
900                timestamp_ms: None,
901                compaction_time_window: None,
902                flushed_entry_id: None,
903                flushed_sequence: None,
904                committed_sequence: None,
905            },
906            new_from_old
907        );
908
909        // test new version with timestamp_ms set can deserialize to old version
910        let new = RegionEdit {
911            files_to_add: vec![],
912            files_to_remove: vec![],
913            timestamp_ms: Some(42),
914            compaction_time_window: None,
915            flushed_entry_id: None,
916            flushed_sequence: None,
917            committed_sequence: None,
918        };
919
920        let new_json = serde_json::to_string(&new).unwrap();
921
922        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
923        assert_eq!(
924            RegionEditV1 {
925                files_to_add: vec![],
926                files_to_remove: vec![],
927                compaction_time_window: None,
928                flushed_entry_id: None,
929                flushed_sequence: None,
930            },
931            old_from_new
932        );
933    }
934
935    #[test]
936    fn test_region_change_backward_compatibility() {
937        // Test that we can deserialize a RegionChange without sst_format
938        let region_change_json = r#"{
939            "metadata": {
940                "column_metadatas": [
941                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
942                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
943                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
944                ],
945                "primary_key": [
946                    1
947                ],
948                "region_id": 42,
949                "schema_version": 0
950            }
951        }"#;
952
953        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
954        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
955
956        // Test serialization and deserialization with sst_format
957        let region_change = RegionChange {
958            metadata: region_change.metadata.clone(),
959            sst_format: FormatType::Flat,
960        };
961
962        let serialized = serde_json::to_string(&region_change).unwrap();
963        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
964        assert_eq!(deserialized.sst_format, FormatType::Flat);
965    }
966}