1use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use store_api::ManifestVersion;
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber};
26use strum::Display;
27
28use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
29use crate::manifest::manager::RemoveFileOptions;
30use crate::region::ManifestStats;
31use crate::sst::FormatType;
32use crate::sst::file::FileMeta;
33use crate::wal::EntryId;
34
/// One action stored in a region manifest.
///
/// Each variant wraps the payload struct of the same kind.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
pub enum RegionMetaAction {
    /// Installs new region metadata (see [`RegionChange`]).
    Change(RegionChange),
    /// Adds/removes files and advances flushed/committed positions (see [`RegionEdit`]).
    Edit(RegionEdit),
    /// Remove action carrying the id of the affected region (see [`RegionRemove`]).
    Remove(RegionRemove),
    /// Truncates all files or a listed subset of files (see [`RegionTruncate`]).
    Truncate(RegionTruncate),
}
47
48impl RegionMetaAction {
49 pub fn is_change(&self) -> bool {
51 matches!(self, RegionMetaAction::Change(_))
52 }
53
54 pub fn is_edit(&self) -> bool {
56 matches!(self, RegionMetaAction::Edit(_))
57 }
58}
59
/// Payload of [`RegionMetaAction::Change`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
    /// The region metadata to install.
    pub metadata: RegionMetadataRef,
    /// SST format that accompanies the metadata. When the field is missing
    /// from the serialized action, `FormatType::default()` is used.
    #[serde(default)]
    pub sst_format: FormatType,
}
68
/// Payload of [`RegionMetaAction::Edit`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
    /// Files to insert into the manifest (replacing entries with the same file id).
    pub files_to_add: Vec<FileMeta>,
    /// Files to drop from the manifest; they are also recorded as removed files.
    pub files_to_remove: Vec<FileMeta>,
    /// Timestamp in milliseconds used as the removal time of the removed files.
    /// When absent, the current time is used while applying the edit.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
    /// New compaction time window, if any; (de)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// Flushed WAL entry id; applying the edit never lowers the stored value.
    pub flushed_entry_id: Option<EntryId>,
    /// Flushed sequence number; applying the edit never lowers the stored value.
    pub flushed_sequence: Option<SequenceNumber>,
    /// Committed sequence number; applying the edit never lowers the stored value.
    pub committed_sequence: Option<SequenceNumber>,
}
82
/// Payload of [`RegionMetaAction::Remove`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
    /// Id of the region this action refers to.
    pub region_id: RegionId,
}
87
/// Payload of [`RegionMetaAction::Truncate`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
    /// Id of the region this action refers to.
    pub region_id: RegionId,
    /// What to truncate. Flattened, so the fields of the chosen
    /// [`TruncateKind`] variant appear next to `region_id` in the JSON.
    #[serde(flatten)]
    pub kind: TruncateKind,
    /// Timestamp in milliseconds used as the removal time of the truncated
    /// files. When absent, the current time is used while applying the action.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
}
99
/// The two flavours of truncation.
///
/// The enum is untagged: no variant name is written, and the variant is
/// selected from the fields present in the serialized data.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum TruncateKind {
    /// Truncate every file of the region.
    All {
        /// Becomes both the flushed entry id and the truncated entry id.
        truncated_entry_id: EntryId,
        /// Becomes the flushed sequence.
        truncated_sequence: SequenceNumber,
    },
    /// Truncate only the listed files.
    Partial { files_to_remove: Vec<FileMeta> },
}
114
/// The state of a region as reconstructed from its manifest actions.
///
/// `Eq` is only derived in test builds, next to a hand-written test-only
/// `PartialEq`.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[cfg_attr(test, derive(Eq))]
pub struct RegionManifest {
    /// Metadata of the region.
    pub metadata: RegionMetadataRef,
    /// Files currently in the region, keyed by file id.
    pub files: HashMap<FileId, FileMeta>,
    /// Files (and outdated indexes) that were removed from `files`.
    /// Defaults to an empty record when missing from the serialized manifest.
    #[serde(default)]
    pub removed_files: RemovedFilesRecord,
    /// Last flushed WAL entry id.
    pub flushed_entry_id: EntryId,
    /// Last flushed sequence number.
    pub flushed_sequence: SequenceNumber,
    /// Highest committed sequence seen in applied edits, if any.
    pub committed_sequence: Option<SequenceNumber>,
    /// Version of the last manifest action applied.
    pub manifest_version: ManifestVersion,
    /// Entry id of the last full truncation, if one was applied.
    pub truncated_entry_id: Option<EntryId>,
    /// Compaction time window; (de)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// SST format of the region; `FormatType::default()` when missing from
    /// the serialized manifest.
    #[serde(default)]
    pub sst_format: FormatType,
}
150
/// Test-only equality for [`RegionManifest`].
///
/// `removed_files` and `sst_format` are not part of the comparison; they are
/// destructured as ignored so that the omission is visible in the code.
#[cfg(test)]
impl PartialEq for RegionManifest {
    fn eq(&self, other: &Self) -> bool {
        let Self {
            metadata,
            files,
            removed_files: _,
            flushed_entry_id,
            flushed_sequence,
            committed_sequence,
            manifest_version,
            truncated_entry_id,
            compaction_time_window,
            sst_format: _,
        } = self;

        *metadata == other.metadata
            && *files == other.files
            && *flushed_entry_id == other.flushed_entry_id
            && *flushed_sequence == other.flushed_sequence
            && *manifest_version == other.manifest_version
            && *truncated_entry_id == other.truncated_entry_id
            && *compaction_time_window == other.compaction_time_window
            && *committed_sequence == other.committed_sequence
    }
}
164
/// Incrementally builds a [`RegionManifest`] by applying manifest actions.
///
/// The fields mirror those of [`RegionManifest`], except that `metadata` is
/// optional until a change (or a checkpoint) provides it.
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
    /// Region metadata; `None` until set by a checkpoint or `apply_change`.
    metadata: Option<RegionMetadataRef>,
    /// Files currently in the region, keyed by file id.
    files: HashMap<FileId, FileMeta>,
    /// Record of removed files; publicly accessible.
    pub removed_files: RemovedFilesRecord,
    flushed_entry_id: EntryId,
    flushed_sequence: SequenceNumber,
    /// Version passed to the most recently applied action.
    manifest_version: ManifestVersion,
    truncated_entry_id: Option<EntryId>,
    compaction_time_window: Option<Duration>,
    committed_sequence: Option<SequenceNumber>,
    sst_format: FormatType,
}
178
179impl RegionManifestBuilder {
180 pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
182 if let Some(s) = checkpoint {
183 Self {
184 metadata: Some(s.metadata),
185 files: s.files,
186 removed_files: s.removed_files,
187 flushed_entry_id: s.flushed_entry_id,
188 manifest_version: s.manifest_version,
189 flushed_sequence: s.flushed_sequence,
190 truncated_entry_id: s.truncated_entry_id,
191 compaction_time_window: s.compaction_time_window,
192 committed_sequence: s.committed_sequence,
193 sst_format: s.sst_format,
194 }
195 } else {
196 Default::default()
197 }
198 }
199
200 pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
201 self.metadata = Some(change.metadata);
202 self.manifest_version = manifest_version;
203 self.sst_format = change.sst_format;
204 }
205
206 pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
207 self.manifest_version = manifest_version;
208
209 let mut removed_files = vec![];
210 for file in edit.files_to_add {
211 if let Some(old_file) = self.files.insert(file.file_id, file.clone())
212 && let Some(old_index) = old_file.index_version()
213 && !old_file.is_index_up_to_date(&file)
214 {
215 removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
217 }
218 }
219 removed_files.extend(
220 edit.files_to_remove
221 .iter()
222 .map(|f| RemovedFile::File(f.file_id, f.index_version())),
223 );
224 let at = edit
225 .timestamp_ms
226 .unwrap_or_else(|| Utc::now().timestamp_millis());
227 self.removed_files.add_removed_files(removed_files, at);
228
229 for file in edit.files_to_remove {
230 self.files.remove(&file.file_id);
231 }
232 if let Some(flushed_entry_id) = edit.flushed_entry_id {
233 self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
234 }
235 if let Some(flushed_sequence) = edit.flushed_sequence {
236 self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
237 }
238
239 if let Some(committed_sequence) = edit.committed_sequence {
240 self.committed_sequence = Some(
241 self.committed_sequence
242 .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
243 );
244 }
245 if let Some(window) = edit.compaction_time_window {
246 self.compaction_time_window = Some(window);
247 }
248 }
249
250 pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
251 self.manifest_version = manifest_version;
252 match truncate.kind {
253 TruncateKind::All {
254 truncated_entry_id,
255 truncated_sequence,
256 } => {
257 self.flushed_entry_id = truncated_entry_id;
258 self.flushed_sequence = truncated_sequence;
259 self.truncated_entry_id = Some(truncated_entry_id);
260 self.removed_files.add_removed_files(
261 self.files
262 .values()
263 .map(|f| RemovedFile::File(f.file_id, f.index_version()))
264 .collect(),
265 truncate
266 .timestamp_ms
267 .unwrap_or_else(|| Utc::now().timestamp_millis()),
268 );
269 self.files.clear();
270 }
271 TruncateKind::Partial { files_to_remove } => {
272 self.removed_files.add_removed_files(
273 files_to_remove
274 .iter()
275 .map(|f| RemovedFile::File(f.file_id, f.index_version()))
276 .collect(),
277 truncate
278 .timestamp_ms
279 .unwrap_or_else(|| Utc::now().timestamp_millis()),
280 );
281 for file in files_to_remove {
282 self.files.remove(&file.file_id);
283 }
284 }
285 }
286 }
287
288 pub fn files(&self) -> &HashMap<FileId, FileMeta> {
289 &self.files
290 }
291
292 pub fn contains_metadata(&self) -> bool {
294 self.metadata.is_some()
295 }
296
297 pub fn try_build(self) -> Result<RegionManifest> {
298 let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
299 Ok(RegionManifest {
300 metadata,
301 files: self.files,
302 removed_files: self.removed_files,
303 flushed_entry_id: self.flushed_entry_id,
304 flushed_sequence: self.flushed_sequence,
305 committed_sequence: self.committed_sequence,
306 manifest_version: self.manifest_version,
307 truncated_entry_id: self.truncated_entry_id,
308 compaction_time_window: self.compaction_time_window,
309 sst_format: self.sst_format,
310 })
311 }
312}
313
/// A log of files that were removed from a region's manifest.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFilesRecord {
    /// One entry per removal event, in the order the events were added.
    pub removed_files: Vec<RemovedFiles>,
}
323
324impl RemovedFilesRecord {
325 pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
327 let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
328 for files in self.removed_files.iter_mut() {
329 files
330 .files
331 .retain(|removed| !deleted_file_set.contains(removed));
332 }
333
334 self.removed_files.retain(|fs| !fs.files.is_empty());
335 }
336
337 pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
338 let cnt = self
339 .removed_files
340 .iter()
341 .map(|r| r.files.len() as u64)
342 .sum();
343 stats
344 .file_removed_cnt
345 .store(cnt, std::sync::atomic::Ordering::Relaxed);
346 }
347}
348
/// The files removed by a single removal event.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
    /// Time of the removal; the builder in this file passes milliseconds
    /// (either the action's `timestamp_ms` or the current time).
    pub removed_at: i64,
    /// The removed files/indexes. Defaults to an empty set when the field is
    /// missing from the serialized data.
    #[serde(default)]
    pub files: HashSet<RemovedFile>,
}
358
/// A single removed item.
///
/// `Serialize` is derived; `Deserialize` is implemented by hand so that a
/// bare file id is accepted as well.
#[derive(Serialize, Hash, Clone, Debug, PartialEq, Eq)]
pub enum RemovedFile {
    /// A removed file together with its index version, if it had one.
    File(FileId, Option<IndexVersion>),
    /// A removed index (of the given version) belonging to the given file.
    Index(FileId, IndexVersion),
}
365
366impl<'de> Deserialize<'de> for RemovedFile {
370 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
371 where
372 D: serde::Deserializer<'de>,
373 {
374 #[derive(Deserialize)]
375 #[serde(untagged)]
376 enum CompatRemovedFile {
377 Enum(RemovedFileEnum),
378 FileId(FileId),
379 }
380
381 #[derive(Deserialize)]
382 enum RemovedFileEnum {
383 File(FileId, Option<IndexVersion>),
384 Index(FileId, IndexVersion),
385 }
386
387 let compat = CompatRemovedFile::deserialize(deserializer)?;
388 match compat {
389 CompatRemovedFile::FileId(file_id) => Ok(RemovedFile::File(file_id, None)),
390 CompatRemovedFile::Enum(e) => match e {
391 RemovedFileEnum::File(file_id, version) => Ok(RemovedFile::File(file_id, version)),
392 RemovedFileEnum::Index(file_id, version) => {
393 Ok(RemovedFile::Index(file_id, version))
394 }
395 },
396 }
397 }
398}
399
400impl RemovedFile {
401 pub fn file_id(&self) -> FileId {
402 match self {
403 RemovedFile::File(file_id, _) => *file_id,
404 RemovedFile::Index(file_id, _) => *file_id,
405 }
406 }
407
408 pub fn index_version(&self) -> Option<IndexVersion> {
409 match self {
410 RemovedFile::File(_, index_version) => *index_version,
411 RemovedFile::Index(_, index_version) => Some(*index_version),
412 }
413 }
414}
415
416impl RemovedFilesRecord {
417 pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
419 if removed.is_empty() {
420 return;
421 }
422 let files = removed.into_iter().collect();
423 self.removed_files.push(RemovedFiles {
424 removed_at: at,
425 files,
426 });
427 }
428
429 pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
430 if !opt.enable_gc {
431 self.removed_files.clear();
433 return Ok(());
434 }
435
436 Ok(())
439 }
440}
441
/// A serialized snapshot of a region manifest.
///
/// `PartialEq`/`Eq` are only derived in test builds.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RegionCheckpoint {
    /// NOTE(review): presumably the last manifest version covered by this
    /// checkpoint; confirm against the code that writes checkpoints.
    pub last_version: ManifestVersion,
    /// NOTE(review): presumably the number of actions folded into this
    /// checkpoint; confirm against the code that writes checkpoints.
    pub compacted_actions: usize,
    /// The manifest snapshot, if any.
    pub checkpoint: Option<RegionManifest>,
}
453
454impl RegionCheckpoint {
455 pub fn last_version(&self) -> ManifestVersion {
456 self.last_version
457 }
458
459 pub fn encode(&self) -> Result<Vec<u8>> {
460 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
461
462 Ok(json.into_bytes())
463 }
464
465 pub fn decode(bytes: &[u8]) -> Result<Self> {
466 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
467
468 serde_json::from_str(data).context(SerdeJsonSnafu)
469 }
470}
471
/// An ordered list of manifest actions that is encoded and decoded as one unit.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
    /// The actions, in order.
    pub actions: Vec<RegionMetaAction>,
}
476
477impl RegionMetaActionList {
478 pub fn with_action(action: RegionMetaAction) -> Self {
479 Self {
480 actions: vec![action],
481 }
482 }
483
484 pub fn new(actions: Vec<RegionMetaAction>) -> Self {
485 Self { actions }
486 }
487
488 pub fn split_region_change_and_edit(self) -> (Option<RegionChange>, RegionEdit) {
490 let mut edit = RegionEdit {
491 files_to_add: Vec::new(),
492 files_to_remove: Vec::new(),
493 timestamp_ms: None,
494 compaction_time_window: None,
495 flushed_entry_id: None,
496 flushed_sequence: None,
497 committed_sequence: None,
498 };
499 let mut region_change = None;
500 for action in self.actions {
501 match action {
502 RegionMetaAction::Change(change) => {
503 region_change = Some(change);
504 }
505 RegionMetaAction::Edit(region_edit) => {
506 edit.files_to_add.extend(region_edit.files_to_add);
508 edit.files_to_remove.extend(region_edit.files_to_remove);
509 if let Some(eid) = region_edit.flushed_entry_id {
511 edit.flushed_entry_id =
512 Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
513 }
514 if let Some(seq) = region_edit.flushed_sequence {
515 edit.flushed_sequence =
516 Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
517 }
518 if let Some(seq) = region_edit.committed_sequence {
519 edit.committed_sequence =
520 Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
521 }
522 if region_edit.compaction_time_window.is_some() {
524 edit.compaction_time_window = region_edit.compaction_time_window;
525 }
526 }
527 _ => {}
528 }
529 }
530
531 (region_change, edit)
532 }
533}
534
535impl RegionMetaActionList {
536 pub fn encode(&self) -> Result<Vec<u8>> {
538 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
539
540 Ok(json.into_bytes())
541 }
542
543 pub fn decode(bytes: &[u8]) -> Result<Self> {
544 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
545
546 serde_json::from_str(data).context(SerdeJsonSnafu)
547 }
548}
549
550#[cfg(test)]
551mod tests {
552
553 use common_time::Timestamp;
554
555 use super::*;
556
    /// Checks that hand-written JSON for `RegionEdit`, `RegionChange` and
    /// `RegionRemove` deserializes even when newer fields are missing.
    #[test]
    fn test_region_action_compatibility() {
        // Edit without `flushed_sequence`, `committed_sequence` and `timestamp_ms`.
        let region_edit = r#"{
 "flushed_entry_id":null,
 "compaction_time_window":null,
 "files_to_add":[
 {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
 ],
 "files_to_remove":[
 {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
 ]
 }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // Edit that carries `flushed_sequence` but still lacks the later fields.
        let region_edit = r#"{
 "flushed_entry_id":10,
 "flushed_sequence":10,
 "compaction_time_window":null,
 "files_to_add":[
 {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
 ],
 "files_to_remove":[
 {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
 ]
 }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // Change without `sst_format`.
        let region_change = r#" {
 "metadata":{
 "column_metadatas":[
 {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
 ],
 "primary_key":[1],
 "region_id":5299989648942,
 "schema_version":0
 }
 }"#;
        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

        let region_remove = r#"{"region_id":42}"#;
        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();
    }
603
604 #[test]
605 fn test_region_manifest_builder() {
606 }
608
609 #[test]
610 fn test_encode_decode_region_checkpoint() {
611 }
613
    /// Deserializes a manifest JSON that lacks `removed_files`,
    /// `committed_sequence` and `sst_format`, then checks that it survives a
    /// serialize/deserialize round trip.
    #[test]
    fn test_region_manifest_compatibility() {
        let region_manifest_json = r#"{
 "metadata": {
 "column_metadatas": [
 {
 "column_schema": {
 "name": "a",
 "data_type": {"Int64": {}},
 "is_nullable": false,
 "is_time_index": false,
 "default_constraint": null,
 "metadata": {}
 },
 "semantic_type": "Tag",
 "column_id": 1
 },
 {
 "column_schema": {
 "name": "b",
 "data_type": {"Float64": {}},
 "is_nullable": false,
 "is_time_index": false,
 "default_constraint": null,
 "metadata": {}
 },
 "semantic_type": "Field",
 "column_id": 2
 },
 {
 "column_schema": {
 "name": "c",
 "data_type": {"Timestamp": {"Millisecond": null}},
 "is_nullable": false,
 "is_time_index": false,
 "default_constraint": null,
 "metadata": {}
 },
 "semantic_type": "Timestamp",
 "column_id": 3
 }
 ],
 "primary_key": [1],
 "region_id": 4402341478400,
 "schema_version": 0
 },
 "files": {
 "4b220a70-2b03-4641-9687-b65d94641208": {
 "region_id": 4402341478400,
 "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
 "time_range": [
 {"value": 1451609210000, "unit": "Millisecond"},
 {"value": 1451609520000, "unit": "Millisecond"}
 ],
 "level": 1,
 "file_size": 100
 },
 "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
 "region_id": 4402341478400,
 "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
 "time_range": [
 {"value": 1451609210000, "unit": "Millisecond"},
 {"value": 1451609520000, "unit": "Millisecond"}
 ],
 "level": 0,
 "file_size": 100
 }
 },
 "flushed_entry_id": 10,
 "flushed_sequence": 20,
 "manifest_version": 1,
 "truncated_entry_id": null,
 "compaction_time_window": null
 }"#;

        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();

        assert_eq!(manifest.files.len(), 2);
        assert_eq!(manifest.flushed_entry_id, 10);
        assert_eq!(manifest.flushed_sequence, 20);
        assert_eq!(manifest.manifest_version, 1);

        // `files` is a hash map, so sort the ids before comparing.
        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
        file_ids.sort_unstable();
        assert_eq!(
            file_ids,
            vec![
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                "4b220a70-2b03-4641-9687-b65d94641208",
            ]
        );

        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
        let deserialized_manifest: RegionManifest =
            serde_json::from_str(&serialized_manifest).unwrap();
        assert_eq!(manifest, deserialized_manifest);
        // The re-serialized text is expected to differ from the input text.
        assert_ne!(serialized_manifest, region_manifest_json);
    }
716
    /// Checks that both JSON shapes of `RegionTruncate` deserialize into the
    /// matching `TruncateKind` variant.
    #[test]
    fn test_region_truncate_compat() {
        // Shape with `truncated_entry_id`/`truncated_sequence` -> `TruncateKind::All`.
        let region_truncate_json = r#"{
 "region_id": 4402341478400,
 "truncated_entry_id": 10,
 "truncated_sequence": 20
 }"#;

        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
        assert_eq!(truncate_v1.region_id, 4402341478400);
        assert_eq!(
            truncate_v1.kind,
            TruncateKind::All {
                truncated_entry_id: 10,
                truncated_sequence: 20,
            }
        );

        // Shape with `files_to_remove` -> `TruncateKind::Partial`.
        let region_truncate_v2_json = r#"{
 "region_id": 4402341478400,
 "files_to_remove": [
 {
 "region_id": 4402341478400,
 "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
 "time_range": [
 {
 "value": 1451609210000,
 "unit": "Millisecond"
 },
 {
 "value": 1451609520000,
 "unit": "Millisecond"
 }
 ],
 "level": 1,
 "file_size": 100
 }
 ]
}"#;

        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
        assert_eq!(truncate_v2.region_id, 4402341478400);
        assert_eq!(
            truncate_v2.kind,
            TruncateKind::Partial {
                files_to_remove: vec![FileMeta {
                    region_id: RegionId::from_u64(4402341478400),
                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                    time_range: (
                        Timestamp::new_millisecond(1451609210000),
                        Timestamp::new_millisecond(1451609520000)
                    ),
                    level: 1,
                    file_size: 100,
                    ..Default::default()
                }]
            }
        );
    }
