1use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use common_telemetry::warn;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt};
24use store_api::ManifestVersion;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber};
27use strum::Display;
28
29use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
30use crate::manifest::manager::RemoveFileOptions;
31use crate::region::ManifestStats;
32use crate::sst::FormatType;
33use crate::sst::file::FileMeta;
34use crate::wal::EntryId;
35
/// A single action recorded in the region manifest log.
///
/// Each variant wraps the payload applied by the corresponding
/// `RegionManifestBuilder::apply_*` method (or, for `Remove`, handled elsewhere).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
pub enum RegionMetaAction {
    /// Replaces the region metadata and SST format (see `apply_change`).
    Change(RegionChange),
    /// Updates only the partition expression of the current metadata
    /// (see `apply_partition_expr_change`).
    PartitionExprChange(RegionPartitionExprChange),
    /// Adds and/or removes SST files and advances flushed/committed progress
    /// (see `apply_edit`).
    Edit(RegionEdit),
    /// Region removal, identified by region id.
    Remove(RegionRemove),
    /// Drops all files or a given subset of files (see `apply_truncate`).
    Truncate(RegionTruncate),
}
50
/// Payload of [`RegionMetaAction::PartitionExprChange`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionPartitionExprChange {
    /// Value handed to `RegionMetadata::set_partition_expr` when the action is
    /// applied. NOTE(review): presumably a serialized expression (the tests use a
    /// JSON string) — confirm against the writer.
    pub partition_expr: Option<String>,
}
56
57impl RegionMetaAction {
58 pub fn is_change(&self) -> bool {
60 matches!(self, RegionMetaAction::Change(_))
61 }
62
63 pub fn is_edit(&self) -> bool {
65 matches!(self, RegionMetaAction::Edit(_))
66 }
67
68 pub fn is_partition_expr_change(&self) -> bool {
70 matches!(self, RegionMetaAction::PartitionExprChange(_))
71 }
72}
73
/// Payload of [`RegionMetaAction::Change`]: a full metadata replacement.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
    /// The new region metadata.
    pub metadata: RegionMetadataRef,
    /// SST format to use from this change on. `#[serde(default)]` lets records
    /// written before this field existed deserialize with `FormatType::default()`.
    #[serde(default)]
    pub sst_format: FormatType,
}
82
/// Payload of [`RegionMetaAction::Edit`]: file additions/removals plus progress markers.
///
/// Field order is part of the serialized JSON layout; do not reorder.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
    /// Files inserted into the manifest (replacing any entry with the same id).
    pub files_to_add: Vec<FileMeta>,
    /// Files dropped from the manifest and recorded in its removed-files record.
    pub files_to_remove: Vec<FileMeta>,
    /// Millisecond timestamp used as `removed_at` for files removed by this edit.
    /// When `None`, the builder falls back to the current wall-clock time.
    /// `#[serde(default)]` keeps older records (without this field) readable.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
    /// When `Some`, overrides the manifest's compaction time window.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// Flushed entry id; the builder keeps the maximum seen so far.
    pub flushed_entry_id: Option<EntryId>,
    /// Flushed sequence; the builder keeps the maximum seen so far.
    pub flushed_sequence: Option<SequenceNumber>,
    /// Committed sequence; the builder keeps the maximum seen so far.
    pub committed_sequence: Option<SequenceNumber>,
}
96
/// Payload of [`RegionMetaAction::Remove`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
    /// Id of the region this removal refers to.
    pub region_id: RegionId,
}
101
/// Payload of [`RegionMetaAction::Truncate`].
///
/// `kind` is flattened, so its fields appear at the top level of the JSON
/// object next to `region_id` (this is how the v1 format, with only
/// `truncated_entry_id`/`truncated_sequence`, stays readable).
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
    /// Id of the truncated region.
    pub region_id: RegionId,
    /// Whether all files or only a given subset are truncated.
    #[serde(flatten)]
    pub kind: TruncateKind,
    /// Millisecond timestamp used as `removed_at` for the dropped files; the
    /// builder uses the current time when `None`.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
}
113
/// The extent of a truncate.
///
/// Untagged: serde tries the variants in declaration order, so `All` is
/// attempted before `Partial`. Keep this order.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum TruncateKind {
    /// Drop every file. The builder resets `flushed_entry_id`/`flushed_sequence`
    /// to these values and records `truncated_entry_id`.
    All {
        truncated_entry_id: EntryId,
        truncated_sequence: SequenceNumber,
    },
    /// Drop only the listed files; progress markers are left untouched.
    Partial { files_to_remove: Vec<FileMeta> },
}
128
/// A materialized region manifest: the state obtained by replaying actions
/// (see `RegionManifestBuilder`). Also serialized as the body of a checkpoint.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[cfg_attr(test, derive(Eq))]
pub struct RegionManifest {
    /// Current region metadata.
    pub metadata: RegionMetadataRef,
    /// Live SST files keyed by file id.
    pub files: HashMap<FileId, FileMeta>,
    /// Files (or index versions) removed from the manifest, grouped by removal
    /// time. `#[serde(default)]` keeps manifests written before this field readable.
    #[serde(default)]
    pub removed_files: RemovedFilesRecord,
    /// Flushed entry id (max-merged by edits, reset by a full truncate).
    pub flushed_entry_id: EntryId,
    /// Flushed sequence (max-merged by edits, reset by a full truncate).
    pub flushed_sequence: SequenceNumber,
    /// Committed sequence, if any edit has reported one (max-merged).
    pub committed_sequence: Option<SequenceNumber>,
    /// Version of the last action applied to this manifest.
    pub manifest_version: ManifestVersion,
    /// Entry id recorded by the last full truncate, if any.
    pub truncated_entry_id: Option<EntryId>,
    /// Compaction time window, overwritten by edits that carry one.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// SST format from the last `RegionChange`; defaults for older manifests.
    #[serde(default)]
    pub sst_format: FormatType,
}
164
#[cfg(test)]
impl PartialEq for RegionManifest {
    /// Test-only equality over the manifest's state fields.
    ///
    /// NOTE(review): `removed_files` and `sst_format` are deliberately not part
    /// of the comparison. Presumably this is because `removed_at` may come from
    /// wall-clock time when replaying actions. Confirm before relying on it, and
    /// compare `removed_files` explicitly where a test cares about it.
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            &self.metadata,
            &self.files,
            &self.flushed_entry_id,
            &self.flushed_sequence,
            &self.manifest_version,
            &self.truncated_entry_id,
            &self.compaction_time_window,
            &self.committed_sequence,
        );
        let rhs = (
            &other.metadata,
            &other.files,
            &other.flushed_entry_id,
            &other.flushed_sequence,
            &other.manifest_version,
            &other.truncated_entry_id,
            &other.compaction_time_window,
            &other.committed_sequence,
        );
        lhs == rhs
    }
}
178
/// Incrementally rebuilds a [`RegionManifest`] by replaying actions on top of an
/// optional checkpoint. `Default` yields an empty builder without metadata.
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
    /// Current metadata; `try_build` fails while this is `None`.
    metadata: Option<RegionMetadataRef>,
    /// Live SST files keyed by file id.
    files: HashMap<FileId, FileMeta>,
    /// Removed files accumulated by edits and truncates.
    pub removed_files: RemovedFilesRecord,
    flushed_entry_id: EntryId,
    flushed_sequence: SequenceNumber,
    /// Version of the most recently applied action.
    manifest_version: ManifestVersion,
    truncated_entry_id: Option<EntryId>,
    compaction_time_window: Option<Duration>,
    committed_sequence: Option<SequenceNumber>,
    sst_format: FormatType,
}
192
193impl RegionManifestBuilder {
194 pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
196 if let Some(s) = checkpoint {
197 Self {
198 metadata: Some(s.metadata),
199 files: s.files,
200 removed_files: s.removed_files,
201 flushed_entry_id: s.flushed_entry_id,
202 manifest_version: s.manifest_version,
203 flushed_sequence: s.flushed_sequence,
204 truncated_entry_id: s.truncated_entry_id,
205 compaction_time_window: s.compaction_time_window,
206 committed_sequence: s.committed_sequence,
207 sst_format: s.sst_format,
208 }
209 } else {
210 Default::default()
211 }
212 }
213
214 pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
215 self.metadata = Some(change.metadata);
216 self.manifest_version = manifest_version;
217 self.sst_format = change.sst_format;
218 }
219
220 pub fn apply_partition_expr_change(
226 &mut self,
227 manifest_version: ManifestVersion,
228 change: RegionPartitionExprChange,
229 ) {
230 if let Some(metadata) = &self.metadata {
231 let mut metadata = metadata.as_ref().clone();
232 metadata.set_partition_expr(change.partition_expr);
233 self.metadata = Some(metadata.into());
234 self.manifest_version = manifest_version;
235 } else {
236 warn!(
237 "metadata is not set in region manifest builder, ignore partition expr change: {:?}",
238 change
239 );
240 }
241 }
242
243 pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
244 self.manifest_version = manifest_version;
245
246 let mut removed_files = vec![];
247 for file in edit.files_to_add {
248 if let Some(old_file) = self.files.insert(file.file_id, file.clone())
249 && let Some(old_index) = old_file.index_version()
250 && !old_file.is_index_up_to_date(&file)
251 {
252 removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
254 }
255 }
256 removed_files.extend(
257 edit.files_to_remove
258 .iter()
259 .map(|f| RemovedFile::File(f.file_id, f.index_version())),
260 );
261 let at = edit
262 .timestamp_ms
263 .unwrap_or_else(|| Utc::now().timestamp_millis());
264 self.removed_files.add_removed_files(removed_files, at);
265
266 for file in edit.files_to_remove {
267 self.files.remove(&file.file_id);
268 }
269 if let Some(flushed_entry_id) = edit.flushed_entry_id {
270 self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
271 }
272 if let Some(flushed_sequence) = edit.flushed_sequence {
273 self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
274 }
275
276 if let Some(committed_sequence) = edit.committed_sequence {
277 self.committed_sequence = Some(
278 self.committed_sequence
279 .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
280 );
281 }
282 if let Some(window) = edit.compaction_time_window {
283 self.compaction_time_window = Some(window);
284 }
285 }
286
287 pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
288 self.manifest_version = manifest_version;
289 match truncate.kind {
290 TruncateKind::All {
291 truncated_entry_id,
292 truncated_sequence,
293 } => {
294 self.flushed_entry_id = truncated_entry_id;
295 self.flushed_sequence = truncated_sequence;
296 self.truncated_entry_id = Some(truncated_entry_id);
297 self.removed_files.add_removed_files(
298 self.files
299 .values()
300 .map(|f| RemovedFile::File(f.file_id, f.index_version()))
301 .collect(),
302 truncate
303 .timestamp_ms
304 .unwrap_or_else(|| Utc::now().timestamp_millis()),
305 );
306 self.files.clear();
307 }
308 TruncateKind::Partial { files_to_remove } => {
309 self.removed_files.add_removed_files(
310 files_to_remove
311 .iter()
312 .map(|f| RemovedFile::File(f.file_id, f.index_version()))
313 .collect(),
314 truncate
315 .timestamp_ms
316 .unwrap_or_else(|| Utc::now().timestamp_millis()),
317 );
318 for file in files_to_remove {
319 self.files.remove(&file.file_id);
320 }
321 }
322 }
323 }
324
325 pub fn files(&self) -> &HashMap<FileId, FileMeta> {
326 &self.files
327 }
328
329 pub fn contains_metadata(&self) -> bool {
331 self.metadata.is_some()
332 }
333
334 pub fn try_build(self) -> Result<RegionManifest> {
335 let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
336 Ok(RegionManifest {
337 metadata,
338 files: self.files,
339 removed_files: self.removed_files,
340 flushed_entry_id: self.flushed_entry_id,
341 flushed_sequence: self.flushed_sequence,
342 committed_sequence: self.committed_sequence,
343 manifest_version: self.manifest_version,
344 truncated_entry_id: self.truncated_entry_id,
345 compaction_time_window: self.compaction_time_window,
346 sst_format: self.sst_format,
347 })
348 }
349}
350
/// Record of files removed from the manifest, kept as a list of batches.
/// `add_removed_files` appends one batch per call.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFilesRecord {
    /// Removal batches in the order they were appended.
    pub removed_files: Vec<RemovedFiles>,
}
360
361impl RemovedFilesRecord {
362 pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
364 let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
365 for files in self.removed_files.iter_mut() {
366 files
367 .files
368 .retain(|removed| !deleted_file_set.contains(removed));
369 }
370
371 self.removed_files.retain(|fs| !fs.files.is_empty());
372 }
373
374 pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
375 let cnt = self
376 .removed_files
377 .iter()
378 .map(|r| r.files.len() as u64)
379 .sum();
380 stats
381 .file_removed_cnt
382 .store(cnt, std::sync::atomic::Ordering::Relaxed);
383 }
384}
385
/// One batch of removed files that share a removal timestamp.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
    /// Removal time in milliseconds. The builder derives it from the action's
    /// `timestamp_ms` or from `Utc::now().timestamp_millis()`.
    pub removed_at: i64,
    /// Removed entries. `#[serde(default)]` means a batch in the old layout
    /// (which used a `file_ids` field) deserializes with an empty set.
    #[serde(default)]
    pub files: HashSet<RemovedFile>,
}
395
/// An entry in the removed-files record.
///
/// `Deserialize` is implemented by hand so that legacy bare file-id entries
/// remain readable.
#[derive(Serialize, Hash, Clone, Debug, PartialEq, Eq)]
pub enum RemovedFile {
    /// A whole SST file, with the index version it had (if any).
    File(FileId, Option<IndexVersion>),
    /// Only a specific index version of a file that is still live.
    Index(FileId, IndexVersion),
}
402
403impl<'de> Deserialize<'de> for RemovedFile {
407 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
408 where
409 D: serde::Deserializer<'de>,
410 {
411 #[derive(Deserialize)]
412 #[serde(untagged)]
413 enum CompatRemovedFile {
414 Enum(RemovedFileEnum),
415 FileId(FileId),
416 }
417
418 #[derive(Deserialize)]
419 enum RemovedFileEnum {
420 File(FileId, Option<IndexVersion>),
421 Index(FileId, IndexVersion),
422 }
423
424 let compat = CompatRemovedFile::deserialize(deserializer)?;
425 match compat {
426 CompatRemovedFile::FileId(file_id) => Ok(RemovedFile::File(file_id, None)),
427 CompatRemovedFile::Enum(e) => match e {
428 RemovedFileEnum::File(file_id, version) => Ok(RemovedFile::File(file_id, version)),
429 RemovedFileEnum::Index(file_id, version) => {
430 Ok(RemovedFile::Index(file_id, version))
431 }
432 },
433 }
434 }
435}
436
437impl RemovedFile {
438 pub fn file_id(&self) -> FileId {
439 match self {
440 RemovedFile::File(file_id, _) => *file_id,
441 RemovedFile::Index(file_id, _) => *file_id,
442 }
443 }
444
445 pub fn index_version(&self) -> Option<IndexVersion> {
446 match self {
447 RemovedFile::File(_, index_version) => *index_version,
448 RemovedFile::Index(_, index_version) => Some(*index_version),
449 }
450 }
451}
452
453impl RemovedFilesRecord {
454 pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
456 if removed.is_empty() {
457 return;
458 }
459 let files = removed.into_iter().collect();
460 self.removed_files.push(RemovedFiles {
461 removed_at: at,
462 files,
463 });
464 }
465
466 pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
467 if !opt.enable_gc {
468 self.removed_files.clear();
470 return Ok(());
471 }
472
473 Ok(())
476 }
477}
478
/// A serialized snapshot of the manifest, encoded as JSON by `encode`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RegionCheckpoint {
    /// Manifest version recorded for this checkpoint.
    pub last_version: ManifestVersion,
    /// NOTE(review): presumably the number of actions folded into this
    /// checkpoint — confirm against the checkpoint writer.
    pub compacted_actions: usize,
    /// The manifest state, or `None` when there is no state to snapshot.
    pub checkpoint: Option<RegionManifest>,
}
490
491impl RegionCheckpoint {
492 pub fn last_version(&self) -> ManifestVersion {
493 self.last_version
494 }
495
496 pub fn encode(&self) -> Result<Vec<u8>> {
497 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
498
499 Ok(json.into_bytes())
500 }
501
502 pub fn decode(bytes: &[u8]) -> Result<Self> {
503 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
504
505 serde_json::from_str(data).context(SerdeJsonSnafu)
506 }
507}
508
/// An ordered batch of manifest actions written as one manifest log record.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
    /// Actions in application order.
    pub actions: Vec<RegionMetaAction>,
}
513
514impl RegionMetaActionList {
515 pub fn with_action(action: RegionMetaAction) -> Self {
516 Self {
517 actions: vec![action],
518 }
519 }
520
521 pub fn new(actions: Vec<RegionMetaAction>) -> Self {
522 Self { actions }
523 }
524
525 pub fn split_region_change_and_edit(
527 self,
528 ) -> (
529 Option<RegionPartitionExprChange>,
530 Option<RegionChange>,
531 RegionEdit,
532 ) {
533 let mut edit = RegionEdit {
534 files_to_add: Vec::new(),
535 files_to_remove: Vec::new(),
536 timestamp_ms: None,
537 compaction_time_window: None,
538 flushed_entry_id: None,
539 flushed_sequence: None,
540 committed_sequence: None,
541 };
542 let mut partition_expr_change = None;
543 let mut region_change = None;
544 for action in self.actions {
545 match action {
546 RegionMetaAction::PartitionExprChange(change) => {
547 partition_expr_change = Some(change);
548 }
549 RegionMetaAction::Change(change) => {
550 region_change = Some(change);
551 }
552 RegionMetaAction::Edit(region_edit) => {
553 edit.files_to_add.extend(region_edit.files_to_add);
555 edit.files_to_remove.extend(region_edit.files_to_remove);
556 if let Some(eid) = region_edit.flushed_entry_id {
558 edit.flushed_entry_id =
559 Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
560 }
561 if let Some(seq) = region_edit.flushed_sequence {
562 edit.flushed_sequence =
563 Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
564 }
565 if let Some(seq) = region_edit.committed_sequence {
566 edit.committed_sequence =
567 Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
568 }
569 if region_edit.compaction_time_window.is_some() {
571 edit.compaction_time_window = region_edit.compaction_time_window;
572 }
573 }
574 _ => {}
575 }
576 }
577
578 (partition_expr_change, region_change, edit)
579 }
580}
581
582impl RegionMetaActionList {
583 pub fn encode(&self) -> Result<Vec<u8>> {
585 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
586
587 Ok(json.into_bytes())
588 }
589
590 pub fn decode(bytes: &[u8]) -> Result<Self> {
591 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
592
593 serde_json::from_str(data).context(SerdeJsonSnafu)
594 }
595}
596
#[cfg(test)]
mod tests {

