1use std::collections::HashMap;
18use std::time::Duration;
19
20use serde::{Deserialize, Serialize};
21use snafu::{OptionExt, ResultExt};
22use store_api::manifest::ManifestVersion;
23use store_api::metadata::RegionMetadataRef;
24use store_api::storage::{RegionId, SequenceNumber};
25use strum::Display;
26
27use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
28use crate::sst::file::{FileId, FileMeta};
29use crate::wal::EntryId;
30
/// A single action that can be recorded for a region.
///
/// Each variant wraps the payload struct describing that action. `Display` is
/// derived through `strum`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
pub enum RegionMetaAction {
    /// Carries a [`RegionChange`], i.e. new region metadata.
    Change(RegionChange),
    /// Carries a [`RegionEdit`], i.e. file-list and flush-progress updates.
    Edit(RegionEdit),
    /// Carries a [`RegionRemove`].
    Remove(RegionRemove),
    /// Carries a [`RegionTruncate`].
    Truncate(RegionTruncate),
}
43
/// Payload of [`RegionMetaAction::Change`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
    /// The metadata that replaces the builder's current metadata when this
    /// change is applied with `RegionManifestBuilder::apply_change`.
    pub metadata: RegionMetadataRef,
}
49
/// Payload of [`RegionMetaAction::Edit`].
///
/// The per-field notes describe what `RegionManifestBuilder::apply_edit` does
/// with each value.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
    /// Files inserted into the manifest's file map, keyed by their `file_id`.
    pub files_to_add: Vec<FileMeta>,
    /// Files whose `file_id` is removed from the manifest's file map.
    pub files_to_remove: Vec<FileMeta>,
    /// When `Some`, overrides the manifest's compaction time window; `None`
    /// leaves the current window untouched. (De)serialized via `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// When `Some`, the manifest keeps the larger of this and its current
    /// flushed entry id.
    pub flushed_entry_id: Option<EntryId>,
    /// When `Some`, the manifest keeps the larger of this and its current
    /// flushed sequence.
    pub flushed_sequence: Option<SequenceNumber>,
}
59
/// Payload of [`RegionMetaAction::Remove`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
    /// Id of the region this action refers to (presumably the region being
    /// removed; the consumer of this action is not in this file).
    pub region_id: RegionId,
}
64
/// Payload of [`RegionMetaAction::Truncate`].
///
/// Applying it with `RegionManifestBuilder::apply_truncate` clears the
/// manifest's file map and moves its flush positions to the truncated ones.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
    /// Id of the region this action refers to.
    pub region_id: RegionId,
    /// Entry id stored as both the manifest's `truncated_entry_id` and its
    /// `flushed_entry_id` when the truncation is applied.
    pub truncated_entry_id: EntryId,
    /// Sequence stored as the manifest's `flushed_sequence` when the
    /// truncation is applied.
    pub truncated_sequence: SequenceNumber,
}
74
/// The state of a region as assembled by [`RegionManifestBuilder`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionManifest {
    /// Metadata of the region; set from the most recently applied change.
    pub metadata: RegionMetadataRef,
    /// Files currently tracked for the region, keyed by file id.
    pub files: HashMap<FileId, FileMeta>,
    /// Flushed entry id: raised by edits, overwritten by a truncate.
    pub flushed_entry_id: EntryId,
    /// Flushed sequence: raised by edits, overwritten by a truncate.
    pub flushed_sequence: SequenceNumber,
    /// Version passed to the most recently applied action.
    pub manifest_version: ManifestVersion,
    /// Entry id of the most recently applied truncate, if any.
    pub truncated_entry_id: Option<EntryId>,
    /// Compaction time window from the latest edit that carried one.
    /// (De)serialized via `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
}
94
/// Accumulates applied actions and turns them into a [`RegionManifest`].
///
/// The fields mirror those of [`RegionManifest`], except that `metadata` is
/// optional: it stays `None` until a checkpoint or a change supplies it.
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
    // `None` until provided by a checkpoint or by `apply_change`.
    metadata: Option<RegionMetadataRef>,
    files: HashMap<FileId, FileMeta>,
    flushed_entry_id: EntryId,
    flushed_sequence: SequenceNumber,
    manifest_version: ManifestVersion,
    truncated_entry_id: Option<EntryId>,
    compaction_time_window: Option<Duration>,
}
105
106impl RegionManifestBuilder {
107 pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
109 if let Some(s) = checkpoint {
110 Self {
111 metadata: Some(s.metadata),
112 files: s.files,
113 flushed_entry_id: s.flushed_entry_id,
114 manifest_version: s.manifest_version,
115 flushed_sequence: s.flushed_sequence,
116 truncated_entry_id: s.truncated_entry_id,
117 compaction_time_window: s.compaction_time_window,
118 }
119 } else {
120 Default::default()
121 }
122 }
123
124 pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
125 self.metadata = Some(change.metadata);
126 self.manifest_version = manifest_version;
127 }
128
129 pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
130 self.manifest_version = manifest_version;
131 for file in edit.files_to_add {
132 self.files.insert(file.file_id, file);
133 }
134 for file in edit.files_to_remove {
135 self.files.remove(&file.file_id);
136 }
137 if let Some(flushed_entry_id) = edit.flushed_entry_id {
138 self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
139 }
140 if let Some(flushed_sequence) = edit.flushed_sequence {
141 self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
142 }
143 if let Some(window) = edit.compaction_time_window {
144 self.compaction_time_window = Some(window);
145 }
146 }
147
148 pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
149 self.manifest_version = manifest_version;
150 self.flushed_entry_id = truncate.truncated_entry_id;
151 self.flushed_sequence = truncate.truncated_sequence;
152 self.truncated_entry_id = Some(truncate.truncated_entry_id);
153 self.files.clear();
154 }
155
156 pub fn contains_metadata(&self) -> bool {
158 self.metadata.is_some()
159 }
160
161 pub fn try_build(self) -> Result<RegionManifest> {
162 let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
163 Ok(RegionManifest {
164 metadata,
165 files: self.files,
166 flushed_entry_id: self.flushed_entry_id,
167 flushed_sequence: self.flushed_sequence,
168 manifest_version: self.manifest_version,
169 truncated_entry_id: self.truncated_entry_id,
170 compaction_time_window: self.compaction_time_window,
171 })
172 }
173}
174
/// A serializable checkpoint wrapping an optional [`RegionManifest`].
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct RegionCheckpoint {
    /// Manifest version associated with this checkpoint; returned by
    /// `RegionCheckpoint::last_version`.
    pub last_version: ManifestVersion,
    /// NOTE(review): presumably the number of actions folded into this
    /// checkpoint; nothing in this file reads or writes it, so confirm.
    pub compacted_actions: usize,
    /// The manifest captured by this checkpoint, if any.
    pub checkpoint: Option<RegionManifest>,
}
185
186impl RegionCheckpoint {
187 pub fn last_version(&self) -> ManifestVersion {
188 self.last_version
189 }
190
191 pub fn encode(&self) -> Result<Vec<u8>> {
192 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
193
194 Ok(json.into_bytes())
195 }
196
197 pub fn decode(bytes: &[u8]) -> Result<Self> {
198 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
199
200 serde_json::from_str(data).context(SerdeJsonSnafu)
201 }
202}
203
/// An ordered list of [`RegionMetaAction`]s that is encoded and decoded as a
/// single JSON document.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
    /// The actions, in the order they were supplied.
    pub actions: Vec<RegionMetaAction>,
}
208
209impl RegionMetaActionList {
210 pub fn with_action(action: RegionMetaAction) -> Self {
211 Self {
212 actions: vec![action],
213 }
214 }
215
216 pub fn new(actions: Vec<RegionMetaAction>) -> Self {
217 Self { actions }
218 }
219}
220
221impl RegionMetaActionList {
222 pub fn encode(&self) -> Result<Vec<u8>> {
224 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
225
226 Ok(json.into_bytes())
227 }
228
229 pub fn decode(bytes: &[u8]) -> Result<Self> {
230 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
231
232 serde_json::from_str(data).context(SerdeJsonSnafu)
233 }
234}
235
#[cfg(test)]
mod tests {

