mito2/manifest/
action.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
16
17use std::collections::HashMap;
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21use snafu::{OptionExt, ResultExt};
22use store_api::manifest::ManifestVersion;
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::{RegionId, SequenceNumber};
25use strum::Display;
26
27use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
28use crate::sst::file::{FileId, FileMeta};
29use crate::wal::EntryId;
30
31/// Actions that can be applied to region manifest.
32#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
33pub enum RegionMetaAction {
34    /// Change region's metadata for request like ALTER
35    Change(RegionChange),
36    /// Edit region's state for changing options or file list.
37    Edit(RegionEdit),
38    /// Remove the region.
39    Remove(RegionRemove),
40    /// Truncate the region.
41    Truncate(RegionTruncate),
42}
43
44#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
45pub struct RegionChange {
46    /// The metadata after changed.
47    pub metadata: RegionMetadataRef,
48}
49
50#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
51pub struct RegionEdit {
52    pub files_to_add: Vec<FileMeta>,
53    pub files_to_remove: Vec<FileMeta>,
54    #[serde(with = "humantime_serde")]
55    pub compaction_time_window: Option<Duration>,
56    pub flushed_entry_id: Option<EntryId>,
57    pub flushed_sequence: Option<SequenceNumber>,
58}
59
60#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
61pub struct RegionRemove {
62    pub region_id: RegionId,
63}
64
65/// Last data truncated in the region.
66#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
67pub struct RegionTruncate {
68    pub region_id: RegionId,
69    /// Last WAL entry id of truncated data.
70    pub truncated_entry_id: EntryId,
71    // Last sequence number of truncated data.
72    pub truncated_sequence: SequenceNumber,
73}
74
75/// The region manifest data.
76#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
77pub struct RegionManifest {
78    /// Metadata of the region.
79    pub metadata: RegionMetadataRef,
80    /// SST files.
81    pub files: HashMap<FileId, FileMeta>,
82    /// Last WAL entry id of flushed data.
83    pub flushed_entry_id: EntryId,
84    /// Last sequence of flushed data.
85    pub flushed_sequence: SequenceNumber,
86    /// Current manifest version.
87    pub manifest_version: ManifestVersion,
88    /// Last WAL entry id of truncated data.
89    pub truncated_entry_id: Option<EntryId>,
90    /// Inferred compaction time window.
91    #[serde(with = "humantime_serde")]
92    pub compaction_time_window: Option<Duration>,
93}
94
95#[derive(Debug, Default)]
96pub struct RegionManifestBuilder {
97    metadata: Option<RegionMetadataRef>,
98    files: HashMap<FileId, FileMeta>,
99    flushed_entry_id: EntryId,
100    flushed_sequence: SequenceNumber,
101    manifest_version: ManifestVersion,
102    truncated_entry_id: Option<EntryId>,
103    compaction_time_window: Option<Duration>,
104}
105
106impl RegionManifestBuilder {
107    /// Start with a checkpoint.
108    pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
109        if let Some(s) = checkpoint {
110            Self {
111                metadata: Some(s.metadata),
112                files: s.files,
113                flushed_entry_id: s.flushed_entry_id,
114                manifest_version: s.manifest_version,
115                flushed_sequence: s.flushed_sequence,
116                truncated_entry_id: s.truncated_entry_id,
117                compaction_time_window: s.compaction_time_window,
118            }
119        } else {
120            Default::default()
121        }
122    }
123
124    pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
125        self.metadata = Some(change.metadata);
126        self.manifest_version = manifest_version;
127    }
128
129    pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
130        self.manifest_version = manifest_version;
131        for file in edit.files_to_add {
132            self.files.insert(file.file_id, file);
133        }
134        for file in edit.files_to_remove {
135            self.files.remove(&file.file_id);
136        }
137        if let Some(flushed_entry_id) = edit.flushed_entry_id {
138            self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
139        }
140        if let Some(flushed_sequence) = edit.flushed_sequence {
141            self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
142        }
143        if let Some(window) = edit.compaction_time_window {
144            self.compaction_time_window = Some(window);
145        }
146    }
147
148    pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
149        self.manifest_version = manifest_version;
150        self.flushed_entry_id = truncate.truncated_entry_id;
151        self.flushed_sequence = truncate.truncated_sequence;
152        self.truncated_entry_id = Some(truncate.truncated_entry_id);
153        self.files.clear();
154    }
155
156    /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
157    pub fn contains_metadata(&self) -> bool {
158        self.metadata.is_some()
159    }
160
161    pub fn try_build(self) -> Result<RegionManifest> {
162        let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
163        Ok(RegionManifest {
164            metadata,
165            files: self.files,
166            flushed_entry_id: self.flushed_entry_id,
167            flushed_sequence: self.flushed_sequence,
168            manifest_version: self.manifest_version,
169            truncated_entry_id: self.truncated_entry_id,
170            compaction_time_window: self.compaction_time_window,
171        })
172    }
173}
174
175// The checkpoint of region manifest, generated by checkpointer.
176#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
177pub struct RegionCheckpoint {
178    /// The last manifest version that this checkpoint compacts(inclusive).
179    pub last_version: ManifestVersion,
180    // The number of manifest actions that this checkpoint compacts.
181    pub compacted_actions: usize,
182    // The checkpoint data
183    pub checkpoint: Option<RegionManifest>,
184}
185
186impl RegionCheckpoint {
187    pub fn last_version(&self) -> ManifestVersion {
188        self.last_version
189    }
190
191    pub fn encode(&self) -> Result<Vec<u8>> {
192        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
193
194        Ok(json.into_bytes())
195    }
196
197    pub fn decode(bytes: &[u8]) -> Result<Self> {
198        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
199
200        serde_json::from_str(data).context(SerdeJsonSnafu)
201    }
202}
203
204#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
205pub struct RegionMetaActionList {
206    pub actions: Vec<RegionMetaAction>,
207}
208
209impl RegionMetaActionList {
210    pub fn with_action(action: RegionMetaAction) -> Self {
211        Self {
212            actions: vec![action],
213        }
214    }
215
216    pub fn new(actions: Vec<RegionMetaAction>) -> Self {
217        Self { actions }
218    }
219}
220
221impl RegionMetaActionList {
222    /// Encode self into json in the form of string lines.
223    pub fn encode(&self) -> Result<Vec<u8>> {
224        let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
225
226        Ok(json.into_bytes())
227    }
228
229    pub fn decode(bytes: &[u8]) -> Result<Self> {
230        let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
231
232        serde_json::from_str(data).context(SerdeJsonSnafu)
233    }
234}
235
#[cfg(test)]
mod tests {

