1use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{
29 DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
30};
31use crate::manifest::manager::RemoveFileOptions;
32use crate::sst::file::FileMeta;
33use crate::wal::EntryId;
34
35#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
37pub enum RegionMetaAction {
38 Change(RegionChange),
40 Edit(RegionEdit),
42 Remove(RegionRemove),
44 Truncate(RegionTruncate),
46}
47
48#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
49pub struct RegionChange {
50 pub metadata: RegionMetadataRef,
52}
53
54#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
55pub struct RegionEdit {
56 pub files_to_add: Vec<FileMeta>,
57 pub files_to_remove: Vec<FileMeta>,
58 #[serde(default)]
60 pub timestamp_ms: Option<i64>,
61 #[serde(with = "humantime_serde")]
62 pub compaction_time_window: Option<Duration>,
63 pub flushed_entry_id: Option<EntryId>,
64 pub flushed_sequence: Option<SequenceNumber>,
65 pub committed_sequence: Option<SequenceNumber>,
66}
67
68#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
69pub struct RegionRemove {
70 pub region_id: RegionId,
71}
72
73#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
76pub struct RegionTruncate {
77 pub region_id: RegionId,
78 #[serde(flatten)]
79 pub kind: TruncateKind,
80 #[serde(default)]
82 pub timestamp_ms: Option<i64>,
83}
84
85#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
87#[serde(untagged)]
88pub enum TruncateKind {
89 All {
91 truncated_entry_id: EntryId,
93 truncated_sequence: SequenceNumber,
95 },
96 Partial { files_to_remove: Vec<FileMeta> },
98}
99
100#[derive(Serialize, Deserialize, Clone, Debug)]
102#[cfg_attr(test, derive(Eq))]
103pub struct RegionManifest {
104 pub metadata: RegionMetadataRef,
106 pub files: HashMap<FileId, FileMeta>,
108 #[serde(default)]
118 pub removed_files: RemovedFilesRecord,
119 pub flushed_entry_id: EntryId,
121 pub flushed_sequence: SequenceNumber,
123 pub committed_sequence: Option<SequenceNumber>,
124 pub manifest_version: ManifestVersion,
126 pub truncated_entry_id: Option<EntryId>,
128 #[serde(with = "humantime_serde")]
130 pub compaction_time_window: Option<Duration>,
131}
132
133#[cfg(test)]
134impl PartialEq for RegionManifest {
135 fn eq(&self, other: &Self) -> bool {
136 self.metadata == other.metadata
137 && self.files == other.files
138 && self.flushed_entry_id == other.flushed_entry_id
139 && self.flushed_sequence == other.flushed_sequence
140 && self.manifest_version == other.manifest_version
141 && self.truncated_entry_id == other.truncated_entry_id
142 && self.compaction_time_window == other.compaction_time_window
143 && self.committed_sequence == other.committed_sequence
144 }
145}
146
147#[derive(Debug, Default)]
148pub struct RegionManifestBuilder {
149 metadata: Option<RegionMetadataRef>,
150 files: HashMap<FileId, FileMeta>,
151 pub removed_files: RemovedFilesRecord,
152 flushed_entry_id: EntryId,
153 flushed_sequence: SequenceNumber,
154 manifest_version: ManifestVersion,
155 truncated_entry_id: Option<EntryId>,
156 compaction_time_window: Option<Duration>,
157 committed_sequence: Option<SequenceNumber>,
158}
159
160impl RegionManifestBuilder {
161 pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
163 if let Some(s) = checkpoint {
164 Self {
165 metadata: Some(s.metadata),
166 files: s.files,
167 removed_files: s.removed_files,
168 flushed_entry_id: s.flushed_entry_id,
169 manifest_version: s.manifest_version,
170 flushed_sequence: s.flushed_sequence,
171 truncated_entry_id: s.truncated_entry_id,
172 compaction_time_window: s.compaction_time_window,
173 committed_sequence: s.committed_sequence,
174 }
175 } else {
176 Default::default()
177 }
178 }
179
180 pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
181 self.metadata = Some(change.metadata);
182 self.manifest_version = manifest_version;
183 }
184
185 pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
186 self.manifest_version = manifest_version;
187 for file in edit.files_to_add {
188 self.files.insert(file.file_id, file);
189 }
190 self.removed_files.add_removed_files(
191 edit.files_to_remove
192 .iter()
193 .map(|meta| meta.file_id)
194 .collect(),
195 edit.timestamp_ms
196 .unwrap_or_else(|| Utc::now().timestamp_millis()),
197 );
198 for file in edit.files_to_remove {
199 self.files.remove(&file.file_id);
200 }
201 if let Some(flushed_entry_id) = edit.flushed_entry_id {
202 self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
203 }
204 if let Some(flushed_sequence) = edit.flushed_sequence {
205 self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
206 }
207
208 if let Some(committed_sequence) = edit.committed_sequence {
209 self.committed_sequence = Some(
210 self.committed_sequence
211 .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
212 );
213 }
214 if let Some(window) = edit.compaction_time_window {
215 self.compaction_time_window = Some(window);
216 }
217 }
218
219 pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
220 self.manifest_version = manifest_version;
221 match truncate.kind {
222 TruncateKind::All {
223 truncated_entry_id,
224 truncated_sequence,
225 } => {
226 self.flushed_entry_id = truncated_entry_id;
227 self.flushed_sequence = truncated_sequence;
228 self.truncated_entry_id = Some(truncated_entry_id);
229 self.files.clear();
230 self.removed_files.add_removed_files(
231 self.files.values().map(|meta| meta.file_id).collect(),
232 truncate
233 .timestamp_ms
234 .unwrap_or_else(|| Utc::now().timestamp_millis()),
235 );
236 }
237 TruncateKind::Partial { files_to_remove } => {
238 self.removed_files.add_removed_files(
239 files_to_remove.iter().map(|meta| meta.file_id).collect(),
240 truncate
241 .timestamp_ms
242 .unwrap_or_else(|| Utc::now().timestamp_millis()),
243 );
244 for file in files_to_remove {
245 self.files.remove(&file.file_id);
246 }
247 }
248 }
249 }
250
251 pub fn files(&self) -> &HashMap<FileId, FileMeta> {
252 &self.files
253 }
254
255 pub fn contains_metadata(&self) -> bool {
257 self.metadata.is_some()
258 }
259
260 pub fn try_build(self) -> Result<RegionManifest> {
261 let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
262 Ok(RegionManifest {
263 metadata,
264 files: self.files,
265 removed_files: self.removed_files,
266 flushed_entry_id: self.flushed_entry_id,
267 flushed_sequence: self.flushed_sequence,
268 committed_sequence: self.committed_sequence,
269 manifest_version: self.manifest_version,
270 truncated_entry_id: self.truncated_entry_id,
271 compaction_time_window: self.compaction_time_window,
272 })
273 }
274}
275
276#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
280pub struct RemovedFilesRecord {
281 pub removed_files: Vec<RemovedFiles>,
284}
285
286#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
287pub struct RemovedFiles {
288 pub removed_at: i64,
291 pub file_ids: HashSet<FileId>,
293}
294
295impl RemovedFilesRecord {
296 pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
298 self.removed_files.push(RemovedFiles {
299 removed_at: at,
300 file_ids,
301 });
302 }
303
304 pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
305 let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum();
306 if opt.keep_count > 0 && total_removed_files <= opt.keep_count {
307 return Ok(());
308 }
309
310 let mut cur_file_cnt = total_removed_files;
311
312 let can_evict_until = chrono::Utc::now()
313 - chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu {
314 input: opt.keep_ttl,
315 })?;
316
317 self.removed_files.sort_unstable_by_key(|f| f.removed_at);
318 let updated = std::mem::take(&mut self.removed_files)
319 .into_iter()
320 .filter_map(|f| {
321 if f.removed_at < can_evict_until.timestamp_millis()
322 && (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count)
323 {
324 cur_file_cnt -= f.file_ids.len();
327 None
328 } else {
329 Some(f)
330 }
331 })
332 .collect();
333 self.removed_files = updated;
334
335 Ok(())
336 }
337}
338
339#[derive(Serialize, Deserialize, Debug, Clone)]
341#[cfg_attr(test, derive(PartialEq, Eq))]
342pub struct RegionCheckpoint {
343 pub last_version: ManifestVersion,
345 pub compacted_actions: usize,
347 pub checkpoint: Option<RegionManifest>,
349}
350
351impl RegionCheckpoint {
352 pub fn last_version(&self) -> ManifestVersion {
353 self.last_version
354 }
355
356 pub fn encode(&self) -> Result<Vec<u8>> {
357 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
358
359 Ok(json.into_bytes())
360 }
361
362 pub fn decode(bytes: &[u8]) -> Result<Self> {
363 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
364
365 serde_json::from_str(data).context(SerdeJsonSnafu)
366 }
367}
368
369#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
370pub struct RegionMetaActionList {
371 pub actions: Vec<RegionMetaAction>,
372}
373
374impl RegionMetaActionList {
375 pub fn with_action(action: RegionMetaAction) -> Self {
376 Self {
377 actions: vec![action],
378 }
379 }
380
381 pub fn new(actions: Vec<RegionMetaAction>) -> Self {
382 Self { actions }
383 }
384
385 pub fn into_region_edit(self) -> RegionEdit {
386 let mut edit = RegionEdit {
387 files_to_add: Vec::new(),
388 files_to_remove: Vec::new(),
389 timestamp_ms: None,
390 compaction_time_window: None,
391 flushed_entry_id: None,
392 flushed_sequence: None,
393 committed_sequence: None,
394 };
395
396 for action in self.actions {
397 if let RegionMetaAction::Edit(region_edit) = action {
398 edit.files_to_add.extend(region_edit.files_to_add);
400 edit.files_to_remove.extend(region_edit.files_to_remove);
401 if let Some(eid) = region_edit.flushed_entry_id {
403 edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
404 }
405 if let Some(seq) = region_edit.flushed_sequence {
406 edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
407 }
408 if let Some(seq) = region_edit.committed_sequence {
409 edit.committed_sequence =
410 Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
411 }
412 if region_edit.compaction_time_window.is_some() {
414 edit.compaction_time_window = region_edit.compaction_time_window;
415 }
416 }
417 }
418
419 edit
420 }
421}
422
423impl RegionMetaActionList {
424 pub fn encode(&self) -> Result<Vec<u8>> {
426 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
427
428 Ok(json.into_bytes())
429 }
430
431 pub fn decode(bytes: &[u8]) -> Result<Self> {
432 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
433
434 serde_json::from_str(data).context(SerdeJsonSnafu)
435 }
436}
437
438#[cfg(test)]
439mod tests {
440
441 use common_time::Timestamp;
442
443 use super::*;
444
445 #[test]
449 fn test_region_action_compatibility() {
450 let region_edit = r#"{
451 "flushed_entry_id":null,
452 "compaction_time_window":null,
453 "files_to_add":[
454 {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
455 ],
456 "files_to_remove":[
457 {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
458 ]
459 }"#;
460 let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
461
462 let region_edit = r#"{
463 "flushed_entry_id":10,
464 "flushed_sequence":10,
465 "compaction_time_window":null,
466 "files_to_add":[
467 {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
468 ],
469 "files_to_remove":[
470 {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
471 ]
472 }"#;
473 let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
474
475 let region_change = r#" {
476 "metadata":{
477 "column_metadatas":[
478 {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
479 ],
480 "primary_key":[1],
481 "region_id":5299989648942,
482 "schema_version":0
483 }
484 }"#;
485 let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
486
487 let region_remove = r#"{"region_id":42}"#;
488 let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
489 }
490
491 #[test]
492 fn test_region_manifest_builder() {
493 }
495
496 #[test]
497 fn test_encode_decode_region_checkpoint() {
498 }
500
501 #[test]
502 fn test_region_manifest_compatibility() {
503 let region_manifest_json = r#"{
505 "metadata": {
506 "column_metadatas": [
507 {
508 "column_schema": {
509 "name": "a",
510 "data_type": {"Int64": {}},
511 "is_nullable": false,
512 "is_time_index": false,
513 "default_constraint": null,
514 "metadata": {}
515 },
516 "semantic_type": "Tag",
517 "column_id": 1
518 },
519 {
520 "column_schema": {
521 "name": "b",
522 "data_type": {"Float64": {}},
523 "is_nullable": false,
524 "is_time_index": false,
525 "default_constraint": null,
526 "metadata": {}
527 },
528 "semantic_type": "Field",
529 "column_id": 2
530 },
531 {
532 "column_schema": {
533 "name": "c",
534 "data_type": {"Timestamp": {"Millisecond": null}},
535 "is_nullable": false,
536 "is_time_index": false,
537 "default_constraint": null,
538 "metadata": {}
539 },
540 "semantic_type": "Timestamp",
541 "column_id": 3
542 }
543 ],
544 "primary_key": [1],
545 "region_id": 4402341478400,
546 "schema_version": 0
547 },
548 "files": {
549 "4b220a70-2b03-4641-9687-b65d94641208": {
550 "region_id": 4402341478400,
551 "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
552 "time_range": [
553 {"value": 1451609210000, "unit": "Millisecond"},
554 {"value": 1451609520000, "unit": "Millisecond"}
555 ],
556 "level": 1,
557 "file_size": 100
558 },
559 "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
560 "region_id": 4402341478400,
561 "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
562 "time_range": [
563 {"value": 1451609210000, "unit": "Millisecond"},
564 {"value": 1451609520000, "unit": "Millisecond"}
565 ],
566 "level": 0,
567 "file_size": 100
568 }
569 },
570 "flushed_entry_id": 10,
571 "flushed_sequence": 20,
572 "manifest_version": 1,
573 "truncated_entry_id": null,
574 "compaction_time_window": null
575 }"#;
576
577 let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();
578
579 assert_eq!(manifest.files.len(), 2);
581 assert_eq!(manifest.flushed_entry_id, 10);
582 assert_eq!(manifest.flushed_sequence, 20);
583 assert_eq!(manifest.manifest_version, 1);
584
585 let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
587 file_ids.sort_unstable();
588 assert_eq!(
589 file_ids,
590 vec![
591 "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
592 "4b220a70-2b03-4641-9687-b65d94641208",
593 ]
594 );
595
596 let serialized_manifest = serde_json::to_string(&manifest).unwrap();
598 let deserialized_manifest: RegionManifest =
599 serde_json::from_str(&serialized_manifest).unwrap();
600 assert_eq!(manifest, deserialized_manifest);
601 assert_ne!(serialized_manifest, region_manifest_json);
602 }
603
604 #[test]
605 fn test_region_truncate_compat() {
606 let region_truncate_json = r#"{
608 "region_id": 4402341478400,
609 "truncated_entry_id": 10,
610 "truncated_sequence": 20
611 }"#;
612
613 let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
614 assert_eq!(truncate_v1.region_id, 4402341478400);
615 assert_eq!(
616 truncate_v1.kind,
617 TruncateKind::All {
618 truncated_entry_id: 10,
619 truncated_sequence: 20,
620 }
621 );
622
623 let region_truncate_v2_json = r#"{
625 "region_id": 4402341478400,
626 "files_to_remove": [
627 {
628 "region_id": 4402341478400,
629 "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
630 "time_range": [
631 {
632 "value": 1451609210000,
633 "unit": "Millisecond"
634 },
635 {
636 "value": 1451609520000,
637 "unit": "Millisecond"
638 }
639 ],
640 "level": 1,
641 "file_size": 100
642 }
643 ]
644}"#;
645
646 let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
647 assert_eq!(truncate_v2.region_id, 4402341478400);
648 assert_eq!(
649 truncate_v2.kind,
650 TruncateKind::Partial {
651 files_to_remove: vec![FileMeta {
652 region_id: RegionId::from_u64(4402341478400),
653 file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
654 time_range: (
655 Timestamp::new_millisecond(1451609210000),
656 Timestamp::new_millisecond(1451609520000)
657 ),
658 level: 1,
659 file_size: 100,
660 ..Default::default()
661 }]
662 }
663 );
664 }
665
666 #[test]
667 fn test_region_manifest_removed_files() {
668 let region_metadata = r#"{
669 "column_metadatas": [
670 {
671 "column_schema": {
672 "name": "a",
673 "data_type": {"Int64": {}},
674 "is_nullable": false,
675 "is_time_index": false,
676 "default_constraint": null,
677 "metadata": {}
678 },
679 "semantic_type": "Tag",
680 "column_id": 1
681 },
682 {
683 "column_schema": {
684 "name": "b",
685 "data_type": {"Float64": {}},
686 "is_nullable": false,
687 "is_time_index": false,
688 "default_constraint": null,
689 "metadata": {}
690 },
691 "semantic_type": "Field",
692 "column_id": 2
693 },
694 {
695 "column_schema": {
696 "name": "c",
697 "data_type": {"Timestamp": {"Millisecond": null}},
698 "is_nullable": false,
699 "is_time_index": false,
700 "default_constraint": null,
701 "metadata": {}
702 },
703 "semantic_type": "Timestamp",
704 "column_id": 3
705 }
706 ],
707 "primary_key": [1],
708 "region_id": 4402341478400,
709 "schema_version": 0
710 }"#;
711
712 let metadata: RegionMetadataRef =
713 serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
714 let manifest = RegionManifest {
715 metadata: metadata.clone(),
716 files: HashMap::new(),
717 flushed_entry_id: 0,
718 flushed_sequence: 0,
719 committed_sequence: None,
720 manifest_version: 0,
721 truncated_entry_id: None,
722 compaction_time_window: None,
723 removed_files: RemovedFilesRecord {
724 removed_files: vec![RemovedFiles {
725 removed_at: 0,
726 file_ids: HashSet::from([FileId::parse_str(
727 "4b220a70-2b03-4641-9687-b65d94641208",
728 )
729 .unwrap()]),
730 }],
731 },
732 };
733
734 let json = serde_json::to_string(&manifest).unwrap();
735 let new: RegionManifest = serde_json::from_str(&json).unwrap();
736
737 assert_eq!(manifest, new);
738 }
739
740 #[test]
742 fn test_old_region_manifest_compat() {
743 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
744 pub struct RegionManifestV1 {
745 pub metadata: RegionMetadataRef,
747 pub files: HashMap<FileId, FileMeta>,
749 pub flushed_entry_id: EntryId,
751 pub flushed_sequence: SequenceNumber,
753 pub manifest_version: ManifestVersion,
755 pub truncated_entry_id: Option<EntryId>,
757 #[serde(with = "humantime_serde")]
759 pub compaction_time_window: Option<Duration>,
760 }
761
762 let region_metadata = r#"{
763 "column_metadatas": [
764 {
765 "column_schema": {
766 "name": "a",
767 "data_type": {"Int64": {}},
768 "is_nullable": false,
769 "is_time_index": false,
770 "default_constraint": null,
771 "metadata": {}
772 },
773 "semantic_type": "Tag",
774 "column_id": 1
775 },
776 {
777 "column_schema": {
778 "name": "b",
779 "data_type": {"Float64": {}},
780 "is_nullable": false,
781 "is_time_index": false,
782 "default_constraint": null,
783 "metadata": {}
784 },
785 "semantic_type": "Field",
786 "column_id": 2
787 },
788 {
789 "column_schema": {
790 "name": "c",
791 "data_type": {"Timestamp": {"Millisecond": null}},
792 "is_nullable": false,
793 "is_time_index": false,
794 "default_constraint": null,
795 "metadata": {}
796 },
797 "semantic_type": "Timestamp",
798 "column_id": 3
799 }
800 ],
801 "primary_key": [1],
802 "region_id": 4402341478400,
803 "schema_version": 0
804 }"#;
805
806 let metadata: RegionMetadataRef =
807 serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
808
809 let v1 = RegionManifestV1 {
811 metadata: metadata.clone(),
812 files: HashMap::new(),
813 flushed_entry_id: 0,
814 flushed_sequence: 0,
815 manifest_version: 0,
816 truncated_entry_id: None,
817 compaction_time_window: None,
818 };
819 let json = serde_json::to_string(&v1).unwrap();
820 let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
821 assert_eq!(
822 new_from_old,
823 RegionManifest {
824 metadata: metadata.clone(),
825 files: HashMap::new(),
826 removed_files: Default::default(),
827 flushed_entry_id: 0,
828 flushed_sequence: 0,
829 committed_sequence: None,
830 manifest_version: 0,
831 truncated_entry_id: None,
832 compaction_time_window: None,
833 }
834 );
835
836 let new_manifest = RegionManifest {
837 metadata: metadata.clone(),
838 files: HashMap::new(),
839 removed_files: Default::default(),
840 flushed_entry_id: 0,
841 flushed_sequence: 0,
842 committed_sequence: None,
843 manifest_version: 0,
844 truncated_entry_id: None,
845 compaction_time_window: None,
846 };
847 let json = serde_json::to_string(&new_manifest).unwrap();
848 let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
849 assert_eq!(
850 old_from_new,
851 RegionManifestV1 {
852 metadata: metadata.clone(),
853 files: HashMap::new(),
854 flushed_entry_id: 0,
855 flushed_sequence: 0,
856 manifest_version: 0,
857 truncated_entry_id: None,
858 compaction_time_window: None,
859 }
860 );
861
862 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
863 pub struct RegionEditV1 {
864 pub files_to_add: Vec<FileMeta>,
865 pub files_to_remove: Vec<FileMeta>,
866 #[serde(with = "humantime_serde")]
867 pub compaction_time_window: Option<Duration>,
868 pub flushed_entry_id: Option<EntryId>,
869 pub flushed_sequence: Option<SequenceNumber>,
870 }
871
872 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
874 pub struct RegionTruncateV1 {
875 pub region_id: RegionId,
876 pub kind: TruncateKind,
877 }
878
879 let json = serde_json::to_string(&RegionEditV1 {
880 files_to_add: vec![],
881 files_to_remove: vec![],
882 compaction_time_window: None,
883 flushed_entry_id: None,
884 flushed_sequence: None,
885 })
886 .unwrap();
887 let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
888 assert_eq!(
889 RegionEdit {
890 files_to_add: vec![],
891 files_to_remove: vec![],
892 timestamp_ms: None,
893 compaction_time_window: None,
894 flushed_entry_id: None,
895 flushed_sequence: None,
896 committed_sequence: None,
897 },
898 new_from_old
899 );
900
901 let new = RegionEdit {
903 files_to_add: vec![],
904 files_to_remove: vec![],
905 timestamp_ms: Some(42),
906 compaction_time_window: None,
907 flushed_entry_id: None,
908 flushed_sequence: None,
909 committed_sequence: None,
910 };
911
912 let new_json = serde_json::to_string(&new).unwrap();
913
914 let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
915 assert_eq!(
916 RegionEditV1 {
917 files_to_add: vec![],
918 files_to_remove: vec![],
919 compaction_time_window: None,
920 flushed_entry_id: None,
921 flushed_sequence: None,
922 },
923 old_from_new
924 );
925 }
926}