778
779 #[test]
780 fn test_region_manifest_removed_files() {
781 let region_metadata = r#"{
782 "column_metadatas": [
783 {
784 "column_schema": {
785 "name": "a",
786 "data_type": {"Int64": {}},
787 "is_nullable": false,
788 "is_time_index": false,
789 "default_constraint": null,
790 "metadata": {}
791 },
792 "semantic_type": "Tag",
793 "column_id": 1
794 },
795 {
796 "column_schema": {
797 "name": "b",
798 "data_type": {"Float64": {}},
799 "is_nullable": false,
800 "is_time_index": false,
801 "default_constraint": null,
802 "metadata": {}
803 },
804 "semantic_type": "Field",
805 "column_id": 2
806 },
807 {
808 "column_schema": {
809 "name": "c",
810 "data_type": {"Timestamp": {"Millisecond": null}},
811 "is_nullable": false,
812 "is_time_index": false,
813 "default_constraint": null,
814 "metadata": {}
815 },
816 "semantic_type": "Timestamp",
817 "column_id": 3
818 }
819 ],
820 "primary_key": [1],
821 "region_id": 4402341478400,
822 "schema_version": 0
823 }"#;
824
825 let metadata: RegionMetadataRef =
826 serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
827 let manifest = RegionManifest {
828 metadata: metadata.clone(),
829 files: HashMap::new(),
830 flushed_entry_id: 0,
831 flushed_sequence: 0,
832 committed_sequence: None,
833 manifest_version: 0,
834 truncated_entry_id: None,
835 compaction_time_window: None,
836 removed_files: RemovedFilesRecord {
837 removed_files: vec![RemovedFiles {
838 removed_at: 0,
839 files: HashSet::from([RemovedFile::File(
840 FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
841 None,
842 )]),
843 }],
844 },
845 sst_format: FormatType::PrimaryKey,
846 };
847
848 let json = serde_json::to_string(&manifest).unwrap();
849 let new: RegionManifest = serde_json::from_str(&json).unwrap();
850
851 assert_eq!(manifest, new);
852 }
853
    /// Checks forward and backward compatibility between the current
    /// `RegionManifest`/`RegionEdit` and local copies of their older layouts
    /// (`RegionManifestV1`, `RegionEditV1`) that lack the newer fields.
    #[test]
    fn test_old_region_manifest_compat() {
        // Older manifest layout: no `removed_files`, `committed_sequence` or `sst_format`.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionManifestV1 {
            pub metadata: RegionMetadataRef,
            pub files: HashMap<FileId, FileMeta>,
            pub flushed_entry_id: EntryId,
            pub flushed_sequence: SequenceNumber,
            pub manifest_version: ManifestVersion,
            pub truncated_entry_id: Option<EntryId>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
        }

        let region_metadata = r#"{
 "column_metadatas": [
 {
 "column_schema": {
 "name": "a",
 "data_type": {"Int64": {}},
 "is_nullable": false,
 "is_time_index": false,
 "default_constraint": null,
 "metadata": {}
 },
 "semantic_type": "Tag",
 "column_id": 1
 },
 {
 "column_schema": {
 "name": "b",
 "data_type": {"Float64": {}},
 "is_nullable": false,
 "is_time_index": false,
 "default_constraint": null,
 "metadata": {}
 },
 "semantic_type": "Field",
 "column_id": 2
 },
 {
 "column_schema": {
 "name": "c",
 "data_type": {"Timestamp": {"Millisecond": null}},
 "is_nullable": false,
 "is_time_index": false,
 "default_constraint": null,
 "metadata": {}
 },
 "semantic_type": "Timestamp",
 "column_id": 3
 }
 ],
 "primary_key": [1],
 "region_id": 4402341478400,
 "schema_version": 0
 }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");

        // Old manifest JSON -> current struct.
        let v1 = RegionManifestV1 {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
        };
        let json = serde_json::to_string(&v1).unwrap();
        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(
            new_from_old,
            RegionManifest {
                metadata: metadata.clone(),
                files: HashMap::new(),
                removed_files: Default::default(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                committed_sequence: None,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
                sst_format: FormatType::PrimaryKey,
            }
        );

        // Current manifest JSON -> old struct (extra fields are ignored).
        let new_manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            removed_files: Default::default(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            sst_format: FormatType::PrimaryKey,
        };
        let json = serde_json::to_string(&new_manifest).unwrap();
        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
        assert_eq!(
            old_from_new,
            RegionManifestV1 {
                metadata: metadata.clone(),
                files: HashMap::new(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
            }
        );

        // Older edit layout: no `timestamp_ms` or `committed_sequence`.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionEditV1 {
            pub files_to_add: Vec<FileMeta>,
            pub files_to_remove: Vec<FileMeta>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
            pub flushed_entry_id: Option<EntryId>,
            pub flushed_sequence: Option<SequenceNumber>,
        }

        // Old edit JSON -> current struct.
        let json = serde_json::to_string(&RegionEditV1 {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        })
        .unwrap();
        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
        assert_eq!(
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![],
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            },
            new_from_old
        );

        // Current edit JSON (with a timestamp) -> old struct.
        let new = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: Some(42),
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        let new_json = serde_json::to_string(&new).unwrap();

        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
        assert_eq!(
            RegionEditV1 {
                files_to_add: vec![],
                files_to_remove: vec![],
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            old_from_new
        );
    }