    use super::*;

    /// A `RegionEdit` document that omits the `flushed_sequence` key.
    const REGION_EDIT_WITHOUT_SEQUENCE_JSON: &str = r#"{
        "flushed_entry_id":null,
        "compaction_time_window":null,
        "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
        ],
        "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
        ]
    }"#;

    /// A `RegionEdit` document with every key present.
    const REGION_EDIT_JSON: &str = r#"{
        "flushed_entry_id":10,
        "flushed_sequence":10,
        "compaction_time_window":null,
        "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
        ],
        "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
        ]
    }"#;

    /// A `RegionChange` document.
    const REGION_CHANGE_JSON: &str = r#" {
        "metadata":{
            "column_metadatas":[
            {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
            ],
            "primary_key":[1],
            "region_id":5299989648942,
            "schema_version":0
        }
    }"#;

    /// A `RegionRemove` document.
    const REGION_REMOVE_JSON: &str = r#"{"region_id":42}"#;

    /// A `RegionTruncate` document; its positions are larger than the ones in
    /// `REGION_EDIT_JSON`.
    const REGION_TRUNCATE_JSON: &str =
        r#"{"region_id":42,"truncated_entry_id":20,"truncated_sequence":20}"#;

    /// Builds a manifest by applying the change and the full edit fixture to an
    /// empty builder, at versions 1 and 2 respectively.
    fn manifest_from_fixtures() -> RegionManifest {
        let change: RegionChange = serde_json::from_str(REGION_CHANGE_JSON).unwrap();
        let edit: RegionEdit = serde_json::from_str(REGION_EDIT_JSON).unwrap();

        let mut builder = RegionManifestBuilder::default();
        builder.apply_change(1, change);
        builder.apply_edit(2, edit);
        builder.try_build().unwrap()
    }

    #[test]
    fn test_encode_decode_action_list() {
        let change: RegionChange = serde_json::from_str(REGION_CHANGE_JSON).unwrap();
        let edit: RegionEdit = serde_json::from_str(REGION_EDIT_JSON).unwrap();
        let remove: RegionRemove = serde_json::from_str(REGION_REMOVE_JSON).unwrap();
        let truncate: RegionTruncate = serde_json::from_str(REGION_TRUNCATE_JSON).unwrap();

        // Every action kind must survive an encode/decode round trip.
        let list = RegionMetaActionList::new(vec![
            RegionMetaAction::Change(change),
            RegionMetaAction::Edit(edit),
            RegionMetaAction::Remove(remove.clone()),
            RegionMetaAction::Truncate(truncate),
        ]);
        let bytes = list.encode().unwrap();
        let decoded = RegionMetaActionList::decode(&bytes).unwrap();
        assert_eq!(list, decoded);

        let single = RegionMetaActionList::with_action(RegionMetaAction::Remove(remove));
        assert_eq!(1, single.actions.len());
        let bytes = single.encode().unwrap();
        assert_eq!(single, RegionMetaActionList::decode(&bytes).unwrap());

        // Neither invalid UTF-8 nor invalid JSON may decode successfully.
        assert!(RegionMetaActionList::decode(&[0xff, 0xfe]).is_err());
        assert!(RegionMetaActionList::decode(b"not json").is_err());
    }

    #[test]
    fn test_region_manifest_compatibility() {
        let _ = serde_json::from_str::<RegionEdit>(REGION_EDIT_WITHOUT_SEQUENCE_JSON).unwrap();

        let _ = serde_json::from_str::<RegionEdit>(REGION_EDIT_JSON).unwrap();

        let _ = serde_json::from_str::<RegionChange>(REGION_CHANGE_JSON).unwrap();

        let _ = serde_json::from_str::<RegionRemove>(REGION_REMOVE_JSON).unwrap();
    }

    #[test]
    fn test_region_manifest_builder() {
        // A builder that never received metadata cannot produce a manifest.
        let empty = RegionManifestBuilder::with_checkpoint(None);
        assert!(!empty.contains_metadata());
        assert!(empty.try_build().is_err());

        let change: RegionChange = serde_json::from_str(REGION_CHANGE_JSON).unwrap();
        let edit: RegionEdit = serde_json::from_str(REGION_EDIT_JSON).unwrap();
        let added_id = edit.files_to_add[0].file_id;
        let removed_id = edit.files_to_remove[0].file_id;

        let mut builder = RegionManifestBuilder::default();
        builder.apply_change(1, change.clone());
        assert!(builder.contains_metadata());
        builder.apply_edit(2, edit.clone());

        let manifest = builder.try_build().unwrap();
        assert_eq!(change.metadata, manifest.metadata);
        assert_eq!(2, manifest.manifest_version);
        assert_eq!(1, manifest.files.len());
        assert!(manifest.files.contains_key(&added_id));
        assert!(!manifest.files.contains_key(&removed_id));
        assert_eq!(edit.flushed_entry_id, Some(manifest.flushed_entry_id));
        assert_eq!(edit.flushed_sequence, Some(manifest.flushed_sequence));
        assert_eq!(None, manifest.truncated_entry_id);
        assert_eq!(None, manifest.compaction_time_window);

        // Resume from the manifest; an edit without flush positions keeps the
        // old ones, removes the file and installs the time window.
        let window = Duration::from_secs(3600);
        let mut builder = RegionManifestBuilder::with_checkpoint(Some(manifest));
        assert!(builder.contains_metadata());
        builder.apply_edit(
            3,
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: edit.files_to_add.clone(),
                compaction_time_window: Some(window),
                flushed_entry_id: None,
                flushed_sequence: None,
            },
        );
        let manifest = builder.try_build().unwrap();
        assert_eq!(3, manifest.manifest_version);
        assert!(manifest.files.is_empty());
        assert_eq!(Some(window), manifest.compaction_time_window);
        assert_eq!(edit.flushed_entry_id, Some(manifest.flushed_entry_id));
        assert_eq!(edit.flushed_sequence, Some(manifest.flushed_sequence));

        // Truncation clears the files and overwrites the flush positions; an
        // edit without a window must not reset the stored window.
        let truncate: RegionTruncate = serde_json::from_str(REGION_TRUNCATE_JSON).unwrap();
        let mut builder = RegionManifestBuilder::with_checkpoint(Some(manifest));
        builder.apply_edit(4, edit);
        builder.apply_truncate(5, truncate.clone());
        let manifest = builder.try_build().unwrap();
        assert_eq!(5, manifest.manifest_version);
        assert!(manifest.files.is_empty());
        assert_eq!(truncate.truncated_entry_id, manifest.flushed_entry_id);
        assert_eq!(truncate.truncated_sequence, manifest.flushed_sequence);
        assert_eq!(Some(truncate.truncated_entry_id), manifest.truncated_entry_id);
        assert_eq!(Some(window), manifest.compaction_time_window);
    }

    #[test]
    fn test_encode_decode_region_checkpoint() {
        let without_manifest = RegionCheckpoint {
            last_version: 7,
            compacted_actions: 3,
            checkpoint: None,
        };
        let bytes = without_manifest.encode().unwrap();
        let decoded = RegionCheckpoint::decode(&bytes).unwrap();
        assert_eq!(without_manifest, decoded);
        assert_eq!(7, decoded.last_version());

        let with_manifest = RegionCheckpoint {
            last_version: 2,
            compacted_actions: 2,
            checkpoint: Some(manifest_from_fixtures()),
        };
        let bytes = with_manifest.encode().unwrap();
        let decoded = RegionCheckpoint::decode(&bytes).unwrap();
        assert_eq!(with_manifest, decoded);
        assert_eq!(2, decoded.last_version());

        // Neither invalid UTF-8 nor invalid JSON may decode successfully.
        assert!(RegionCheckpoint::decode(&[0xff, 0xfe]).is_err());
        assert!(RegionCheckpoint::decode(b"not json").is_err());
    }
}