1use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{
29 DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
30};
31use crate::manifest::manager::RemoveFileOptions;
32use crate::sst::FormatType;
33use crate::sst::file::FileMeta;
34use crate::wal::EntryId;
35
36#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
38pub enum RegionMetaAction {
39 Change(RegionChange),
41 Edit(RegionEdit),
43 Remove(RegionRemove),
45 Truncate(RegionTruncate),
47}
48
49#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
50pub struct RegionChange {
51 pub metadata: RegionMetadataRef,
53 #[serde(default)]
55 pub sst_format: FormatType,
56}
57
58#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
59pub struct RegionEdit {
60 pub files_to_add: Vec<FileMeta>,
61 pub files_to_remove: Vec<FileMeta>,
62 #[serde(default)]
64 pub timestamp_ms: Option<i64>,
65 #[serde(with = "humantime_serde")]
66 pub compaction_time_window: Option<Duration>,
67 pub flushed_entry_id: Option<EntryId>,
68 pub flushed_sequence: Option<SequenceNumber>,
69 pub committed_sequence: Option<SequenceNumber>,
70}
71
72#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
73pub struct RegionRemove {
74 pub region_id: RegionId,
75}
76
77#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
80pub struct RegionTruncate {
81 pub region_id: RegionId,
82 #[serde(flatten)]
83 pub kind: TruncateKind,
84 #[serde(default)]
86 pub timestamp_ms: Option<i64>,
87}
88
89#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
91#[serde(untagged)]
92pub enum TruncateKind {
93 All {
95 truncated_entry_id: EntryId,
97 truncated_sequence: SequenceNumber,
99 },
100 Partial { files_to_remove: Vec<FileMeta> },
102}
103
104#[derive(Serialize, Deserialize, Clone, Debug)]
106#[cfg_attr(test, derive(Eq))]
107pub struct RegionManifest {
108 pub metadata: RegionMetadataRef,
110 pub files: HashMap<FileId, FileMeta>,
112 #[serde(default)]
122 pub removed_files: RemovedFilesRecord,
123 pub flushed_entry_id: EntryId,
125 pub flushed_sequence: SequenceNumber,
127 pub committed_sequence: Option<SequenceNumber>,
128 pub manifest_version: ManifestVersion,
130 pub truncated_entry_id: Option<EntryId>,
132 #[serde(with = "humantime_serde")]
134 pub compaction_time_window: Option<Duration>,
135 #[serde(default)]
137 pub sst_format: FormatType,
138}
139
/// Test-only equality for [`RegionManifest`].
///
/// Note that `removed_files` and `sst_format` are NOT part of the comparison;
/// tests that care about those fields must assert on them explicitly.
#[cfg(test)]
impl PartialEq for RegionManifest {
    fn eq(&self, other: &Self) -> bool {
        // Scalar fields first, the metadata and the file map last.
        self.manifest_version == other.manifest_version
            && self.flushed_entry_id == other.flushed_entry_id
            && self.flushed_sequence == other.flushed_sequence
            && self.committed_sequence == other.committed_sequence
            && self.truncated_entry_id == other.truncated_entry_id
            && self.compaction_time_window == other.compaction_time_window
            && self.metadata == other.metadata
            && self.files == other.files
    }
}
153
154#[derive(Debug, Default)]
155pub struct RegionManifestBuilder {
156 metadata: Option<RegionMetadataRef>,
157 files: HashMap<FileId, FileMeta>,
158 pub removed_files: RemovedFilesRecord,
159 flushed_entry_id: EntryId,
160 flushed_sequence: SequenceNumber,
161 manifest_version: ManifestVersion,
162 truncated_entry_id: Option<EntryId>,
163 compaction_time_window: Option<Duration>,
164 committed_sequence: Option<SequenceNumber>,
165 sst_format: FormatType,
166}
167
168impl RegionManifestBuilder {
169 pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
171 if let Some(s) = checkpoint {
172 Self {
173 metadata: Some(s.metadata),
174 files: s.files,
175 removed_files: s.removed_files,
176 flushed_entry_id: s.flushed_entry_id,
177 manifest_version: s.manifest_version,
178 flushed_sequence: s.flushed_sequence,
179 truncated_entry_id: s.truncated_entry_id,
180 compaction_time_window: s.compaction_time_window,
181 committed_sequence: s.committed_sequence,
182 sst_format: s.sst_format,
183 }
184 } else {
185 Default::default()
186 }
187 }
188
189 pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
190 self.metadata = Some(change.metadata);
191 self.manifest_version = manifest_version;
192 self.sst_format = change.sst_format;
193 }
194
195 pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
196 self.manifest_version = manifest_version;
197 for file in edit.files_to_add {
198 self.files.insert(file.file_id, file);
199 }
200 self.removed_files.add_removed_files(
201 edit.files_to_remove
202 .iter()
203 .map(|meta| meta.file_id)
204 .collect(),
205 edit.timestamp_ms
206 .unwrap_or_else(|| Utc::now().timestamp_millis()),
207 );
208 for file in edit.files_to_remove {
209 self.files.remove(&file.file_id);
210 }
211 if let Some(flushed_entry_id) = edit.flushed_entry_id {
212 self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
213 }
214 if let Some(flushed_sequence) = edit.flushed_sequence {
215 self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
216 }
217
218 if let Some(committed_sequence) = edit.committed_sequence {
219 self.committed_sequence = Some(
220 self.committed_sequence
221 .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
222 );
223 }
224 if let Some(window) = edit.compaction_time_window {
225 self.compaction_time_window = Some(window);
226 }
227 }
228
229 pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
230 self.manifest_version = manifest_version;
231 match truncate.kind {
232 TruncateKind::All {
233 truncated_entry_id,
234 truncated_sequence,
235 } => {
236 self.flushed_entry_id = truncated_entry_id;
237 self.flushed_sequence = truncated_sequence;
238 self.truncated_entry_id = Some(truncated_entry_id);
239 self.files.clear();
240 self.removed_files.add_removed_files(
241 self.files.values().map(|meta| meta.file_id).collect(),
242 truncate
243 .timestamp_ms
244 .unwrap_or_else(|| Utc::now().timestamp_millis()),
245 );
246 }
247 TruncateKind::Partial { files_to_remove } => {
248 self.removed_files.add_removed_files(
249 files_to_remove.iter().map(|meta| meta.file_id).collect(),
250 truncate
251 .timestamp_ms
252 .unwrap_or_else(|| Utc::now().timestamp_millis()),
253 );
254 for file in files_to_remove {
255 self.files.remove(&file.file_id);
256 }
257 }
258 }
259 }
260
261 pub fn files(&self) -> &HashMap<FileId, FileMeta> {
262 &self.files
263 }
264
265 pub fn contains_metadata(&self) -> bool {
267 self.metadata.is_some()
268 }
269
270 pub fn try_build(self) -> Result<RegionManifest> {
271 let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
272 Ok(RegionManifest {
273 metadata,
274 files: self.files,
275 removed_files: self.removed_files,
276 flushed_entry_id: self.flushed_entry_id,
277 flushed_sequence: self.flushed_sequence,
278 committed_sequence: self.committed_sequence,
279 manifest_version: self.manifest_version,
280 truncated_entry_id: self.truncated_entry_id,
281 compaction_time_window: self.compaction_time_window,
282 sst_format: self.sst_format,
283 })
284 }
285}
286
287#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
291pub struct RemovedFilesRecord {
292 pub removed_files: Vec<RemovedFiles>,
295}
296
297#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
298pub struct RemovedFiles {
299 pub removed_at: i64,
302 pub file_ids: HashSet<FileId>,
304}
305
306impl RemovedFilesRecord {
307 pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
309 self.removed_files.push(RemovedFiles {
310 removed_at: at,
311 file_ids,
312 });
313 }
314
315 pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
316 let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum();
317 if opt.keep_count > 0 && total_removed_files <= opt.keep_count {
318 return Ok(());
319 }
320
321 let mut cur_file_cnt = total_removed_files;
322
323 let can_evict_until = chrono::Utc::now()
324 - chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu {
325 input: opt.keep_ttl,
326 })?;
327
328 self.removed_files.sort_unstable_by_key(|f| f.removed_at);
329 let updated = std::mem::take(&mut self.removed_files)
330 .into_iter()
331 .filter_map(|f| {
332 if f.removed_at < can_evict_until.timestamp_millis()
333 && (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count)
334 {
335 cur_file_cnt -= f.file_ids.len();
338 None
339 } else {
340 Some(f)
341 }
342 })
343 .collect();
344 self.removed_files = updated;
345
346 Ok(())
347 }
348}
349
350#[derive(Serialize, Deserialize, Debug, Clone)]
352#[cfg_attr(test, derive(PartialEq, Eq))]
353pub struct RegionCheckpoint {
354 pub last_version: ManifestVersion,
356 pub compacted_actions: usize,
358 pub checkpoint: Option<RegionManifest>,
360}
361
362impl RegionCheckpoint {
363 pub fn last_version(&self) -> ManifestVersion {
364 self.last_version
365 }
366
367 pub fn encode(&self) -> Result<Vec<u8>> {
368 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
369
370 Ok(json.into_bytes())
371 }
372
373 pub fn decode(bytes: &[u8]) -> Result<Self> {
374 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
375
376 serde_json::from_str(data).context(SerdeJsonSnafu)
377 }
378}
379
380#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
381pub struct RegionMetaActionList {
382 pub actions: Vec<RegionMetaAction>,
383}
384
385impl RegionMetaActionList {
386 pub fn with_action(action: RegionMetaAction) -> Self {
387 Self {
388 actions: vec![action],
389 }
390 }
391
392 pub fn new(actions: Vec<RegionMetaAction>) -> Self {
393 Self { actions }
394 }
395
396 pub fn into_region_edit(self) -> RegionEdit {
397 let mut edit = RegionEdit {
398 files_to_add: Vec::new(),
399 files_to_remove: Vec::new(),
400 timestamp_ms: None,
401 compaction_time_window: None,
402 flushed_entry_id: None,
403 flushed_sequence: None,
404 committed_sequence: None,
405 };
406
407 for action in self.actions {
408 if let RegionMetaAction::Edit(region_edit) = action {
409 edit.files_to_add.extend(region_edit.files_to_add);
411 edit.files_to_remove.extend(region_edit.files_to_remove);
412 if let Some(eid) = region_edit.flushed_entry_id {
414 edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
415 }
416 if let Some(seq) = region_edit.flushed_sequence {
417 edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
418 }
419 if let Some(seq) = region_edit.committed_sequence {
420 edit.committed_sequence =
421 Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
422 }
423 if region_edit.compaction_time_window.is_some() {
425 edit.compaction_time_window = region_edit.compaction_time_window;
426 }
427 }
428 }
429
430 edit
431 }
432}
433
434impl RegionMetaActionList {
435 pub fn encode(&self) -> Result<Vec<u8>> {
437 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
438
439 Ok(json.into_bytes())
440 }
441
442 pub fn decode(bytes: &[u8]) -> Result<Self> {
443 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
444
445 serde_json::from_str(data).context(SerdeJsonSnafu)
446 }
447}
448
/// Serialization/compatibility tests for the manifest action types.
#[cfg(test)]
mod tests {