    use common_time::Timestamp;

    use super::*;

    // Every action payload must still deserialize from JSON written by older versions.
    #[test]
    fn test_region_action_compatibility() {
        let region_edit = r#"{
            "flushed_entry_id":null,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        let region_edit = r#"{
            "flushed_entry_id":10,
            "flushed_sequence":10,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        let region_change = r#" {
            "metadata":{
                "column_metadatas":[
                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key":[1],
                "region_id":5299989648942,
                "schema_version":0
            }
            }"#;
        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

        let region_remove = r#"{"region_id":42}"#;
        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();

        let region_partition_expr_change = r#"{
            "partition_expr": "{\"expr\":\"x < 100\"}"
        }"#;
        let _ = serde_json::from_str::<RegionPartitionExprChange>(region_partition_expr_change)
            .unwrap();
    }

    // Replays change / edit / truncate actions through the builder and checks
    // the resulting manifest state, including the removed-files record.
    #[test]
    fn test_region_manifest_builder() {
        // Without metadata the builder cannot produce a manifest.
        assert!(!RegionManifestBuilder::with_checkpoint(None).contains_metadata());
        assert!(RegionManifestBuilder::default().try_build().is_err());

        let metadata: RegionMetadataRef = serde_json::from_str(
            r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":4402341478400,"schema_version":0}"#,
        )
        .unwrap();
        let region_id = RegionId::from_u64(4402341478400);
        let file_a = FileMeta {
            region_id,
            file_id: FileId::random(),
            ..Default::default()
        };
        let file_b = FileMeta {
            region_id,
            file_id: FileId::random(),
            ..Default::default()
        };

        let mut builder = RegionManifestBuilder::with_checkpoint(None);
        builder.apply_change(
            1,
            RegionChange {
                metadata: metadata.clone(),
                sst_format: FormatType::PrimaryKey,
            },
        );
        assert!(builder.contains_metadata());

        builder.apply_edit(
            2,
            RegionEdit {
                files_to_add: vec![file_a.clone()],
                files_to_remove: vec![],
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: Some(10),
                flushed_sequence: Some(20),
                committed_sequence: Some(20),
            },
        );
        assert_eq!(builder.files().len(), 1);
        assert!(builder.files().contains_key(&file_a.file_id));

        // Smaller progress markers must not move the builder backwards.
        builder.apply_edit(
            3,
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![file_a.clone()],
                timestamp_ms: Some(42),
                compaction_time_window: None,
                flushed_entry_id: Some(5),
                flushed_sequence: Some(5),
                committed_sequence: Some(5),
            },
        );
        assert!(builder.files().is_empty());
        assert_eq!(builder.flushed_entry_id, 10);
        assert_eq!(builder.flushed_sequence, 20);
        assert_eq!(builder.committed_sequence, Some(20));

        builder.apply_edit(
            4,
            RegionEdit {
                files_to_add: vec![file_b.clone()],
                files_to_remove: vec![],
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            },
        );
        builder.apply_truncate(
            5,
            RegionTruncate {
                region_id,
                kind: TruncateKind::All {
                    truncated_entry_id: 30,
                    truncated_sequence: 40,
                },
                timestamp_ms: Some(100),
            },
        );
        assert!(builder.files().is_empty());

        let manifest = builder.try_build().unwrap();
        assert_eq!(manifest.metadata, metadata);
        assert_eq!(manifest.manifest_version, 5);
        assert_eq!(manifest.flushed_entry_id, 30);
        assert_eq!(manifest.flushed_sequence, 40);
        assert_eq!(manifest.truncated_entry_id, Some(30));
        assert_eq!(manifest.committed_sequence, Some(20));

        let batches = &manifest.removed_files.removed_files;
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].removed_at, 42);
        assert!(
            batches[0]
                .files
                .contains(&RemovedFile::File(file_a.file_id, file_a.index_version()))
        );
        assert_eq!(batches[1].removed_at, 100);
        assert!(
            batches[1]
                .files
                .contains(&RemovedFile::File(file_b.file_id, file_b.index_version()))
        );
    }

    // A checkpoint must survive an encode/decode round trip, with or without a
    // manifest, and invalid UTF-8 must be rejected.
    #[test]
    fn test_encode_decode_region_checkpoint() {
        let metadata: RegionMetadataRef = serde_json::from_str(
            r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":4402341478400,"schema_version":0}"#,
        )
        .unwrap();
        let file: FileMeta = serde_json::from_str(
            r#"{"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}"#,
        )
        .unwrap();

        let manifest = RegionManifest {
            metadata,
            files: HashMap::from([(file.file_id, file)]),
            removed_files: RemovedFilesRecord::default(),
            flushed_entry_id: 10,
            flushed_sequence: 20,
            committed_sequence: Some(30),
            manifest_version: 5,
            truncated_entry_id: None,
            compaction_time_window: Some(Duration::from_secs(3600)),
            sst_format: FormatType::PrimaryKey,
        };
        let checkpoint = RegionCheckpoint {
            last_version: 5,
            compacted_actions: 2,
            checkpoint: Some(manifest),
        };
        let decoded = RegionCheckpoint::decode(&checkpoint.encode().unwrap()).unwrap();
        assert_eq!(checkpoint, decoded);
        assert_eq!(decoded.last_version(), 5);

        let empty = RegionCheckpoint {
            last_version: 0,
            compacted_actions: 0,
            checkpoint: None,
        };
        let decoded = RegionCheckpoint::decode(&empty.encode().unwrap()).unwrap();
        assert_eq!(empty, decoded);

        assert!(RegionCheckpoint::decode(&[0xff, 0xfe]).is_err());
    }

    // A manifest written before `removed_files`/`sst_format` existed must still
    // deserialize, and must round-trip through the current format.
    #[test]
    fn test_region_manifest_compatibility() {
        let region_manifest_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            },
            "files": {
                "4b220a70-2b03-4641-9687-b65d94641208": {
                    "region_id": 4402341478400,
                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 1,
                    "file_size": 100
                },
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
                    "region_id": 4402341478400,
                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 0,
                    "file_size": 100
                }
            },
            "flushed_entry_id": 10,
            "flushed_sequence": 20,
            "manifest_version": 1,
            "truncated_entry_id": null,
            "compaction_time_window": null
        }"#;

        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();

        assert_eq!(manifest.files.len(), 2);
        assert_eq!(manifest.flushed_entry_id, 10);
        assert_eq!(manifest.flushed_sequence, 20);
        assert_eq!(manifest.manifest_version, 1);

        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
        file_ids.sort_unstable();
        assert_eq!(
            file_ids,
            vec![
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                "4b220a70-2b03-4641-9687-b65d94641208",
            ]
        );

        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
        let deserialized_manifest: RegionManifest =
            serde_json::from_str(&serialized_manifest).unwrap();
        assert_eq!(manifest, deserialized_manifest);
        assert_ne!(serialized_manifest, region_manifest_json);
    }

    // Both the v1 (entry id + sequence only) and the v2 (partial file list)
    // truncate encodings must map to the right `TruncateKind`.
    #[test]
    fn test_region_truncate_compat() {
        let region_truncate_json = r#"{
            "region_id": 4402341478400,
            "truncated_entry_id": 10,
            "truncated_sequence": 20
        }"#;

        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
        assert_eq!(truncate_v1.region_id, 4402341478400);
        assert_eq!(
            truncate_v1.kind,
            TruncateKind::All {
                truncated_entry_id: 10,
                truncated_sequence: 20,
            }
        );

        let region_truncate_v2_json = r#"{
    "region_id": 4402341478400,
    "files_to_remove": [
        {
            "region_id": 4402341478400,
            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
            "time_range": [
                {
                    "value": 1451609210000,
                    "unit": "Millisecond"
                },
                {
                    "value": 1451609520000,
                    "unit": "Millisecond"
                }
            ],
            "level": 1,
            "file_size": 100
        }
    ]
}"#;

        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
        assert_eq!(truncate_v2.region_id, 4402341478400);
        assert_eq!(
            truncate_v2.kind,
            TruncateKind::Partial {
                files_to_remove: vec![FileMeta {
                    region_id: RegionId::from_u64(4402341478400),
                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                    time_range: (
                        Timestamp::new_millisecond(1451609210000),
                        Timestamp::new_millisecond(1451609520000)
                    ),
                    level: 1,
                    file_size: 100,
                    ..Default::default()
                }]
            }
        );
    }

    // The removed-files record must survive a serialize/deserialize round trip.
    #[test]
    fn test_region_manifest_removed_files() {
        let region_metadata = r#"{
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
        let manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            removed_files: RemovedFilesRecord {
                removed_files: vec![RemovedFiles {
                    removed_at: 0,
                    files: HashSet::from([RemovedFile::File(
                        FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                        None,
                    )]),
                }],
            },
            sst_format: FormatType::PrimaryKey,
        };

        let json = serde_json::to_string(&manifest).unwrap();
        let new: RegionManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(manifest, new);
        // `RegionManifest`'s test-only `PartialEq` does not compare
        // `removed_files`, so check the field under test explicitly.
        assert_eq!(manifest.removed_files, new.removed_files);
    }

    // Old and new manifest / edit layouts must be readable in both directions.
    #[test]
    fn test_old_region_manifest_compat() {
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionManifestV1 {
            pub metadata: RegionMetadataRef,
            pub files: HashMap<FileId, FileMeta>,
            pub flushed_entry_id: EntryId,
            pub flushed_sequence: SequenceNumber,
            pub manifest_version: ManifestVersion,
            pub truncated_entry_id: Option<EntryId>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
        }

        let region_metadata = r#"{
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            }"#;

        let metadata: RegionMetadataRef =
            serde_json::from_str(region_metadata).expect("Failed to parse region metadata");

        let v1 = RegionManifestV1 {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
        };
        let json = serde_json::to_string(&v1).unwrap();
        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(
            new_from_old,
            RegionManifest {
                metadata: metadata.clone(),
                files: HashMap::new(),
                removed_files: Default::default(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                committed_sequence: None,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
                sst_format: FormatType::PrimaryKey,
            }
        );

        let new_manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            removed_files: Default::default(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            sst_format: FormatType::PrimaryKey,
        };
        let json = serde_json::to_string(&new_manifest).unwrap();
        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
        assert_eq!(
            old_from_new,
            RegionManifestV1 {
                metadata: metadata.clone(),
                files: HashMap::new(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
            }
        );

        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionEditV1 {
            pub files_to_add: Vec<FileMeta>,
            pub files_to_remove: Vec<FileMeta>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
            pub flushed_entry_id: Option<EntryId>,
            pub flushed_sequence: Option<SequenceNumber>,
        }

        let json = serde_json::to_string(&RegionEditV1 {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        })
        .unwrap();
        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
        assert_eq!(
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![],
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            },
            new_from_old
        );

        let new = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: Some(42),
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        let new_json = serde_json::to_string(&new).unwrap();

        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
        assert_eq!(
            RegionEditV1 {
                files_to_add: vec![],
                files_to_remove: vec![],
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            old_from_new
        );
    }

    // A `RegionChange` without `sst_format` defaults to `PrimaryKey`; an
    // explicit format must round-trip.
    #[test]
    fn test_region_change_backward_compatibility() {
        let region_change_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key": [
                    1
                ],
                "region_id": 42,
                "schema_version": 0
            }
        }"#;

        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);

        let region_change = RegionChange {
            metadata: region_change.metadata.clone(),
            sst_format: FormatType::Flat,
        };

        let serialized = serde_json::to_string(&region_change).unwrap();
        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized.sst_format, FormatType::Flat);
    }

    // `RemovedFile` accepts legacy bare file ids and round-trips both variants.
    #[test]
    fn test_removed_file_compatibility() {
        let file_id = FileId::random();
        let json_str = format!("\"{}\"", file_id);
        let removed_file: RemovedFile = serde_json::from_str(&json_str).unwrap();
        assert_eq!(removed_file, RemovedFile::File(file_id, None));

        let removed_file_v2 = RemovedFile::File(file_id, Some(10));
        let json_v2 = serde_json::to_string(&removed_file_v2).unwrap();
        let deserialized_v2: RemovedFile = serde_json::from_str(&json_v2).unwrap();
        assert_eq!(removed_file_v2, deserialized_v2);

        let removed_index = RemovedFile::Index(file_id, 20);
        let json_index = serde_json::to_string(&removed_index).unwrap();
        let deserialized_index: RemovedFile = serde_json::from_str(&json_index).unwrap();
        assert_eq!(removed_index, deserialized_index);

        let removed_file = RemovedFile::File(file_id, None);
        let json = serde_json::to_string(&removed_file).unwrap();
        let deserialized: RemovedFile = serde_json::from_str(&json).unwrap();
        assert_eq!(removed_file, deserialized);

        let json_set = format!("[\"{}\"]", file_id);
        let removed_files_set: HashSet<RemovedFile> = serde_json::from_str(&json_set).unwrap();
        assert!(removed_files_set.contains(&RemovedFile::File(file_id, None)));
    }

    // An old `RemovedFiles` batch (with `file_ids`) deserializes with an empty
    // `files` set; the new layout with bare ids is read as `File(id, None)`.
    #[test]
    fn test_removed_files_backward_compatibility() {
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        struct OldRemovedFiles {
            pub removed_at: i64,
            pub file_ids: HashSet<FileId>,
        }

        let mut file_ids = HashSet::new();
        file_ids.insert(FileId::random());
        file_ids.insert(FileId::random());

        let old_removed_files = OldRemovedFiles {
            removed_at: 1234567890,
            file_ids,
        };

        let old_json = serde_json::to_string(&old_removed_files).unwrap();

        let result: Result<RemovedFiles, _> = serde_json::from_str(&old_json);

        assert!(result.is_ok(), "{:?}", result);
        let removed_files = result.unwrap();
        assert_eq!(removed_files.removed_at, 1234567890);
        assert!(removed_files.files.is_empty());

        let file_id = FileId::random();
        let new_json = format!(
            r#"{{
            "removed_at": 1234567890,
            "files": ["{}"]
        }}"#,
            file_id
        );

        let result: Result<RemovedFiles, _> = serde_json::from_str(&new_json);
        assert!(result.is_ok());
        let removed_files = result.unwrap();
        assert_eq!(removed_files.removed_at, 1234567890);
        assert_eq!(removed_files.files.len(), 1);
        assert!(
            removed_files
                .files
                .contains(&RemovedFile::File(file_id, None))
        );
    }
}