mito2/manifest/
action.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
16
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{
29    DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
30};
31use crate::manifest::manager::RemoveFileOptions;
32use crate::sst::file::FileMeta;
33use crate::wal::EntryId;
34
35/// Actions that can be applied to region manifest.
36#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
37pub enum RegionMetaAction {
38    /// Change region's metadata for request like ALTER
39    Change(RegionChange),
40    /// Edit region's state for changing options or file list.
41    Edit(RegionEdit),
42    /// Remove the region.
43    Remove(RegionRemove),
44    /// Truncate the region.
45    Truncate(RegionTruncate),
46}
47
48#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
49pub struct RegionChange {
50    /// The metadata after changed.
51    pub metadata: RegionMetadataRef,
52}
53
54#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
55pub struct RegionEdit {
56    pub files_to_add: Vec<FileMeta>,
57    pub files_to_remove: Vec<FileMeta>,
58    /// event unix timestamp in milliseconds, help to determine file deletion time.
59    #[serde(default)]
60    pub timestamp_ms: Option<i64>,
61    #[serde(with = "humantime_serde")]
62    pub compaction_time_window: Option<Duration>,
63    pub flushed_entry_id: Option<EntryId>,
64    pub flushed_sequence: Option<SequenceNumber>,
65    pub committed_sequence: Option<SequenceNumber>,
66}
67
68#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
69pub struct RegionRemove {
70    pub region_id: RegionId,
71}
72
73/// Last data truncated in the region.
74///
75#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
76pub struct RegionTruncate {
77    pub region_id: RegionId,
78    #[serde(flatten)]
79    pub kind: TruncateKind,
80    /// event unix timestamp in milliseconds, help to determine file deletion time.
81    #[serde(default)]
82    pub timestamp_ms: Option<i64>,
83}
84
85/// The kind of truncate operation.
86#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
87#[serde(untagged)]
88pub enum TruncateKind {
89    /// Truncate all data in the region, marked by all data before the given entry id&sequence.
90    All {
91        /// Last WAL entry id of truncated data.
92        truncated_entry_id: EntryId,
93        // Last sequence number of truncated data.
94        truncated_sequence: SequenceNumber,
95    },
96    /// Only remove certain files in the region.
97    Partial { files_to_remove: Vec<FileMeta> },
98}
99
100/// The region manifest data.
101#[derive(Serialize, Deserialize, Clone, Debug)]
102#[cfg_attr(test, derive(Eq))]
103pub struct RegionManifest {
104    /// Metadata of the region.
105    pub metadata: RegionMetadataRef,
106    /// SST files.
107    pub files: HashMap<FileId, FileMeta>,
108    /// Removed files, which are not in the current manifest but may still be kept for a while.
109    /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
110    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
111    ///
112    /// Using same checkpoint files and action files, the recovered manifest may differ in this
113    /// `removed_files` field, because the checkpointer may evict some removed files using
114    /// current machine time. This is acceptable because the removed files are not used in normal
115    /// read/write path.
116    ///
117    #[serde(default)]
118    pub removed_files: RemovedFilesRecord,
119    /// Last WAL entry id of flushed data.
120    pub flushed_entry_id: EntryId,
121    /// Last sequence of flushed data.
122    pub flushed_sequence: SequenceNumber,
123    pub committed_sequence: Option<SequenceNumber>,
124    /// Current manifest version.
125    pub manifest_version: ManifestVersion,
126    /// Last WAL entry id of truncated data.
127    pub truncated_entry_id: Option<EntryId>,
128    /// Inferred compaction time window.
129    #[serde(with = "humantime_serde")]
130    pub compaction_time_window: Option<Duration>,
131}
132
#[cfg(test)]
impl PartialEq for RegionManifest {
    /// Compares every field except `removed_files`, which may legitimately differ between two
    /// recoveries of the same manifest because its eviction depends on the local clock.
    fn eq(&self, other: &Self) -> bool {
        // Destructure so that adding a field to `RegionManifest` forces a decision here.
        let Self {
            metadata,
            files,
            removed_files: _,
            flushed_entry_id,
            flushed_sequence,
            committed_sequence,
            manifest_version,
            truncated_entry_id,
            compaction_time_window,
        } = self;

        *metadata == other.metadata
            && *files == other.files
            && *flushed_entry_id == other.flushed_entry_id
            && *flushed_sequence == other.flushed_sequence
            && *manifest_version == other.manifest_version
            && *truncated_entry_id == other.truncated_entry_id
            && *compaction_time_window == other.compaction_time_window
            && *committed_sequence == other.committed_sequence
    }
}
146
147#[derive(Debug, Default)]
148pub struct RegionManifestBuilder {
149    metadata: Option<RegionMetadataRef>,
150    files: HashMap<FileId, FileMeta>,
151    pub removed_files: RemovedFilesRecord,
152    flushed_entry_id: EntryId,
153    flushed_sequence: SequenceNumber,
154    manifest_version: ManifestVersion,
155    truncated_entry_id: Option<EntryId>,
156    compaction_time_window: Option<Duration>,
157    committed_sequence: Option<SequenceNumber>,
158}
159
160impl RegionManifestBuilder {
161    /// Start with a checkpoint.
162    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
163        if let Some(s) = checkpoint {
164            Self {
165                metadata: Some(s.metadata),
166                files: s.files,
167                removed_files: s.removed_files,
168                flushed_entry_id: s.flushed_entry_id,
169                manifest_version: s.manifest_version,
170                flushed_sequence: s.flushed_sequence,
171                truncated_entry_id: s.truncated_entry_id,
172                compaction_time_window: s.compaction_time_window,
173                committed_sequence: s.committed_sequence,
174            }
175        } else {
176            Default::default()
177        }
178    }
179
180    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
181        self.metadata = Some(change.metadata);
182        self.manifest_version = manifest_version;
183    }
184
185    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
186        self.manifest_version = manifest_version;
187        for file in edit.files_to_add {
188            self.files.insert(file.file_id, file);
189        }
190        self.removed_files.add_removed_files(
191            edit.files_to_remove
192                .iter()
193                .map(|meta| meta.file_id)
194                .collect(),
195            edit.timestamp_ms
196                .unwrap_or_else(|| Utc::now().timestamp_millis()),
197        );
198        for file in edit.files_to_remove {
199            self.files.remove(&file.file_id);
200        }
201        if let Some(flushed_entry_id) = edit.flushed_entry_id {
202            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
203        }
204        if let Some(flushed_sequence) = edit.flushed_sequence {
205            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
206        }
207
208        if let Some(committed_sequence) = edit.committed_sequence {
209            self.committed_sequence = Some(
210                self.committed_sequence
211                    .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
212            );
213        }
214        if let Some(window) = edit.compaction_time_window {
215            self.compaction_time_window = Some(window);
216        }
217    }
218
219    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
220        self.manifest_version = manifest_version;
221        match truncate.kind {
222            TruncateKind::All {
223                truncated_entry_id,
224                truncated_sequence,
225            } => {
226                self.flushed_entry_id = truncated_entry_id;
227                self.flushed_sequence = truncated_sequence;
228                self.truncated_entry_id = Some(truncated_entry_id);
229                self.files.clear();
230                self.removed_files.add_removed_files(
231                    self.files.values().map(|meta| meta.file_id).collect(),
232                    truncate
233                        .timestamp_ms
234                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
235                );
236            }
237            TruncateKind::Partial { files_to_remove } => {
238                self.removed_files.add_removed_files(
239                    files_to_remove.iter().map(|meta| meta.file_id).collect(),
240                    truncate
241                        .timestamp_ms
242                        .unwrap_or_else(|| Utc::now().timestamp_millis()),
243                );
244                for file in files_to_remove {
245                    self.files.remove(&file.file_id);
246                }
247            }
248        }
249    }
250
251    pub fn files(&self) -> &HashMap<FileId, FileMeta> {
252        &self.files
253    }
254
255    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
256    pub fn contains_metadata(&self) -> bool {
257        self.metadata.is_some()
258    }
259
260    pub fn try_build(self) -> Result<RegionManifest> {
261        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
262        Ok(RegionManifest {
263            metadata,
264            files: self.files,
265            removed_files: self.removed_files,
266            flushed_entry_id: self.flushed_entry_id,
267            flushed_sequence: self.flushed_sequence,
268            committed_sequence: self.committed_sequence,
269            manifest_version: self.manifest_version,
270            truncated_entry_id: self.truncated_entry_id,
271            compaction_time_window: self.compaction_time_window,
272        })
273    }
274}
275
276/// A record of removed files in the region manifest.
277/// This is used to keep track of files that have been removed from the manifest but may still
278/// be kept for a while
279#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
280pub struct RemovedFilesRecord {
281    /// a list of `(FileIds, timestamp)` pairs, where the timestamp is the time when
282    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
283    pub removed_files: Vec<RemovedFiles>,
284}
285
286#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
287pub struct RemovedFiles {
288    /// The timestamp is the time when
289    /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
290    pub removed_at: i64,
291    /// The set of file ids that are removed.
292    pub file_ids: HashSet<FileId>,
293}
294
295impl RemovedFilesRecord {
296    /// Add a record of removed files with the current timestamp.
297    pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
298        self.removed_files.push(RemovedFiles {
299            removed_at: at,
300            file_ids,
301        });
302    }
303
304    pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
305        let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum();
306        if opt.keep_count > 0 && total_removed_files <= opt.keep_count {
307            return Ok(());
308        }
309
310        let mut cur_file_cnt = total_removed_files;
311
312        let can_evict_until = chrono::Utc::now()
313            - chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu {
314                input: opt.keep_ttl,
315            })?;
316
317        self.removed_files.sort_unstable_by_key(|f| f.removed_at);
318        let updated = std::mem::take(&mut self.removed_files)
319            .into_iter()
320            .filter_map(|f| {
321                if f.removed_at < can_evict_until.timestamp_millis()
322                    && (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count)
323                {
324                    // can evict all files
325                    // TODO(discord9): maybe only evict to below keep_count? Maybe not, or the update might be too frequent.
326                    cur_file_cnt -= f.file_ids.len();
327                    None
328                } else {
329                    Some(f)
330                }
331            })
332            .collect();
333        self.removed_files = updated;
334
335        Ok(())
336    }
337}
338
339// The checkpoint of region manifest, generated by checkpointer.
340#[derive(Serialize, Deserialize, Debug, Clone)]
341#[cfg_attr(test, derive(PartialEq, Eq))]
342pub struct RegionCheckpoint {
343    /// The last manifest version that this checkpoint compacts(inclusive).
344    pub last_version: ManifestVersion,
345    // The number of manifest actions that this checkpoint compacts.
346    pub compacted_actions: usize,
347    // The checkpoint data
348    pub checkpoint: Option<RegionManifest>,
349}
350
351impl RegionCheckpoint {
352    pub fn last_version(&self) -> ManifestVersion {
353        self.last_version
354    }
355
356    pub fn encode(&self) -> Result<Vec<u8>> {
357        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
358
359        Ok(json.into_bytes())
360    }
361
362    pub fn decode(bytes: &[u8]) -> Result<Self> {
363        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
364
365        serde_json::from_str(data).context(SerdeJsonSnafu)
366    }
367}
368
369#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
370pub struct RegionMetaActionList {
371    pub actions: Vec<RegionMetaAction>,
372}
373
374impl RegionMetaActionList {
375    pub fn with_action(action: RegionMetaAction) -> Self {
376        Self {
377            actions: vec![action],
378        }
379    }
380
381    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
382        Self { actions }
383    }
384
385    pub fn into_region_edit(self) -> RegionEdit {
386        let mut edit = RegionEdit {
387            files_to_add: Vec::new(),
388            files_to_remove: Vec::new(),
389            timestamp_ms: None,
390            compaction_time_window: None,
391            flushed_entry_id: None,
392            flushed_sequence: None,
393            committed_sequence: None,
394        };
395
396        for action in self.actions {
397            if let RegionMetaAction::Edit(region_edit) = action {
398                // Merge file adds/removes
399                edit.files_to_add.extend(region_edit.files_to_add);
400                edit.files_to_remove.extend(region_edit.files_to_remove);
401                // Max of flushed entry id / sequence
402                if let Some(eid) = region_edit.flushed_entry_id {
403                    edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
404                }
405                if let Some(seq) = region_edit.flushed_sequence {
406                    edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
407                }
408                if let Some(seq) = region_edit.committed_sequence {
409                    edit.committed_sequence =
410                        Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
411                }
412                // Prefer the latest non-none time window
413                if region_edit.compaction_time_window.is_some() {
414                    edit.compaction_time_window = region_edit.compaction_time_window;
415                }
416            }
417        }
418
419        edit
420    }
421}
422
423impl RegionMetaActionList {
424    /// Encode self into json in the form of string lines.
425    pub fn encode(&self) -> Result<Vec<u8>> {
426        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
427
428        Ok(json.into_bytes())
429    }
430
431    pub fn decode(bytes: &[u8]) -> Result<Self> {
432        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
433
434        serde_json::from_str(data).context(SerdeJsonSnafu)
435    }
436}
437
438#[cfg(test)]
439mod tests {
440
441    use common_time::Timestamp;
442
443    use super::*;
444
445    // These tests are used to ensure backward compatibility of manifest files.
446    // DO NOT modify the serialized string when they fail, check if your
447    // modification to manifest-related structs is compatible with older manifests.
448    #[test]
449    fn test_region_action_compatibility() {
450        let region_edit = r#"{
451            "flushed_entry_id":null,
452            "compaction_time_window":null,
453            "files_to_add":[
454            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
455            ],
456            "files_to_remove":[
457            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
458            ]
459        }"#;
460        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
461
462        let region_edit = r#"{
463            "flushed_entry_id":10,
464            "flushed_sequence":10,
465            "compaction_time_window":null,
466            "files_to_add":[
467            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
468            ],
469            "files_to_remove":[
470            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
471            ]
472        }"#;
473        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
474
475        let region_change = r#" {
476            "metadata":{
477                "column_metadatas":[
478                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
479                ],
480                "primary_key":[1],
481                "region_id":5299989648942,
482                "schema_version":0
483            }
484            }"#;
485        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
486
487        let region_remove = r#"{"region_id":42}"#;
488        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
489    }
490
491    #[test]
492    fn test_region_manifest_builder() {
493        // TODO(ruihang): port this test case
494    }
495
496    #[test]
497    fn test_encode_decode_region_checkpoint() {
498        // TODO(ruihang): port this test case
499    }
500
501    #[test]
502    fn test_region_manifest_compatibility() {
503        // Test deserializing RegionManifest from old schema where FileId is a UUID string
504        let region_manifest_json = r#"{
505            "metadata": {
506                "column_metadatas": [
507                    {
508                        "column_schema": {
509                            "name": "a",
510                            "data_type": {"Int64": {}},
511                            "is_nullable": false,
512                            "is_time_index": false,
513                            "default_constraint": null,
514                            "metadata": {}
515                        },
516                        "semantic_type": "Tag",
517                        "column_id": 1
518                    },
519                    {
520                        "column_schema": {
521                            "name": "b",
522                            "data_type": {"Float64": {}},
523                            "is_nullable": false,
524                            "is_time_index": false,
525                            "default_constraint": null,
526                            "metadata": {}
527                        },
528                        "semantic_type": "Field",
529                        "column_id": 2
530                    },
531                    {
532                        "column_schema": {
533                            "name": "c",
534                            "data_type": {"Timestamp": {"Millisecond": null}},
535                            "is_nullable": false,
536                            "is_time_index": false,
537                            "default_constraint": null,
538                            "metadata": {}
539                        },
540                        "semantic_type": "Timestamp",
541                        "column_id": 3
542                    }
543                ],
544                "primary_key": [1],
545                "region_id": 4402341478400,
546                "schema_version": 0
547            },
548            "files": {
549                "4b220a70-2b03-4641-9687-b65d94641208": {
550                    "region_id": 4402341478400,
551                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
552                    "time_range": [
553                        {"value": 1451609210000, "unit": "Millisecond"},
554                        {"value": 1451609520000, "unit": "Millisecond"}
555                    ],
556                    "level": 1,
557                    "file_size": 100
558                },
559                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
560                    "region_id": 4402341478400,
561                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
562                    "time_range": [
563                        {"value": 1451609210000, "unit": "Millisecond"},
564                        {"value": 1451609520000, "unit": "Millisecond"}
565                    ],
566                    "level": 0,
567                    "file_size": 100
568                }
569            },
570            "flushed_entry_id": 10,
571            "flushed_sequence": 20,
572            "manifest_version": 1,
573            "truncated_entry_id": null,
574            "compaction_time_window": null
575        }"#;
576
577        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();
578
579        // Verify that the files were correctly deserialized
580        assert_eq!(manifest.files.len(), 2);
581        assert_eq!(manifest.flushed_entry_id, 10);
582        assert_eq!(manifest.flushed_sequence, 20);
583        assert_eq!(manifest.manifest_version, 1);
584
585        // Verify that FileIds were correctly parsed from UUID strings
586        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
587        file_ids.sort_unstable();
588        assert_eq!(
589            file_ids,
590            vec![
591                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
592                "4b220a70-2b03-4641-9687-b65d94641208",
593            ]
594        );
595
596        // Roundtrip test with current FileId format
597        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
598        let deserialized_manifest: RegionManifest =
599            serde_json::from_str(&serialized_manifest).unwrap();
600        assert_eq!(manifest, deserialized_manifest);
601        assert_ne!(serialized_manifest, region_manifest_json);
602    }
603
604    #[test]
605    fn test_region_truncate_compat() {
606        // Test deserializing RegionTruncate from old schema
607        let region_truncate_json = r#"{
608            "region_id": 4402341478400,
609            "truncated_entry_id": 10,
610            "truncated_sequence": 20
611        }"#;
612
613        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
614        assert_eq!(truncate_v1.region_id, 4402341478400);
615        assert_eq!(
616            truncate_v1.kind,
617            TruncateKind::All {
618                truncated_entry_id: 10,
619                truncated_sequence: 20,
620            }
621        );
622
623        // Test deserializing RegionTruncate from new schema
624        let region_truncate_v2_json = r#"{
625    "region_id": 4402341478400,
626    "files_to_remove": [
627        {
628            "region_id": 4402341478400,
629            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
630            "time_range": [
631                {
632                    "value": 1451609210000,
633                    "unit": "Millisecond"
634                },
635                {
636                    "value": 1451609520000,
637                    "unit": "Millisecond"
638                }
639            ],
640            "level": 1,
641            "file_size": 100
642        }
643    ]
644}"#;
645
646        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
647        assert_eq!(truncate_v2.region_id, 4402341478400);
648        assert_eq!(
649            truncate_v2.kind,
650            TruncateKind::Partial {
651                files_to_remove: vec![FileMeta {
652                    region_id: RegionId::from_u64(4402341478400),
653                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
654                    time_range: (
655                        Timestamp::new_millisecond(1451609210000),
656                        Timestamp::new_millisecond(1451609520000)
657                    ),
658                    level: 1,
659                    file_size: 100,
660                    ..Default::default()
661                }]
662            }
663        );
664    }
665
666    #[test]
667    fn test_region_manifest_removed_files() {
668        let region_metadata = r#"{
669                "column_metadatas": [
670                    {
671                        "column_schema": {
672                            "name": "a",
673                            "data_type": {"Int64": {}},
674                            "is_nullable": false,
675                            "is_time_index": false,
676                            "default_constraint": null,
677                            "metadata": {}
678                        },
679                        "semantic_type": "Tag",
680                        "column_id": 1
681                    },
682                    {
683                        "column_schema": {
684                            "name": "b",
685                            "data_type": {"Float64": {}},
686                            "is_nullable": false,
687                            "is_time_index": false,
688                            "default_constraint": null,
689                            "metadata": {}
690                        },
691                        "semantic_type": "Field",
692                        "column_id": 2
693                    },
694                    {
695                        "column_schema": {
696                            "name": "c",
697                            "data_type": {"Timestamp": {"Millisecond": null}},
698                            "is_nullable": false,
699                            "is_time_index": false,
700                            "default_constraint": null,
701                            "metadata": {}
702                        },
703                        "semantic_type": "Timestamp",
704                        "column_id": 3
705                    }
706                ],
707                "primary_key": [1],
708                "region_id": 4402341478400,
709                "schema_version": 0
710            }"#;
711
712        let metadata: RegionMetadataRef =
713            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
714        let manifest = RegionManifest {
715            metadata: metadata.clone(),
716            files: HashMap::new(),
717            flushed_entry_id: 0,
718            flushed_sequence: 0,
719            committed_sequence: None,
720            manifest_version: 0,
721            truncated_entry_id: None,
722            compaction_time_window: None,
723            removed_files: RemovedFilesRecord {
724                removed_files: vec![RemovedFiles {
725                    removed_at: 0,
726                    file_ids: HashSet::from([FileId::parse_str(
727                        "4b220a70-2b03-4641-9687-b65d94641208",
728                    )
729                    .unwrap()]),
730                }],
731            },
732        };
733
734        let json = serde_json::to_string(&manifest).unwrap();
735        let new: RegionManifest = serde_json::from_str(&json).unwrap();
736
737        assert_eq!(manifest, new);
738    }
739
740    /// Test if old version can still be deserialized then serialized to the new version.
741    #[test]
742    fn test_old_region_manifest_compat() {
743        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
744        pub struct RegionManifestV1 {
745            /// Metadata of the region.
746            pub metadata: RegionMetadataRef,
747            /// SST files.
748            pub files: HashMap<FileId, FileMeta>,
749            /// Last WAL entry id of flushed data.
750            pub flushed_entry_id: EntryId,
751            /// Last sequence of flushed data.
752            pub flushed_sequence: SequenceNumber,
753            /// Current manifest version.
754            pub manifest_version: ManifestVersion,
755            /// Last WAL entry id of truncated data.
756            pub truncated_entry_id: Option<EntryId>,
757            /// Inferred compaction time window.
758            #[serde(with = "humantime_serde")]
759            pub compaction_time_window: Option<Duration>,
760        }
761
762        let region_metadata = r#"{
763                "column_metadatas": [
764                    {
765                        "column_schema": {
766                            "name": "a",
767                            "data_type": {"Int64": {}},
768                            "is_nullable": false,
769                            "is_time_index": false,
770                            "default_constraint": null,
771                            "metadata": {}
772                        },
773                        "semantic_type": "Tag",
774                        "column_id": 1
775                    },
776                    {
777                        "column_schema": {
778                            "name": "b",
779                            "data_type": {"Float64": {}},
780                            "is_nullable": false,
781                            "is_time_index": false,
782                            "default_constraint": null,
783                            "metadata": {}
784                        },
785                        "semantic_type": "Field",
786                        "column_id": 2
787                    },
788                    {
789                        "column_schema": {
790                            "name": "c",
791                            "data_type": {"Timestamp": {"Millisecond": null}},
792                            "is_nullable": false,
793                            "is_time_index": false,
794                            "default_constraint": null,
795                            "metadata": {}
796                        },
797                        "semantic_type": "Timestamp",
798                        "column_id": 3
799                    }
800                ],
801                "primary_key": [1],
802                "region_id": 4402341478400,
803                "schema_version": 0
804            }"#;
805
806        let metadata: RegionMetadataRef =
807            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
808
809        // first test v1 empty to new
810        let v1 = RegionManifestV1 {
811            metadata: metadata.clone(),
812            files: HashMap::new(),
813            flushed_entry_id: 0,
814            flushed_sequence: 0,
815            manifest_version: 0,
816            truncated_entry_id: None,
817            compaction_time_window: None,
818        };
819        let json = serde_json::to_string(&v1).unwrap();
820        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
821        assert_eq!(
822            new_from_old,
823            RegionManifest {
824                metadata: metadata.clone(),
825                files: HashMap::new(),
826                removed_files: Default::default(),
827                flushed_entry_id: 0,
828                flushed_sequence: 0,
829                committed_sequence: None,
830                manifest_version: 0,
831                truncated_entry_id: None,
832                compaction_time_window: None,
833            }
834        );
835
836        let new_manifest = RegionManifest {
837            metadata: metadata.clone(),
838            files: HashMap::new(),
839            removed_files: Default::default(),
840            flushed_entry_id: 0,
841            flushed_sequence: 0,
842            committed_sequence: None,
843            manifest_version: 0,
844            truncated_entry_id: None,
845            compaction_time_window: None,
846        };
847        let json = serde_json::to_string(&new_manifest).unwrap();
848        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
849        assert_eq!(
850            old_from_new,
851            RegionManifestV1 {
852                metadata: metadata.clone(),
853                files: HashMap::new(),
854                flushed_entry_id: 0,
855                flushed_sequence: 0,
856                manifest_version: 0,
857                truncated_entry_id: None,
858                compaction_time_window: None,
859            }
860        );
861
862        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
863        pub struct RegionEditV1 {
864            pub files_to_add: Vec<FileMeta>,
865            pub files_to_remove: Vec<FileMeta>,
866            #[serde(with = "humantime_serde")]
867            pub compaction_time_window: Option<Duration>,
868            pub flushed_entry_id: Option<EntryId>,
869            pub flushed_sequence: Option<SequenceNumber>,
870        }
871
872        /// Last data truncated in the region.
873        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
874        pub struct RegionTruncateV1 {
875            pub region_id: RegionId,
876            pub kind: TruncateKind,
877        }
878
879        let json = serde_json::to_string(&RegionEditV1 {
880            files_to_add: vec![],
881            files_to_remove: vec![],
882            compaction_time_window: None,
883            flushed_entry_id: None,
884            flushed_sequence: None,
885        })
886        .unwrap();
887        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
888        assert_eq!(
889            RegionEdit {
890                files_to_add: vec![],
891                files_to_remove: vec![],
892                timestamp_ms: None,
893                compaction_time_window: None,
894                flushed_entry_id: None,
895                flushed_sequence: None,
896                committed_sequence: None,
897            },
898            new_from_old
899        );
900
901        // test new version with timestamp_ms set can deserialize to old version
902        let new = RegionEdit {
903            files_to_add: vec![],
904            files_to_remove: vec![],
905            timestamp_ms: Some(42),
906            compaction_time_window: None,
907            flushed_entry_id: None,
908            flushed_sequence: None,
909            committed_sequence: None,
910        };
911
912        let new_json = serde_json::to_string(&new).unwrap();
913
914        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
915        assert_eq!(
916            RegionEditV1 {
917                files_to_add: vec![],
918                files_to_remove: vec![],
919                compaction_time_window: None,
920                flushed_entry_id: None,
921                flushed_sequence: None,
922            },
923            old_from_new
924        );
925    }
926}