    use common_time::Timestamp;

    use super::*;

    /// JSON produced without newer optional fields must still deserialize.
    #[test]
    fn test_region_action_compatibility() {
        // Edit without `flushed_sequence`.
        let region_edit = r#"{
            "flushed_entry_id":null,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // Edit with both flushed markers.
        let region_edit = r#"{
            "flushed_entry_id":10,
            "flushed_sequence":10,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // Change without `sst_format`.
        let region_change = r#" {
            "metadata":{
                "column_metadatas":[
                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key":[1],
                "region_id":5299989648942,
                "schema_version":0
            }
        }"#;
        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

        let region_remove = r#"{"region_id":42}"#;
        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
    }

    /// A builder without a checkpoint starts empty and cannot be built until
    /// metadata has been provided.
    #[test]
    fn test_region_manifest_builder() {
        let builder = RegionManifestBuilder::with_checkpoint(None);
        assert!(!builder.contains_metadata());
        assert!(builder.files().is_empty());
        assert!(builder.removed_files.removed_files.is_empty());
        assert!(builder.try_build().is_err());
    }

    /// A checkpoint survives an encode/decode round trip and invalid UTF-8 is
    /// rejected.
    #[test]
    fn test_encode_decode_region_checkpoint() {
        let checkpoint = RegionCheckpoint {
            last_version: 42,
            compacted_actions: 10,
            checkpoint: None,
        };

        let bytes = checkpoint.encode().unwrap();
        let decoded = RegionCheckpoint::decode(&bytes).unwrap();
        assert_eq!(checkpoint, decoded);
        assert_eq!(decoded.last_version(), 42);

        assert!(RegionCheckpoint::decode(&[0xff, 0xfe]).is_err());
    }