    use super::*;

    /// Builds a [RegionChange] carrying a small three-column schema.
    fn new_region_change() -> RegionChange {
        let json = r#"{
            "metadata":{
                "column_metadatas":[
                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key":[1],
                "region_id":5299989648942,
                "schema_version":0
            }
        }"#;
        serde_json::from_str(json).unwrap()
    }

    /// Builds a [RegionEdit] that adds one SST file, removes another and
    /// reports flushed entry id and sequence 10.
    fn new_region_edit() -> RegionEdit {
        let json = r#"{
            "flushed_entry_id":10,
            "flushed_sequence":10,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        serde_json::from_str(json).unwrap()
    }

    #[test]
    fn test_encode_decode_action_list() {
        let action_list = RegionMetaActionList::new(vec![
            RegionMetaAction::Change(new_region_change()),
            RegionMetaAction::Edit(new_region_edit()),
            RegionMetaAction::Truncate(RegionTruncate {
                region_id: RegionId::new(1, 1),
                truncated_entry_id: 10,
                truncated_sequence: 20,
            }),
            RegionMetaAction::Remove(RegionRemove {
                region_id: RegionId::new(1, 1),
            }),
        ]);

        let bytes = action_list.encode().unwrap();
        let decoded = RegionMetaActionList::decode(&bytes).unwrap();
        assert_eq!(action_list, decoded);

        let single = RegionMetaActionList::with_action(RegionMetaAction::Remove(RegionRemove {
            region_id: RegionId::new(2, 3),
        }));
        assert_eq!(1, single.actions.len());
        assert_eq!(
            single,
            RegionMetaActionList::decode(&single.encode().unwrap()).unwrap()
        );

        // Bytes that are not valid UTF-8 are rejected.
        assert!(RegionMetaActionList::decode(&[0xff, 0xfe]).is_err());
    }