1035
1036 #[test]
1037 fn test_region_change_backward_compatibility() {
1038 let region_change_json = r#"{
1040 "metadata": {
1041 "column_metadatas": [
1042 {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
1043 {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
1044 {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
1045 ],
1046 "primary_key": [
1047 1
1048 ],
1049 "region_id": 42,
1050 "schema_version": 0
1051 }
1052 }"#;
1053
1054 let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
1055 assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
1056
1057 let region_change = RegionChange {
1059 metadata: region_change.metadata.clone(),
1060 sst_format: FormatType::Flat,
1061 };
1062
1063 let serialized = serde_json::to_string(®ion_change).unwrap();
1064 let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
1065 assert_eq!(deserialized.sst_format, FormatType::Flat);
1066 }
1067
    /// Checks that `RemovedFile` deserializes from a bare file id as well as
    /// from its own serialized enum form.
    #[test]
    fn test_removed_file_compatibility() {
        // A bare file id string becomes `File(id, None)`.
        let file_id = FileId::random();
        let json_str = format!("\"{}\"", file_id);
        let removed_file: RemovedFile = serde_json::from_str(&json_str).unwrap();
        assert_eq!(removed_file, RemovedFile::File(file_id, None));

        // `File` with an index version round-trips.
        let removed_file_v2 = RemovedFile::File(file_id, Some(10));
        let json_v2 = serde_json::to_string(&removed_file_v2).unwrap();
        let deserialized_v2: RemovedFile = serde_json::from_str(&json_v2).unwrap();
        assert_eq!(removed_file_v2, deserialized_v2);

        // `Index` round-trips.
        let removed_index = RemovedFile::Index(file_id, 20);
        let json_index = serde_json::to_string(&removed_index).unwrap();
        let deserialized_index: RemovedFile = serde_json::from_str(&json_index).unwrap();
        assert_eq!(removed_index, deserialized_index);

        // `File` without an index version round-trips.
        let removed_file = RemovedFile::File(file_id, None);
        let json = serde_json::to_string(&removed_file).unwrap();
        let deserialized: RemovedFile = serde_json::from_str(&json).unwrap();
        assert_eq!(removed_file, deserialized);

        // A JSON array of bare file ids deserializes into a set of `File(id, None)`.
        let json_set = format!("[\"{}\"]", file_id);
        let removed_files_set: HashSet<RemovedFile> = serde_json::from_str(&json_set).unwrap();
        assert!(removed_files_set.contains(&RemovedFile::File(file_id, None)));
    }
1103
    /// Checks that `RemovedFiles` accepts JSON written with the older
    /// `file_ids` key (yielding an empty `files` set) and JSON whose `files`
    /// array holds bare file ids.
    #[test]
    fn test_removed_files_backward_compatibility() {
        // Older layout: the set of ids lived under `file_ids`.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        struct OldRemovedFiles {
            pub removed_at: i64,
            pub file_ids: HashSet<FileId>,
        }

        let mut file_ids = HashSet::new();
        file_ids.insert(FileId::random());
        file_ids.insert(FileId::random());

        let old_removed_files = OldRemovedFiles {
            removed_at: 1234567890,
            file_ids,
        };

        let old_json = serde_json::to_string(&old_removed_files).unwrap();

        let result: Result<RemovedFiles, _> = serde_json::from_str(&old_json);

        // The unknown `file_ids` key is ignored and `files` falls back to empty.
        assert!(result.is_ok(), "{:?}", result);
        let removed_files = result.unwrap();
        assert_eq!(removed_files.removed_at, 1234567890);
        assert!(removed_files.files.is_empty());

        // `files` given as an array of bare file ids.
        let file_id = FileId::random();
        let new_json = format!(
            r#"{{
 "removed_at": 1234567890,
 "files": ["{}"]
 }}"#,
            file_id
        );

        let result: Result<RemovedFiles, _> = serde_json::from_str(&new_json);
        assert!(result.is_ok());
        let removed_files = result.unwrap();
        assert_eq!(removed_files.removed_at, 1234567890);
        assert_eq!(removed_files.files.len(), 1);
        assert!(
            removed_files
                .files
                .contains(&RemovedFile::File(file_id, None))
        );
    }
1178}