1use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
29use crate::manifest::manager::RemoveFileOptions;
30use crate::region::ManifestStats;
31use crate::sst::FormatType;
32use crate::sst::file::FileMeta;
33use crate::wal::EntryId;
34
35#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
37pub enum RegionMetaAction {
38 Change(RegionChange),
40 Edit(RegionEdit),
42 Remove(RegionRemove),
44 Truncate(RegionTruncate),
46}
47
48#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
49pub struct RegionChange {
50 pub metadata: RegionMetadataRef,
52 #[serde(default)]
54 pub sst_format: FormatType,
55}
56
57#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
58pub struct RegionEdit {
59 pub files_to_add: Vec<FileMeta>,
60 pub files_to_remove: Vec<FileMeta>,
61 #[serde(default)]
63 pub timestamp_ms: Option<i64>,
64 #[serde(with = "humantime_serde")]
65 pub compaction_time_window: Option<Duration>,
66 pub flushed_entry_id: Option<EntryId>,
67 pub flushed_sequence: Option<SequenceNumber>,
68 pub committed_sequence: Option<SequenceNumber>,
69}
70
71#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
72pub struct RegionRemove {
73 pub region_id: RegionId,
74}
75
76#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
79pub struct RegionTruncate {
80 pub region_id: RegionId,
81 #[serde(flatten)]
82 pub kind: TruncateKind,
83 #[serde(default)]
85 pub timestamp_ms: Option<i64>,
86}
87
88#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
90#[serde(untagged)]
91pub enum TruncateKind {
92 All {
94 truncated_entry_id: EntryId,
96 truncated_sequence: SequenceNumber,
98 },
99 Partial { files_to_remove: Vec<FileMeta> },
101}
102
103#[derive(Serialize, Deserialize, Clone, Debug)]
105#[cfg_attr(test, derive(Eq))]
106pub struct RegionManifest {
107 pub metadata: RegionMetadataRef,
109 pub files: HashMap<FileId, FileMeta>,
111 #[serde(default)]
121 pub removed_files: RemovedFilesRecord,
122 pub flushed_entry_id: EntryId,
124 pub flushed_sequence: SequenceNumber,
126 pub committed_sequence: Option<SequenceNumber>,
127 pub manifest_version: ManifestVersion,
129 pub truncated_entry_id: Option<EntryId>,
131 #[serde(with = "humantime_serde")]
133 pub compaction_time_window: Option<Duration>,
134 #[serde(default)]
136 pub sst_format: FormatType,
137}
138
/// Test-only equality for [`RegionManifest`].
///
/// `removed_files` and `sst_format` are not part of the comparison.
#[cfg(test)]
impl PartialEq for RegionManifest {
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            &self.metadata,
            &self.files,
            &self.flushed_entry_id,
            &self.flushed_sequence,
            &self.manifest_version,
            &self.truncated_entry_id,
            &self.compaction_time_window,
            &self.committed_sequence,
        );
        let rhs = (
            &other.metadata,
            &other.files,
            &other.flushed_entry_id,
            &other.flushed_sequence,
            &other.manifest_version,
            &other.truncated_entry_id,
            &other.compaction_time_window,
            &other.committed_sequence,
        );
        lhs == rhs
    }
}
152
153#[derive(Debug, Default)]
154pub struct RegionManifestBuilder {
155 metadata: Option<RegionMetadataRef>,
156 files: HashMap<FileId, FileMeta>,
157 pub removed_files: RemovedFilesRecord,
158 flushed_entry_id: EntryId,
159 flushed_sequence: SequenceNumber,
160 manifest_version: ManifestVersion,
161 truncated_entry_id: Option<EntryId>,
162 compaction_time_window: Option<Duration>,
163 committed_sequence: Option<SequenceNumber>,
164 sst_format: FormatType,
165}
166
167impl RegionManifestBuilder {
168 pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
170 if let Some(s) = checkpoint {
171 Self {
172 metadata: Some(s.metadata),
173 files: s.files,
174 removed_files: s.removed_files,
175 flushed_entry_id: s.flushed_entry_id,
176 manifest_version: s.manifest_version,
177 flushed_sequence: s.flushed_sequence,
178 truncated_entry_id: s.truncated_entry_id,
179 compaction_time_window: s.compaction_time_window,
180 committed_sequence: s.committed_sequence,
181 sst_format: s.sst_format,
182 }
183 } else {
184 Default::default()
185 }
186 }
187
188 pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
189 self.metadata = Some(change.metadata);
190 self.manifest_version = manifest_version;
191 self.sst_format = change.sst_format;
192 }
193
194 pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
195 self.manifest_version = manifest_version;
196 for file in edit.files_to_add {
197 self.files.insert(file.file_id, file);
198 }
199 self.removed_files.add_removed_files(
200 edit.files_to_remove
201 .iter()
202 .map(|meta| meta.file_id)
203 .collect(),
204 edit.timestamp_ms
205 .unwrap_or_else(|| Utc::now().timestamp_millis()),
206 );
207 for file in edit.files_to_remove {
208 self.files.remove(&file.file_id);
209 }
210 if let Some(flushed_entry_id) = edit.flushed_entry_id {
211 self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
212 }
213 if let Some(flushed_sequence) = edit.flushed_sequence {
214 self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
215 }
216
217 if let Some(committed_sequence) = edit.committed_sequence {
218 self.committed_sequence = Some(
219 self.committed_sequence
220 .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
221 );
222 }
223 if let Some(window) = edit.compaction_time_window {
224 self.compaction_time_window = Some(window);
225 }
226 }
227
228 pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
229 self.manifest_version = manifest_version;
230 match truncate.kind {
231 TruncateKind::All {
232 truncated_entry_id,
233 truncated_sequence,
234 } => {
235 self.flushed_entry_id = truncated_entry_id;
236 self.flushed_sequence = truncated_sequence;
237 self.truncated_entry_id = Some(truncated_entry_id);
238 self.removed_files.add_removed_files(
239 self.files.values().map(|meta| meta.file_id).collect(),
240 truncate
241 .timestamp_ms
242 .unwrap_or_else(|| Utc::now().timestamp_millis()),
243 );
244 self.files.clear();
245 }
246 TruncateKind::Partial { files_to_remove } => {
247 self.removed_files.add_removed_files(
248 files_to_remove.iter().map(|meta| meta.file_id).collect(),
249 truncate
250 .timestamp_ms
251 .unwrap_or_else(|| Utc::now().timestamp_millis()),
252 );
253 for file in files_to_remove {
254 self.files.remove(&file.file_id);
255 }
256 }
257 }
258 }
259
260 pub fn files(&self) -> &HashMap<FileId, FileMeta> {
261 &self.files
262 }
263
264 pub fn contains_metadata(&self) -> bool {
266 self.metadata.is_some()
267 }
268
269 pub fn try_build(self) -> Result<RegionManifest> {
270 let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
271 Ok(RegionManifest {
272 metadata,
273 files: self.files,
274 removed_files: self.removed_files,
275 flushed_entry_id: self.flushed_entry_id,
276 flushed_sequence: self.flushed_sequence,
277 committed_sequence: self.committed_sequence,
278 manifest_version: self.manifest_version,
279 truncated_entry_id: self.truncated_entry_id,
280 compaction_time_window: self.compaction_time_window,
281 sst_format: self.sst_format,
282 })
283 }
284}
285
286#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
290pub struct RemovedFilesRecord {
291 pub removed_files: Vec<RemovedFiles>,
294}
295
296impl RemovedFilesRecord {
297 pub fn clear_deleted_files(&mut self, deleted_files: Vec<FileId>) {
299 let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
300 for files in self.removed_files.iter_mut() {
301 files.file_ids.retain(|fid| !deleted_file_set.contains(fid));
302 }
303
304 self.removed_files.retain(|fs| !fs.file_ids.is_empty());
305 }
306
307 pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
308 let cnt = self
309 .removed_files
310 .iter()
311 .map(|r| r.file_ids.len() as u64)
312 .sum();
313 stats
314 .file_removed_cnt
315 .store(cnt, std::sync::atomic::Ordering::Relaxed);
316 }
317}
318
319#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
320pub struct RemovedFiles {
321 pub removed_at: i64,
324 pub file_ids: HashSet<FileId>,
326}
327
328impl RemovedFilesRecord {
329 pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
331 if file_ids.is_empty() {
332 return;
333 }
334 self.removed_files.push(RemovedFiles {
335 removed_at: at,
336 file_ids,
337 });
338 }
339
340 pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
341 if !opt.enable_gc {
342 self.removed_files.clear();
344 return Ok(());
345 }
346
347 Ok(())
350 }
351}
352
353#[derive(Serialize, Deserialize, Debug, Clone)]
355#[cfg_attr(test, derive(PartialEq, Eq))]
356pub struct RegionCheckpoint {
357 pub last_version: ManifestVersion,
359 pub compacted_actions: usize,
361 pub checkpoint: Option<RegionManifest>,
363}
364
365impl RegionCheckpoint {
366 pub fn last_version(&self) -> ManifestVersion {
367 self.last_version
368 }
369
370 pub fn encode(&self) -> Result<Vec<u8>> {
371 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
372
373 Ok(json.into_bytes())
374 }
375
376 pub fn decode(bytes: &[u8]) -> Result<Self> {
377 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
378
379 serde_json::from_str(data).context(SerdeJsonSnafu)
380 }
381}
382
383#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
384pub struct RegionMetaActionList {
385 pub actions: Vec<RegionMetaAction>,
386}
387
388impl RegionMetaActionList {
389 pub fn with_action(action: RegionMetaAction) -> Self {
390 Self {
391 actions: vec![action],
392 }
393 }
394
395 pub fn new(actions: Vec<RegionMetaAction>) -> Self {
396 Self { actions }
397 }
398
399 pub fn into_region_edit(self) -> RegionEdit {
400 let mut edit = RegionEdit {
401 files_to_add: Vec::new(),
402 files_to_remove: Vec::new(),
403 timestamp_ms: None,
404 compaction_time_window: None,
405 flushed_entry_id: None,
406 flushed_sequence: None,
407 committed_sequence: None,
408 };
409
410 for action in self.actions {
411 if let RegionMetaAction::Edit(region_edit) = action {
412 edit.files_to_add.extend(region_edit.files_to_add);
414 edit.files_to_remove.extend(region_edit.files_to_remove);
415 if let Some(eid) = region_edit.flushed_entry_id {
417 edit.flushed_entry_id = Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
418 }
419 if let Some(seq) = region_edit.flushed_sequence {
420 edit.flushed_sequence = Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
421 }
422 if let Some(seq) = region_edit.committed_sequence {
423 edit.committed_sequence =
424 Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
425 }
426 if region_edit.compaction_time_window.is_some() {
428 edit.compaction_time_window = region_edit.compaction_time_window;
429 }
430 }
431 }
432
433 edit
434 }
435}
436
437impl RegionMetaActionList {
438 pub fn encode(&self) -> Result<Vec<u8>> {
440 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
441
442 Ok(json.into_bytes())
443 }
444
445 pub fn decode(bytes: &[u8]) -> Result<Self> {
446 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
447
448 serde_json::from_str(data).context(SerdeJsonSnafu)
449 }
450}
451
452#[cfg(test)]
453mod tests {
454
455 use common_time::Timestamp;
456
457 use super::*;
458
459 #[test]
463 fn test_region_action_compatibility() {
464 let region_edit = r#"{
465 "flushed_entry_id":null,
466 "compaction_time_window":null,
467 "files_to_add":[
468 {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
469 ],
470 "files_to_remove":[
471 {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
472 ]
473 }"#;
474 let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
475
476 let region_edit = r#"{
477 "flushed_entry_id":10,
478 "flushed_sequence":10,
479 "compaction_time_window":null,
480 "files_to_add":[
481 {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
482 ],
483 "files_to_remove":[
484 {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
485 ]
486 }"#;
487 let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
488
489 let region_change = r#" {
491 "metadata":{
492 "column_metadatas":[
493 {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
494 ],
495 "primary_key":[1],
496 "region_id":5299989648942,
497 "schema_version":0
498 }
499 }"#;
500 let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
501
502 let region_remove = r#"{"region_id":42}"#;
503 let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
504 }
505
506 #[test]
507 fn test_region_manifest_builder() {
508 }
510
511 #[test]
512 fn test_encode_decode_region_checkpoint() {
513 }
515
516 #[test]
517 fn test_region_manifest_compatibility() {
518 let region_manifest_json = r#"{
520 "metadata": {
521 "column_metadatas": [
522 {
523 "column_schema": {
524 "name": "a",
525 "data_type": {"Int64": {}},
526 "is_nullable": false,
527 "is_time_index": false,
528 "default_constraint": null,
529 "metadata": {}
530 },
531 "semantic_type": "Tag",
532 "column_id": 1
533 },
534 {
535 "column_schema": {
536 "name": "b",
537 "data_type": {"Float64": {}},
538 "is_nullable": false,
539 "is_time_index": false,
540 "default_constraint": null,
541 "metadata": {}
542 },
543 "semantic_type": "Field",
544 "column_id": 2
545 },
546 {
547 "column_schema": {
548 "name": "c",
549 "data_type": {"Timestamp": {"Millisecond": null}},
550 "is_nullable": false,
551 "is_time_index": false,
552 "default_constraint": null,
553 "metadata": {}
554 },
555 "semantic_type": "Timestamp",
556 "column_id": 3
557 }
558 ],
559 "primary_key": [1],
560 "region_id": 4402341478400,
561 "schema_version": 0
562 },
563 "files": {
564 "4b220a70-2b03-4641-9687-b65d94641208": {
565 "region_id": 4402341478400,
566 "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
567 "time_range": [
568 {"value": 1451609210000, "unit": "Millisecond"},
569 {"value": 1451609520000, "unit": "Millisecond"}
570 ],
571 "level": 1,
572 "file_size": 100
573 },
574 "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
575 "region_id": 4402341478400,
576 "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
577 "time_range": [
578 {"value": 1451609210000, "unit": "Millisecond"},
579 {"value": 1451609520000, "unit": "Millisecond"}
580 ],
581 "level": 0,
582 "file_size": 100
583 }
584 },
585 "flushed_entry_id": 10,
586 "flushed_sequence": 20,
587 "manifest_version": 1,
588 "truncated_entry_id": null,
589 "compaction_time_window": null
590 }"#;
591
592 let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();
593
594 assert_eq!(manifest.files.len(), 2);
596 assert_eq!(manifest.flushed_entry_id, 10);
597 assert_eq!(manifest.flushed_sequence, 20);
598 assert_eq!(manifest.manifest_version, 1);
599
600 let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
602 file_ids.sort_unstable();
603 assert_eq!(
604 file_ids,
605 vec![
606 "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
607 "4b220a70-2b03-4641-9687-b65d94641208",
608 ]
609 );
610
611 let serialized_manifest = serde_json::to_string(&manifest).unwrap();
613 let deserialized_manifest: RegionManifest =
614 serde_json::from_str(&serialized_manifest).unwrap();
615 assert_eq!(manifest, deserialized_manifest);
616 assert_ne!(serialized_manifest, region_manifest_json);
617 }
618
619 #[test]
620 fn test_region_truncate_compat() {
621 let region_truncate_json = r#"{
623 "region_id": 4402341478400,
624 "truncated_entry_id": 10,
625 "truncated_sequence": 20
626 }"#;
627
628 let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
629 assert_eq!(truncate_v1.region_id, 4402341478400);
630 assert_eq!(
631 truncate_v1.kind,
632 TruncateKind::All {
633 truncated_entry_id: 10,
634 truncated_sequence: 20,
635 }
636 );
637
638 let region_truncate_v2_json = r#"{
640 "region_id": 4402341478400,
641 "files_to_remove": [
642 {
643 "region_id": 4402341478400,
644 "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
645 "time_range": [
646 {
647 "value": 1451609210000,
648 "unit": "Millisecond"
649 },
650 {
651 "value": 1451609520000,
652 "unit": "Millisecond"
653 }
654 ],
655 "level": 1,
656 "file_size": 100
657 }
658 ]
659}"#;
660
661 let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
662 assert_eq!(truncate_v2.region_id, 4402341478400);
663 assert_eq!(
664 truncate_v2.kind,
665 TruncateKind::Partial {
666 files_to_remove: vec![FileMeta {
667 region_id: RegionId::from_u64(4402341478400),
668 file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
669 time_range: (
670 Timestamp::new_millisecond(1451609210000),
671 Timestamp::new_millisecond(1451609520000)
672 ),
673 level: 1,
674 file_size: 100,
675 ..Default::default()
676 }]
677 }
678 );
679 }
680
681 #[test]
682 fn test_region_manifest_removed_files() {
683 let region_metadata = r#"{
684 "column_metadatas": [
685 {
686 "column_schema": {
687 "name": "a",
688 "data_type": {"Int64": {}},
689 "is_nullable": false,
690 "is_time_index": false,
691 "default_constraint": null,
692 "metadata": {}
693 },
694 "semantic_type": "Tag",
695 "column_id": 1
696 },
697 {
698 "column_schema": {
699 "name": "b",
700 "data_type": {"Float64": {}},
701 "is_nullable": false,
702 "is_time_index": false,
703 "default_constraint": null,
704 "metadata": {}
705 },
706 "semantic_type": "Field",
707 "column_id": 2
708 },
709 {
710 "column_schema": {
711 "name": "c",
712 "data_type": {"Timestamp": {"Millisecond": null}},
713 "is_nullable": false,
714 "is_time_index": false,
715 "default_constraint": null,
716 "metadata": {}
717 },
718 "semantic_type": "Timestamp",
719 "column_id": 3
720 }
721 ],
722 "primary_key": [1],
723 "region_id": 4402341478400,
724 "schema_version": 0
725 }"#;
726
727 let metadata: RegionMetadataRef =
728 serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
729 let manifest = RegionManifest {
730 metadata: metadata.clone(),
731 files: HashMap::new(),
732 flushed_entry_id: 0,
733 flushed_sequence: 0,
734 committed_sequence: None,
735 manifest_version: 0,
736 truncated_entry_id: None,
737 compaction_time_window: None,
738 removed_files: RemovedFilesRecord {
739 removed_files: vec![RemovedFiles {
740 removed_at: 0,
741 file_ids: HashSet::from([FileId::parse_str(
742 "4b220a70-2b03-4641-9687-b65d94641208",
743 )
744 .unwrap()]),
745 }],
746 },
747 sst_format: FormatType::PrimaryKey,
748 };
749
750 let json = serde_json::to_string(&manifest).unwrap();
751 let new: RegionManifest = serde_json::from_str(&json).unwrap();
752
753 assert_eq!(manifest, new);
754 }
755
756 #[test]
758 fn test_old_region_manifest_compat() {
759 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
760 pub struct RegionManifestV1 {
761 pub metadata: RegionMetadataRef,
763 pub files: HashMap<FileId, FileMeta>,
765 pub flushed_entry_id: EntryId,
767 pub flushed_sequence: SequenceNumber,
769 pub manifest_version: ManifestVersion,
771 pub truncated_entry_id: Option<EntryId>,
773 #[serde(with = "humantime_serde")]
775 pub compaction_time_window: Option<Duration>,
776 }
777
778 let region_metadata = r#"{
779 "column_metadatas": [
780 {
781 "column_schema": {
782 "name": "a",
783 "data_type": {"Int64": {}},
784 "is_nullable": false,
785 "is_time_index": false,
786 "default_constraint": null,
787 "metadata": {}
788 },
789 "semantic_type": "Tag",
790 "column_id": 1
791 },
792 {
793 "column_schema": {
794 "name": "b",
795 "data_type": {"Float64": {}},
796 "is_nullable": false,
797 "is_time_index": false,
798 "default_constraint": null,
799 "metadata": {}
800 },
801 "semantic_type": "Field",
802 "column_id": 2
803 },
804 {
805 "column_schema": {
806 "name": "c",
807 "data_type": {"Timestamp": {"Millisecond": null}},
808 "is_nullable": false,
809 "is_time_index": false,
810 "default_constraint": null,
811 "metadata": {}
812 },
813 "semantic_type": "Timestamp",
814 "column_id": 3
815 }
816 ],
817 "primary_key": [1],
818 "region_id": 4402341478400,
819 "schema_version": 0
820 }"#;
821
822 let metadata: RegionMetadataRef =
823 serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
824
825 let v1 = RegionManifestV1 {
827 metadata: metadata.clone(),
828 files: HashMap::new(),
829 flushed_entry_id: 0,
830 flushed_sequence: 0,
831 manifest_version: 0,
832 truncated_entry_id: None,
833 compaction_time_window: None,
834 };
835 let json = serde_json::to_string(&v1).unwrap();
836 let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
837 assert_eq!(
838 new_from_old,
839 RegionManifest {
840 metadata: metadata.clone(),
841 files: HashMap::new(),
842 removed_files: Default::default(),
843 flushed_entry_id: 0,
844 flushed_sequence: 0,
845 committed_sequence: None,
846 manifest_version: 0,
847 truncated_entry_id: None,
848 compaction_time_window: None,
849 sst_format: FormatType::PrimaryKey,
850 }
851 );
852
853 let new_manifest = RegionManifest {
854 metadata: metadata.clone(),
855 files: HashMap::new(),
856 removed_files: Default::default(),
857 flushed_entry_id: 0,
858 flushed_sequence: 0,
859 committed_sequence: None,
860 manifest_version: 0,
861 truncated_entry_id: None,
862 compaction_time_window: None,
863 sst_format: FormatType::PrimaryKey,
864 };
865 let json = serde_json::to_string(&new_manifest).unwrap();
866 let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
867 assert_eq!(
868 old_from_new,
869 RegionManifestV1 {
870 metadata: metadata.clone(),
871 files: HashMap::new(),
872 flushed_entry_id: 0,
873 flushed_sequence: 0,
874 manifest_version: 0,
875 truncated_entry_id: None,
876 compaction_time_window: None,
877 }
878 );
879
880 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
881 pub struct RegionEditV1 {
882 pub files_to_add: Vec<FileMeta>,
883 pub files_to_remove: Vec<FileMeta>,
884 #[serde(with = "humantime_serde")]
885 pub compaction_time_window: Option<Duration>,
886 pub flushed_entry_id: Option<EntryId>,
887 pub flushed_sequence: Option<SequenceNumber>,
888 }
889
890 let json = serde_json::to_string(&RegionEditV1 {
891 files_to_add: vec![],
892 files_to_remove: vec![],
893 compaction_time_window: None,
894 flushed_entry_id: None,
895 flushed_sequence: None,
896 })
897 .unwrap();
898 let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
899 assert_eq!(
900 RegionEdit {
901 files_to_add: vec![],
902 files_to_remove: vec![],
903 timestamp_ms: None,
904 compaction_time_window: None,
905 flushed_entry_id: None,
906 flushed_sequence: None,
907 committed_sequence: None,
908 },
909 new_from_old
910 );
911
912 let new = RegionEdit {
914 files_to_add: vec![],
915 files_to_remove: vec![],
916 timestamp_ms: Some(42),
917 compaction_time_window: None,
918 flushed_entry_id: None,
919 flushed_sequence: None,
920 committed_sequence: None,
921 };
922
923 let new_json = serde_json::to_string(&new).unwrap();
924
925 let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
926 assert_eq!(
927 RegionEditV1 {
928 files_to_add: vec![],
929 files_to_remove: vec![],
930 compaction_time_window: None,
931 flushed_entry_id: None,
932 flushed_sequence: None,
933 },
934 old_from_new
935 );
936 }
937
938 #[test]
939 fn test_region_change_backward_compatibility() {
940 let region_change_json = r#"{
942 "metadata": {
943 "column_metadatas": [
944 {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
945 {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
946 {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
947 ],
948 "primary_key": [
949 1
950 ],
951 "region_id": 42,
952 "schema_version": 0
953 }
954 }"#;
955
956 let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
957 assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
958
959 let region_change = RegionChange {
961 metadata: region_change.metadata.clone(),
962 sst_format: FormatType::Flat,
963 };
964
965 let serialized = serde_json::to_string(®ion_change).unwrap();
966 let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
967 assert_eq!(deserialized.sst_format, FormatType::Flat);
968 }
969}