    // These tests are used to ensure backward compatibility of manifest files.
    // DO NOT modify the serialized string when they fail, check if your
    // modification to manifest-related structs is compatible with older manifests.
    #[test]
    fn test_region_manifest_compatibility() {
        let region_edit = r#"{
            "flushed_entry_id":null,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        let region_edit = r#"{
            "flushed_entry_id":10,
            "flushed_sequence":10,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        let region_change = r#" {
            "metadata":{
                "column_metadatas":[
                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key":[1],
                "region_id":5299989648942,
                "schema_version":0
            }
            }"#;
        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

        let region_remove = r#"{"region_id":42}"#;
        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
    }

    #[test]
    fn test_region_manifest_builder() {
        // A manifest cannot be built before any metadata is known.
        let builder = RegionManifestBuilder::with_checkpoint(None);
        assert!(!builder.contains_metadata());
        assert!(builder.try_build().is_err());

        let mut builder = RegionManifestBuilder::default();
        builder.apply_change(1, new_region_change());
        assert!(builder.contains_metadata());

        let edit = new_region_edit();
        let added = edit.files_to_add[0].file_id;
        let removed = edit.files_to_remove[0].file_id;
        builder.apply_edit(2, edit);

        // An edit that reports older flushed positions must not move them backwards.
        builder.apply_edit(
            3,
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![],
                compaction_time_window: Some(Duration::from_secs(3600)),
                flushed_entry_id: Some(5),
                flushed_sequence: Some(5),
            },
        );

        let manifest = builder.try_build().unwrap();
        assert_eq!(3, manifest.manifest_version);
        assert_eq!(1, manifest.files.len());
        assert!(manifest.files.contains_key(&added));
        assert!(!manifest.files.contains_key(&removed));
        assert_eq!(10, manifest.flushed_entry_id);
        assert_eq!(10, manifest.flushed_sequence);
        assert!(manifest.truncated_entry_id.is_none());
        assert_eq!(
            Some(Duration::from_secs(3600)),
            manifest.compaction_time_window
        );

        // Restarting from a checkpoint reproduces the same manifest.
        let restored = RegionManifestBuilder::with_checkpoint(Some(manifest.clone()))
            .try_build()
            .unwrap();
        assert_eq!(manifest, restored);

        // Truncation drops all files and resets flushed positions to the truncated ones.
        let mut builder = RegionManifestBuilder::with_checkpoint(Some(manifest));
        builder.apply_truncate(
            4,
            RegionTruncate {
                region_id: RegionId::new(1, 1),
                truncated_entry_id: 20,
                truncated_sequence: 30,
            },
        );
        let manifest = builder.try_build().unwrap();
        assert_eq!(4, manifest.manifest_version);
        assert!(manifest.files.is_empty());
        assert_eq!(20, manifest.flushed_entry_id);
        assert_eq!(30, manifest.flushed_sequence);
        assert_eq!(Some(20), manifest.truncated_entry_id);
    }

    #[test]
    fn test_encode_decode_region_checkpoint() {
        let mut builder = RegionManifestBuilder::default();
        builder.apply_change(1, new_region_change());
        builder.apply_edit(2, new_region_edit());

        let checkpoint = RegionCheckpoint {
            last_version: 2,
            compacted_actions: 2,
            checkpoint: Some(builder.try_build().unwrap()),
        };
        let bytes = checkpoint.encode().unwrap();
        let decoded = RegionCheckpoint::decode(&bytes).unwrap();
        assert_eq!(checkpoint, decoded);
        assert_eq!(2, decoded.last_version());

        // A checkpoint without manifest data also round-trips.
        let empty = RegionCheckpoint {
            last_version: 0,
            compacted_actions: 0,
            checkpoint: None,
        };
        assert_eq!(
            empty,
            RegionCheckpoint::decode(&empty.encode().unwrap()).unwrap()
        );

        // Bytes that are not valid UTF-8 are rejected.
        assert!(RegionCheckpoint::decode(&[0xff, 0xfe]).is_err());
    }
}
299        // TODO(ruihang): port this test case
300    }
301}