    /// A manifest serialized without `removed_files`, `committed_sequence` and
    /// `sst_format` must still deserialize.
    #[test]
    fn test_region_manifest_compatibility() {
        let region_manifest_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            },
            "files": {
                "4b220a70-2b03-4641-9687-b65d94641208": {
                    "region_id": 4402341478400,
                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 1,
                    "file_size": 100
                },
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
                    "region_id": 4402341478400,
                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 0,
                    "file_size": 100
                }
            },
            "flushed_entry_id": 10,
            "flushed_sequence": 20,
            "manifest_version": 1,
            "truncated_entry_id": null,
            "compaction_time_window": null
        }"#;

        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();

        assert_eq!(manifest.files.len(), 2);
        assert_eq!(manifest.flushed_entry_id, 10);
        assert_eq!(manifest.flushed_sequence, 20);
        assert_eq!(manifest.manifest_version, 1);

        // HashMap iteration order is unspecified, so sort before comparing.
        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
        file_ids.sort_unstable();
        assert_eq!(
            file_ids,
            vec![
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                "4b220a70-2b03-4641-9687-b65d94641208",
            ]
        );

        // Round trip through the current format.
        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
        let deserialized_manifest: RegionManifest =
            serde_json::from_str(&serialized_manifest).unwrap();
        assert_eq!(manifest, deserialized_manifest);
        assert_ne!(serialized_manifest, region_manifest_json);
    }

    /// Both the field layout of `TruncateKind::All` and the
    /// `files_to_remove` layout of `TruncateKind::Partial` must deserialize.
    #[test]
    fn test_region_truncate_compat() {
        let region_truncate_json = r#"{
            "region_id": 4402341478400,
            "truncated_entry_id": 10,
            "truncated_sequence": 20
        }"#;

        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
        assert_eq!(truncate_v1.region_id, 4402341478400);
        assert_eq!(
            truncate_v1.kind,
            TruncateKind::All {
                truncated_entry_id: 10,
                truncated_sequence: 20,
            }
        );

        let region_truncate_v2_json = r#"{
    "region_id": 4402341478400,
    "files_to_remove": [
        {
            "region_id": 4402341478400,
            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
            "time_range": [
                {
                    "value": 1451609210000,
                    "unit": "Millisecond"
                },
                {
                    "value": 1451609520000,
                    "unit": "Millisecond"
                }
            ],
            "level": 1,
            "file_size": 100
        }
    ]
}"#;

        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
        assert_eq!(truncate_v2.region_id, 4402341478400);
        assert_eq!(
            truncate_v2.kind,
            TruncateKind::Partial {
                files_to_remove: vec![FileMeta {
                    region_id: RegionId::from_u64(4402341478400),
                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                    time_range: (
                        Timestamp::new_millisecond(1451609210000),
                        Timestamp::new_millisecond(1451609520000)
                    ),
                    level: 1,
                    file_size: 100,
                    ..Default::default()
                }]
            }
        );
    }

    /// A manifest carrying `removed_files` survives a JSON round trip.
    #[test]
    fn test_region_manifest_removed_files() {
        let region_metadata = r#"{
            "column_metadatas": [
                {
                    "column_schema": {
                        "name": "a",
                        "data_type": {"Int64": {}},
                        "is_nullable": false,
                        "is_time_index": false,
                        "default_constraint": null,
                        "metadata": {}
                    },
                    "semantic_type": "Tag",
                    "column_id": 1
                },
                {
                    "column_schema": {
                        "name": "b",
                        "data_type": {"Float64": {}},
                        "is_nullable": false,
                        "is_time_index": false,
                        "default_constraint": null,
                        "metadata": {}
                    },
                    "semantic_type": "Field",
                    "column_id": 2
                },
                {
                    "column_schema": {
                        "name": "c",
                        "data_type": {"Timestamp": {"Millisecond": null}},
                        "is_nullable": false,
                        "is_time_index": false,
                        "default_constraint": null,
                        "metadata": {}
                    },
                    "semantic_type": "Timestamp",
                    "column_id": 3
                }
            ],
            "primary_key": [1],
            "region_id": 4402341478400,
            "schema_version": 0
        }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
        let manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            removed_files: RemovedFilesRecord {
                removed_files: vec![RemovedFiles {
                    removed_at: 0,
                    file_ids: HashSet::from([FileId::parse_str(
                        "4b220a70-2b03-4641-9687-b65d94641208",
                    )
                    .unwrap()]),
                }],
            },
            sst_format: FormatType::PrimaryKey,
        };

        let json = serde_json::to_string(&manifest).unwrap();
        let new: RegionManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(manifest, new);
        // The test-only `PartialEq` of `RegionManifest` does not look at
        // `removed_files`, so the field under test is compared explicitly.
        assert_eq!(manifest.removed_files, new.removed_files);
    }

    /// The current manifest/edit formats stay readable by the previous struct
    /// layouts and vice versa.
    #[test]
    fn test_old_region_manifest_compat() {
        // Manifest layout without `removed_files`, `committed_sequence` and
        // `sst_format`.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionManifestV1 {
            pub metadata: RegionMetadataRef,
            pub files: HashMap<FileId, FileMeta>,
            pub flushed_entry_id: EntryId,
            pub flushed_sequence: SequenceNumber,
            pub manifest_version: ManifestVersion,
            pub truncated_entry_id: Option<EntryId>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
        }

        let region_metadata = r#"{
            "column_metadatas": [
                {
                    "column_schema": {
                        "name": "a",
                        "data_type": {"Int64": {}},
                        "is_nullable": false,
                        "is_time_index": false,
                        "default_constraint": null,
                        "metadata": {}
                    },
                    "semantic_type": "Tag",
                    "column_id": 1
                },
                {
                    "column_schema": {
                        "name": "b",
                        "data_type": {"Float64": {}},
                        "is_nullable": false,
                        "is_time_index": false,
                        "default_constraint": null,
                        "metadata": {}
                    },
                    "semantic_type": "Field",
                    "column_id": 2
                },
                {
                    "column_schema": {
                        "name": "c",
                        "data_type": {"Timestamp": {"Millisecond": null}},
                        "is_nullable": false,
                        "is_time_index": false,
                        "default_constraint": null,
                        "metadata": {}
                    },
                    "semantic_type": "Timestamp",
                    "column_id": 3
                }
            ],
            "primary_key": [1],
            "region_id": 4402341478400,
            "schema_version": 0
        }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");

        // Old manifest -> new manifest.
        let v1 = RegionManifestV1 {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
        };
        let json = serde_json::to_string(&v1).unwrap();
        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(
            new_from_old,
            RegionManifest {
                metadata: metadata.clone(),
                files: HashMap::new(),
                removed_files: Default::default(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                committed_sequence: None,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
                sst_format: FormatType::PrimaryKey,
            }
        );

        // New manifest -> old manifest.
        let new_manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            removed_files: Default::default(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            sst_format: FormatType::PrimaryKey,
        };
        let json = serde_json::to_string(&new_manifest).unwrap();
        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
        assert_eq!(
            old_from_new,
            RegionManifestV1 {
                metadata: metadata.clone(),
                files: HashMap::new(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
            }
        );

        // Edit layout without `timestamp_ms` and `committed_sequence`.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionEditV1 {
            pub files_to_add: Vec<FileMeta>,
            pub files_to_remove: Vec<FileMeta>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
            pub flushed_entry_id: Option<EntryId>,
            pub flushed_sequence: Option<SequenceNumber>,
        }

        // Old edit -> new edit.
        let json = serde_json::to_string(&RegionEditV1 {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        })
        .unwrap();
        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
        assert_eq!(
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![],
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            },
            new_from_old
        );

        // New edit -> old edit.
        let new = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: Some(42),
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        let new_json = serde_json::to_string(&new).unwrap();

        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
        assert_eq!(
            RegionEditV1 {
                files_to_add: vec![],
                files_to_remove: vec![],
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            old_from_new
        );
    }

    /// A `RegionChange` serialized without `sst_format` falls back to the
    /// default format, and an explicit format survives a round trip.
    #[test]
    fn test_region_change_backward_compatibility() {
        let region_change_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key": [
                    1
                ],
                "region_id": 42,
                "schema_version": 0
            }
        }"#;

        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);

        let region_change = RegionChange {
            metadata: region_change.metadata.clone(),
            sst_format: FormatType::Flat,
        };

        let serialized = serde_json::to_string(&region_change).unwrap();
        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized.sst_format, FormatType::Flat);
    }
}