1use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20use chrono::Utc;
21use common_telemetry::warn;
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt};
24use store_api::ManifestVersion;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::{FileId, IndexVersion, RegionId, SequenceNumber};
27use strum::Display;
28
29use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
30use crate::manifest::manager::RemoveFileOptions;
31use crate::region::ManifestStats;
32use crate::sst::FormatType;
33use crate::sst::file::FileMeta;
34use crate::wal::EntryId;
35
/// One action that can appear in a [`RegionMetaActionList`].
///
/// Each variant wraps the payload struct describing that action. The type is
/// serialized with serde, so variant names are part of the encoded form.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Display)]
pub enum RegionMetaAction {
    /// Carries a [`RegionChange`] (new metadata, SST format and append mode).
    Change(RegionChange),
    /// Carries a [`RegionPartitionExprChange`].
    PartitionExprChange(RegionPartitionExprChange),
    /// Carries a [`RegionEdit`] (files added/removed and flushed positions).
    Edit(RegionEdit),
    /// Carries a [`RegionRemove`].
    Remove(RegionRemove),
    /// Carries a [`RegionTruncate`].
    Truncate(RegionTruncate),
}
50
/// Payload of [`RegionMetaAction::PartitionExprChange`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionPartitionExprChange {
    /// The new partition expression; it is passed as-is to
    /// `set_partition_expr` on the region metadata when the change is applied.
    pub partition_expr: Option<String>,
}
56
57impl RegionMetaAction {
58 pub fn is_change(&self) -> bool {
60 matches!(self, RegionMetaAction::Change(_))
61 }
62
63 pub fn is_edit(&self) -> bool {
65 matches!(self, RegionMetaAction::Edit(_))
66 }
67
68 pub fn is_partition_expr_change(&self) -> bool {
70 matches!(self, RegionMetaAction::PartitionExprChange(_))
71 }
72}
73
/// Payload of [`RegionMetaAction::Change`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
    /// The region metadata to install.
    pub metadata: RegionMetadataRef,
    /// SST format of the region; falls back to `FormatType::default()` when
    /// the field is absent from the encoded action.
    #[serde(default)]
    pub sst_format: FormatType,
    /// Append-mode flag; `None` when absent from the encoded action. When the
    /// change is applied by `RegionManifestBuilder::apply_change`, `None`
    /// keeps the builder's current value.
    #[serde(default)]
    pub append_mode: Option<bool>,
}
85
/// Payload of [`RegionMetaAction::Edit`]: files added to and removed from the
/// region, plus optional updates to the flushed/committed positions.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
    /// Files to insert into the manifest's file map (keyed by file id).
    pub files_to_add: Vec<FileMeta>,
    /// Files to drop from the manifest's file map.
    pub files_to_remove: Vec<FileMeta>,
    /// Timestamp (milliseconds) recorded as `removed_at` for files removed by
    /// this edit. When `None`, `apply_edit` uses the current UTC time.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
    /// New compaction time window, encoded in humantime form.
    /// `None` leaves the builder's current window untouched.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// Flushed WAL entry id carried by this edit, if any.
    pub flushed_entry_id: Option<EntryId>,
    /// Flushed sequence number carried by this edit, if any.
    pub flushed_sequence: Option<SequenceNumber>,
    /// Committed sequence number carried by this edit, if any.
    pub committed_sequence: Option<SequenceNumber>,
}
99
/// Payload of [`RegionMetaAction::Remove`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
    /// Id of the region this action refers to.
    pub region_id: RegionId,
}
104
/// Payload of [`RegionMetaAction::Truncate`].
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
    /// Id of the region this action refers to.
    pub region_id: RegionId,
    /// What to truncate. Flattened, so the fields of the chosen
    /// [`TruncateKind`] variant sit next to `region_id` in the encoded object.
    #[serde(flatten)]
    pub kind: TruncateKind,
    /// Timestamp (milliseconds) recorded as `removed_at` for the files removed
    /// by this truncation. When `None`, `apply_truncate` uses the current UTC
    /// time.
    #[serde(default)]
    pub timestamp_ms: Option<i64>,
}
116
/// The two forms a [`RegionTruncate`] can take.
///
/// The enum is `untagged`: no variant name is written, and on decode the
/// variant is chosen by which fields are present.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum TruncateKind {
    /// Truncate everything: all files are dropped from the manifest.
    All {
        /// Entry id stored as both the flushed and the truncated entry id.
        truncated_entry_id: EntryId,
        /// Sequence number stored as the flushed sequence.
        truncated_sequence: SequenceNumber,
    },
    /// Remove only the listed files from the manifest.
    Partial { files_to_remove: Vec<FileMeta> },
}
131
/// The materialized state of a region's manifest, as produced by
/// `RegionManifestBuilder::try_build` and stored inside a `RegionCheckpoint`.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[cfg_attr(test, derive(Eq))]
pub struct RegionManifest {
    /// Metadata of the region.
    pub metadata: RegionMetadataRef,
    /// Files currently in the region, keyed by file id.
    pub files: HashMap<FileId, FileMeta>,
    /// Record of files removed from `files`, grouped by removal time.
    /// Defaults to an empty record when absent from the encoded manifest.
    #[serde(default)]
    pub removed_files: RemovedFilesRecord,
    /// Last flushed WAL entry id.
    pub flushed_entry_id: EntryId,
    /// Last flushed sequence number.
    pub flushed_sequence: SequenceNumber,
    /// Last committed sequence number, if one was recorded.
    pub committed_sequence: Option<SequenceNumber>,
    /// Version of the manifest action that was applied last.
    pub manifest_version: ManifestVersion,
    /// Entry id of the last full truncation, if any.
    pub truncated_entry_id: Option<EntryId>,
    /// Compaction time window, encoded in humantime form.
    #[serde(with = "humantime_serde")]
    pub compaction_time_window: Option<Duration>,
    /// SST format of the region; `FormatType::default()` when absent.
    #[serde(default)]
    pub sst_format: FormatType,
    /// Append-mode flag; `None` when absent.
    #[serde(default)]
    pub append_mode: Option<bool>,
}
170
#[cfg(test)]
impl PartialEq for RegionManifest {
    /// Test-only equality.
    ///
    /// `removed_files`, `sst_format` and `append_mode` are deliberately left
    /// out of the comparison; every other field must match.
    fn eq(&self, other: &Self) -> bool {
        // Destructure so that the ignored fields are spelled out and a newly
        // added field forces a decision here.
        let Self {
            metadata,
            files,
            removed_files: _,
            flushed_entry_id,
            flushed_sequence,
            committed_sequence,
            manifest_version,
            truncated_entry_id,
            compaction_time_window,
            sst_format: _,
            append_mode: _,
        } = self;

        *metadata == other.metadata
            && *files == other.files
            && *flushed_entry_id == other.flushed_entry_id
            && *flushed_sequence == other.flushed_sequence
            && *manifest_version == other.manifest_version
            && *truncated_entry_id == other.truncated_entry_id
            && *compaction_time_window == other.compaction_time_window
            && *committed_sequence == other.committed_sequence
    }
}
184
/// Incrementally builds a [`RegionManifest`] by applying manifest actions
/// (change, partition-expr change, edit, truncate) on top of an optional
/// checkpoint.
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
    /// Region metadata; `None` until a checkpoint or a change provides it.
    metadata: Option<RegionMetadataRef>,
    /// Files currently in the region, keyed by file id.
    files: HashMap<FileId, FileMeta>,
    /// Record of files removed so far, grouped by removal time.
    pub removed_files: RemovedFilesRecord,
    /// Highest flushed entry id seen so far (reset by a full truncate).
    flushed_entry_id: EntryId,
    /// Highest flushed sequence seen so far (reset by a full truncate).
    flushed_sequence: SequenceNumber,
    /// Version of the last applied action.
    manifest_version: ManifestVersion,
    /// Entry id of the last full truncate, if any.
    truncated_entry_id: Option<EntryId>,
    /// Most recent compaction time window set by an edit.
    compaction_time_window: Option<Duration>,
    /// Highest committed sequence seen so far, if any.
    committed_sequence: Option<SequenceNumber>,
    /// SST format taken from the last applied change.
    sst_format: FormatType,
    /// Append-mode flag taken from the last change that specified one.
    append_mode: Option<bool>,
}
199
200impl RegionManifestBuilder {
201 pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
203 if let Some(s) = checkpoint {
204 Self {
205 metadata: Some(s.metadata),
206 files: s.files,
207 removed_files: s.removed_files,
208 flushed_entry_id: s.flushed_entry_id,
209 manifest_version: s.manifest_version,
210 flushed_sequence: s.flushed_sequence,
211 truncated_entry_id: s.truncated_entry_id,
212 compaction_time_window: s.compaction_time_window,
213 committed_sequence: s.committed_sequence,
214 sst_format: s.sst_format,
215 append_mode: s.append_mode,
216 }
217 } else {
218 Default::default()
219 }
220 }
221
222 pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
223 self.metadata = Some(change.metadata);
224 self.manifest_version = manifest_version;
225 self.sst_format = change.sst_format;
226 self.append_mode = change.append_mode.or(self.append_mode);
228 }
229
230 pub fn apply_partition_expr_change(
236 &mut self,
237 manifest_version: ManifestVersion,
238 change: RegionPartitionExprChange,
239 ) {
240 if let Some(metadata) = &self.metadata {
241 let mut metadata = metadata.as_ref().clone();
242 metadata.set_partition_expr(change.partition_expr);
243 self.metadata = Some(metadata.into());
244 self.manifest_version = manifest_version;
245 } else {
246 warn!(
247 "metadata is not set in region manifest builder, ignore partition expr change: {:?}",
248 change
249 );
250 }
251 }
252
253 pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
254 self.manifest_version = manifest_version;
255
256 let mut removed_files = vec![];
257 for file in edit.files_to_add {
258 if let Some(old_file) = self.files.insert(file.file_id, file.clone())
259 && let Some(old_index) = old_file.index_version()
260 && !old_file.is_index_up_to_date(&file)
261 {
262 removed_files.push(RemovedFile::Index(old_file.file_id, old_index));
264 }
265 }
266 removed_files.extend(
267 edit.files_to_remove
268 .iter()
269 .map(|f| RemovedFile::File(f.file_id, f.index_version())),
270 );
271 let at = edit
272 .timestamp_ms
273 .unwrap_or_else(|| Utc::now().timestamp_millis());
274 self.removed_files.add_removed_files(removed_files, at);
275
276 for file in edit.files_to_remove {
277 self.files.remove(&file.file_id);
278 }
279 if let Some(flushed_entry_id) = edit.flushed_entry_id {
280 self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
281 }
282 if let Some(flushed_sequence) = edit.flushed_sequence {
283 self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
284 }
285
286 if let Some(committed_sequence) = edit.committed_sequence {
287 self.committed_sequence = Some(
288 self.committed_sequence
289 .map_or(committed_sequence, |exist| exist.max(committed_sequence)),
290 );
291 }
292 if let Some(window) = edit.compaction_time_window {
293 self.compaction_time_window = Some(window);
294 }
295 }
296
297 pub fn apply_truncate(&mut self, manifest_version: ManifestVersion, truncate: RegionTruncate) {
298 self.manifest_version = manifest_version;
299 match truncate.kind {
300 TruncateKind::All {
301 truncated_entry_id,
302 truncated_sequence,
303 } => {
304 self.flushed_entry_id = truncated_entry_id;
305 self.flushed_sequence = truncated_sequence;
306 self.truncated_entry_id = Some(truncated_entry_id);
307 self.removed_files.add_removed_files(
308 self.files
309 .values()
310 .map(|f| RemovedFile::File(f.file_id, f.index_version()))
311 .collect(),
312 truncate
313 .timestamp_ms
314 .unwrap_or_else(|| Utc::now().timestamp_millis()),
315 );
316 self.files.clear();
317 }
318 TruncateKind::Partial { files_to_remove } => {
319 self.removed_files.add_removed_files(
320 files_to_remove
321 .iter()
322 .map(|f| RemovedFile::File(f.file_id, f.index_version()))
323 .collect(),
324 truncate
325 .timestamp_ms
326 .unwrap_or_else(|| Utc::now().timestamp_millis()),
327 );
328 for file in files_to_remove {
329 self.files.remove(&file.file_id);
330 }
331 }
332 }
333 }
334
335 pub fn files(&self) -> &HashMap<FileId, FileMeta> {
336 &self.files
337 }
338
339 pub fn contains_metadata(&self) -> bool {
341 self.metadata.is_some()
342 }
343
344 pub fn try_build(self) -> Result<RegionManifest> {
345 let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
346 Ok(RegionManifest {
347 metadata,
348 files: self.files,
349 removed_files: self.removed_files,
350 flushed_entry_id: self.flushed_entry_id,
351 flushed_sequence: self.flushed_sequence,
352 committed_sequence: self.committed_sequence,
353 manifest_version: self.manifest_version,
354 truncated_entry_id: self.truncated_entry_id,
355 compaction_time_window: self.compaction_time_window,
356 sst_format: self.sst_format,
357 append_mode: self.append_mode,
358 })
359 }
360}
361
/// Record of files that were removed from a region's manifest.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFilesRecord {
    /// One entry per removal event, in the order the events were recorded.
    pub removed_files: Vec<RemovedFiles>,
}
371
372impl RemovedFilesRecord {
373 pub fn clear_deleted_files(&mut self, deleted_files: Vec<RemovedFile>) {
375 let deleted_file_set: HashSet<_> = HashSet::from_iter(deleted_files);
376 for files in self.removed_files.iter_mut() {
377 files
378 .files
379 .retain(|removed| !deleted_file_set.contains(removed));
380 }
381
382 self.removed_files.retain(|fs| !fs.files.is_empty());
383 }
384
385 pub fn update_file_removed_cnt_to_stats(&self, stats: &ManifestStats) {
386 let cnt = self
387 .removed_files
388 .iter()
389 .map(|r| r.files.len() as u64)
390 .sum();
391 stats
392 .file_removed_cnt
393 .store(cnt, std::sync::atomic::Ordering::Relaxed);
394 }
395}
396
/// A group of entries that were removed at the same time.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
    /// Removal timestamp in milliseconds (callers pass
    /// `Utc::now().timestamp_millis()` or an explicit `timestamp_ms`).
    pub removed_at: i64,
    /// The removed entries. Defaults to an empty set when the `files` key is
    /// absent from the encoded form.
    #[serde(default)]
    pub files: HashSet<RemovedFile>,
}
406
/// An entry in the removed-files record.
///
/// `Deserialize` is implemented by hand (not derived) so that a bare file id
/// is also accepted on input.
#[derive(Serialize, Hash, Clone, Debug, PartialEq, Eq)]
pub enum RemovedFile {
    /// A removed file, together with its index version if it had one.
    File(FileId, Option<IndexVersion>),
    /// A removed index (identified by file id and index version).
    Index(FileId, IndexVersion),
}
413
414impl<'de> Deserialize<'de> for RemovedFile {
418 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
419 where
420 D: serde::Deserializer<'de>,
421 {
422 #[derive(Deserialize)]
423 #[serde(untagged)]
424 enum CompatRemovedFile {
425 Enum(RemovedFileEnum),
426 FileId(FileId),
427 }
428
429 #[derive(Deserialize)]
430 enum RemovedFileEnum {
431 File(FileId, Option<IndexVersion>),
432 Index(FileId, IndexVersion),
433 }
434
435 let compat = CompatRemovedFile::deserialize(deserializer)?;
436 match compat {
437 CompatRemovedFile::FileId(file_id) => Ok(RemovedFile::File(file_id, None)),
438 CompatRemovedFile::Enum(e) => match e {
439 RemovedFileEnum::File(file_id, version) => Ok(RemovedFile::File(file_id, version)),
440 RemovedFileEnum::Index(file_id, version) => {
441 Ok(RemovedFile::Index(file_id, version))
442 }
443 },
444 }
445 }
446}
447
448impl RemovedFile {
449 pub fn file_id(&self) -> FileId {
450 match self {
451 RemovedFile::File(file_id, _) => *file_id,
452 RemovedFile::Index(file_id, _) => *file_id,
453 }
454 }
455
456 pub fn index_version(&self) -> Option<IndexVersion> {
457 match self {
458 RemovedFile::File(_, index_version) => *index_version,
459 RemovedFile::Index(_, index_version) => Some(*index_version),
460 }
461 }
462}
463
464impl RemovedFilesRecord {
465 pub fn add_removed_files(&mut self, removed: Vec<RemovedFile>, at: i64) {
467 if removed.is_empty() {
468 return;
469 }
470 let files = removed.into_iter().collect();
471 self.removed_files.push(RemovedFiles {
472 removed_at: at,
473 files,
474 });
475 }
476
477 pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
478 if !opt.enable_gc {
479 self.removed_files.clear();
481 return Ok(());
482 }
483
484 Ok(())
487 }
488}
489
/// A checkpoint of a region's manifest, encoded as JSON by
/// `RegionCheckpoint::encode`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RegionCheckpoint {
    /// Manifest version associated with this checkpoint (returned by
    /// `last_version()`).
    pub last_version: ManifestVersion,
    /// NOTE(review): presumably the number of actions folded into this
    /// checkpoint — confirm against the code that writes checkpoints.
    pub compacted_actions: usize,
    /// The manifest state captured by the checkpoint, if any.
    pub checkpoint: Option<RegionManifest>,
}
501
502impl RegionCheckpoint {
503 pub fn last_version(&self) -> ManifestVersion {
504 self.last_version
505 }
506
507 pub fn encode(&self) -> Result<Vec<u8>> {
508 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
509
510 Ok(json.into_bytes())
511 }
512
513 pub fn decode(bytes: &[u8]) -> Result<Self> {
514 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
515
516 serde_json::from_str(data).context(SerdeJsonSnafu)
517 }
518}
519
/// An ordered list of [`RegionMetaAction`]s that is encoded and decoded as a
/// single JSON document.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
    /// The actions, in order.
    pub actions: Vec<RegionMetaAction>,
}
524
525impl RegionMetaActionList {
526 pub fn with_action(action: RegionMetaAction) -> Self {
527 Self {
528 actions: vec![action],
529 }
530 }
531
532 pub fn new(actions: Vec<RegionMetaAction>) -> Self {
533 Self { actions }
534 }
535
536 pub fn split_region_change_and_edit(
538 self,
539 ) -> (
540 Option<RegionPartitionExprChange>,
541 Option<RegionChange>,
542 RegionEdit,
543 ) {
544 let mut edit = RegionEdit {
545 files_to_add: Vec::new(),
546 files_to_remove: Vec::new(),
547 timestamp_ms: None,
548 compaction_time_window: None,
549 flushed_entry_id: None,
550 flushed_sequence: None,
551 committed_sequence: None,
552 };
553 let mut partition_expr_change = None;
554 let mut region_change = None;
555 for action in self.actions {
556 match action {
557 RegionMetaAction::PartitionExprChange(change) => {
558 partition_expr_change = Some(change);
559 }
560 RegionMetaAction::Change(change) => {
561 region_change = Some(change);
562 }
563 RegionMetaAction::Edit(region_edit) => {
564 edit.files_to_add.extend(region_edit.files_to_add);
566 edit.files_to_remove.extend(region_edit.files_to_remove);
567 if let Some(eid) = region_edit.flushed_entry_id {
569 edit.flushed_entry_id =
570 Some(edit.flushed_entry_id.map_or(eid, |v| v.max(eid)));
571 }
572 if let Some(seq) = region_edit.flushed_sequence {
573 edit.flushed_sequence =
574 Some(edit.flushed_sequence.map_or(seq, |v| v.max(seq)));
575 }
576 if let Some(seq) = region_edit.committed_sequence {
577 edit.committed_sequence =
578 Some(edit.committed_sequence.map_or(seq, |v| v.max(seq)));
579 }
580 if region_edit.compaction_time_window.is_some() {
582 edit.compaction_time_window = region_edit.compaction_time_window;
583 }
584 }
585 _ => {}
586 }
587 }
588
589 (partition_expr_change, region_change, edit)
590 }
591}
592
593impl RegionMetaActionList {
594 pub fn encode(&self) -> Result<Vec<u8>> {
596 let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
597
598 Ok(json.into_bytes())
599 }
600
601 pub fn decode(bytes: &[u8]) -> Result<Self> {
602 let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
603
604 serde_json::from_str(data).context(SerdeJsonSnafu)
605 }
606}
607
#[cfg(test)]
mod tests {

    use common_time::Timestamp;

    use super::*;

    /// Region metadata fixture shared by several tests: a tag column `a`, a
    /// field column `b` and a timestamp column `c`.
    const REGION_METADATA_JSON: &str = r#"{
        "column_metadatas": [
            {
                "column_schema": {
                    "name": "a",
                    "data_type": {"Int64": {}},
                    "is_nullable": false,
                    "is_time_index": false,
                    "default_constraint": null,
                    "metadata": {}
                },
                "semantic_type": "Tag",
                "column_id": 1
            },
            {
                "column_schema": {
                    "name": "b",
                    "data_type": {"Float64": {}},
                    "is_nullable": false,
                    "is_time_index": false,
                    "default_constraint": null,
                    "metadata": {}
                },
                "semantic_type": "Field",
                "column_id": 2
            },
            {
                "column_schema": {
                    "name": "c",
                    "data_type": {"Timestamp": {"Millisecond": null}},
                    "is_nullable": false,
                    "is_time_index": false,
                    "default_constraint": null,
                    "metadata": {}
                },
                "semantic_type": "Timestamp",
                "column_id": 3
            }
        ],
        "primary_key": [1],
        "region_id": 4402341478400,
        "schema_version": 0
    }"#;

    /// Parses [`REGION_METADATA_JSON`].
    fn region_metadata() -> RegionMetadataRef {
        serde_json::from_str(REGION_METADATA_JSON).expect("Failed to parse region metadata")
    }

    /// Previously encoded actions must still decode.
    #[test]
    fn test_region_action_compatibility() {
        // An edit without `flushed_sequence`.
        let region_edit = r#"{
            "flushed_entry_id":null,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // An edit with `flushed_sequence`.
        let region_edit = r#"{
            "flushed_entry_id":10,
            "flushed_sequence":10,
            "compaction_time_window":null,
            "files_to_add":[
            {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
            ],
            "files_to_remove":[
            {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100}
            ]
        }"#;
        let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();

        // A change carrying only metadata.
        let region_change = r#" {
            "metadata":{
                "column_metadatas":[
                {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key":[1],
                "region_id":5299989648942,
                "schema_version":0
            }
        }"#;
        let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

        let region_remove = r#"{"region_id":42}"#;
        let _ = serde_json::from_str::<RegionRemove>(region_remove).unwrap();

        let region_partition_expr_change = r#"{
            "partition_expr": "{\"expr\":\"x < 100\"}"
        }"#;
        let _ = serde_json::from_str::<RegionPartitionExprChange>(region_partition_expr_change)
            .unwrap();
    }

    /// Drives the builder through change -> add file -> remove file and
    /// checks the resulting manifest.
    #[test]
    fn test_region_manifest_builder() {
        let metadata = region_metadata();

        let mut builder = RegionManifestBuilder::with_checkpoint(None);
        assert!(!builder.contains_metadata());

        builder.apply_change(
            1,
            RegionChange {
                metadata: metadata.clone(),
                sst_format: FormatType::PrimaryKey,
                append_mode: Some(true),
            },
        );
        assert!(builder.contains_metadata());

        let file = FileMeta {
            region_id: RegionId::from_u64(4402341478400),
            file_id: FileId::random(),
            level: 0,
            file_size: 100,
            ..Default::default()
        };
        builder.apply_edit(
            2,
            RegionEdit {
                files_to_add: vec![file.clone()],
                files_to_remove: vec![],
                timestamp_ms: Some(1000),
                compaction_time_window: None,
                flushed_entry_id: Some(10),
                flushed_sequence: Some(20),
                committed_sequence: Some(30),
            },
        );
        assert_eq!(builder.files().len(), 1);
        assert!(builder.files().contains_key(&file.file_id));
        // Nothing was removed, so no removal group is recorded.
        assert!(builder.removed_files.removed_files.is_empty());

        // Smaller positions must not move the watermarks backwards.
        builder.apply_edit(
            3,
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![file.clone()],
                timestamp_ms: Some(2000),
                compaction_time_window: Some(Duration::from_secs(3600)),
                flushed_entry_id: Some(5),
                flushed_sequence: Some(5),
                committed_sequence: Some(5),
            },
        );
        assert!(builder.files().is_empty());
        assert_eq!(
            builder.removed_files.removed_files,
            vec![RemovedFiles {
                removed_at: 2000,
                files: HashSet::from([RemovedFile::File(file.file_id, file.index_version())]),
            }]
        );

        let manifest = builder.try_build().unwrap();
        assert_eq!(manifest.metadata, metadata);
        assert_eq!(manifest.manifest_version, 3);
        assert_eq!(manifest.flushed_entry_id, 10);
        assert_eq!(manifest.flushed_sequence, 20);
        assert_eq!(manifest.committed_sequence, Some(30));
        assert_eq!(
            manifest.compaction_time_window,
            Some(Duration::from_secs(3600))
        );
        assert_eq!(manifest.sst_format, FormatType::PrimaryKey);
        assert_eq!(manifest.append_mode, Some(true));

        // Without metadata the build must fail.
        assert!(RegionManifestBuilder::default().try_build().is_err());
    }

    /// A checkpoint must survive an encode/decode round trip, with and
    /// without a manifest inside.
    #[test]
    fn test_encode_decode_region_checkpoint() {
        let manifest = RegionManifest {
            metadata: region_metadata(),
            files: HashMap::new(),
            removed_files: Default::default(),
            flushed_entry_id: 10,
            flushed_sequence: 20,
            committed_sequence: Some(30),
            manifest_version: 5,
            truncated_entry_id: Some(1),
            compaction_time_window: Some(Duration::from_secs(3600)),
            sst_format: FormatType::PrimaryKey,
            append_mode: None,
        };

        for checkpoint in [Some(manifest), None] {
            let checkpoint = RegionCheckpoint {
                last_version: 5,
                compacted_actions: 10,
                checkpoint,
            };

            let bytes = checkpoint.encode().unwrap();
            let decoded = RegionCheckpoint::decode(&bytes).unwrap();

            assert_eq!(checkpoint, decoded);
            assert_eq!(decoded.last_version(), 5);
        }

        // Invalid UTF-8 and invalid JSON are both rejected.
        assert!(RegionCheckpoint::decode(&[0xff, 0xfe]).is_err());
        assert!(RegionCheckpoint::decode(b"not json").is_err());
    }

    /// A manifest encoded without the newer fields must still decode.
    #[test]
    fn test_region_manifest_compatibility() {
        let region_manifest_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {
                        "column_schema": {
                            "name": "a",
                            "data_type": {"Int64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Tag",
                        "column_id": 1
                    },
                    {
                        "column_schema": {
                            "name": "b",
                            "data_type": {"Float64": {}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Field",
                        "column_id": 2
                    },
                    {
                        "column_schema": {
                            "name": "c",
                            "data_type": {"Timestamp": {"Millisecond": null}},
                            "is_nullable": false,
                            "is_time_index": false,
                            "default_constraint": null,
                            "metadata": {}
                        },
                        "semantic_type": "Timestamp",
                        "column_id": 3
                    }
                ],
                "primary_key": [1],
                "region_id": 4402341478400,
                "schema_version": 0
            },
            "files": {
                "4b220a70-2b03-4641-9687-b65d94641208": {
                    "region_id": 4402341478400,
                    "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 1,
                    "file_size": 100
                },
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02": {
                    "region_id": 4402341478400,
                    "file_id": "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                    "time_range": [
                        {"value": 1451609210000, "unit": "Millisecond"},
                        {"value": 1451609520000, "unit": "Millisecond"}
                    ],
                    "level": 0,
                    "file_size": 100
                }
            },
            "flushed_entry_id": 10,
            "flushed_sequence": 20,
            "manifest_version": 1,
            "truncated_entry_id": null,
            "compaction_time_window": null
        }"#;

        let manifest = serde_json::from_str::<RegionManifest>(region_manifest_json).unwrap();

        assert_eq!(manifest.files.len(), 2);
        assert_eq!(manifest.flushed_entry_id, 10);
        assert_eq!(manifest.flushed_sequence, 20);
        assert_eq!(manifest.manifest_version, 1);

        // Map iteration order is unspecified, so sort before comparing.
        let mut file_ids: Vec<String> = manifest.files.keys().map(|id| id.to_string()).collect();
        file_ids.sort_unstable();
        assert_eq!(
            file_ids,
            vec![
                "34b6ebb9-b8a5-4a4b-b744-56f67defad02",
                "4b220a70-2b03-4641-9687-b65d94641208",
            ]
        );

        // Re-encoding adds the newer fields, so the text differs but the
        // value round-trips.
        let serialized_manifest = serde_json::to_string(&manifest).unwrap();
        let deserialized_manifest: RegionManifest =
            serde_json::from_str(&serialized_manifest).unwrap();
        assert_eq!(manifest, deserialized_manifest);
        assert_ne!(serialized_manifest, region_manifest_json);
    }

    /// Both truncate shapes decode into the matching `TruncateKind`.
    #[test]
    fn test_region_truncate_compat() {
        // Full truncate: entry id + sequence.
        let region_truncate_json = r#"{
            "region_id": 4402341478400,
            "truncated_entry_id": 10,
            "truncated_sequence": 20
        }"#;

        let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
        assert_eq!(truncate_v1.region_id, 4402341478400);
        assert_eq!(
            truncate_v1.kind,
            TruncateKind::All {
                truncated_entry_id: 10,
                truncated_sequence: 20,
            }
        );

        // Partial truncate: explicit file list.
        let region_truncate_v2_json = r#"{
    "region_id": 4402341478400,
    "files_to_remove": [
        {
            "region_id": 4402341478400,
            "file_id": "4b220a70-2b03-4641-9687-b65d94641208",
            "time_range": [
                {
                    "value": 1451609210000,
                    "unit": "Millisecond"
                },
                {
                    "value": 1451609520000,
                    "unit": "Millisecond"
                }
            ],
            "level": 1,
            "file_size": 100
        }
    ]
}"#;

        let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
        assert_eq!(truncate_v2.region_id, 4402341478400);
        assert_eq!(
            truncate_v2.kind,
            TruncateKind::Partial {
                files_to_remove: vec![FileMeta {
                    region_id: RegionId::from_u64(4402341478400),
                    file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                    time_range: (
                        Timestamp::new_millisecond(1451609210000),
                        Timestamp::new_millisecond(1451609520000)
                    ),
                    level: 1,
                    file_size: 100,
                    ..Default::default()
                }]
            }
        );
    }

    /// A manifest with a non-empty removed-files record round-trips.
    #[test]
    fn test_region_manifest_removed_files() {
        let manifest = RegionManifest {
            metadata: region_metadata(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            removed_files: RemovedFilesRecord {
                removed_files: vec![RemovedFiles {
                    removed_at: 0,
                    files: HashSet::from([RemovedFile::File(
                        FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
                        None,
                    )]),
                }],
            },
            sst_format: FormatType::PrimaryKey,
            append_mode: None,
        };

        let json = serde_json::to_string(&manifest).unwrap();
        let new: RegionManifest = serde_json::from_str(&json).unwrap();

        assert_eq!(manifest, new);
    }

    /// Old and new layouts of `RegionManifest` / `RegionEdit` decode each
    /// other's output.
    #[test]
    fn test_old_region_manifest_compat() {
        /// Manifest layout without the newer fields.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionManifestV1 {
            pub metadata: RegionMetadataRef,
            pub files: HashMap<FileId, FileMeta>,
            pub flushed_entry_id: EntryId,
            pub flushed_sequence: SequenceNumber,
            pub manifest_version: ManifestVersion,
            pub truncated_entry_id: Option<EntryId>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
        }

        let metadata = region_metadata();

        // Old -> new.
        let v1 = RegionManifestV1 {
            metadata: metadata.clone(),
            files: HashMap::new(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
        };
        let json = serde_json::to_string(&v1).unwrap();
        let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(
            new_from_old,
            RegionManifest {
                metadata: metadata.clone(),
                files: HashMap::new(),
                removed_files: Default::default(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                committed_sequence: None,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
                sst_format: FormatType::PrimaryKey,
                append_mode: None,
            }
        );

        // New -> old.
        let new_manifest = RegionManifest {
            metadata: metadata.clone(),
            files: HashMap::new(),
            removed_files: Default::default(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            sst_format: FormatType::PrimaryKey,
            append_mode: None,
        };
        let json = serde_json::to_string(&new_manifest).unwrap();
        let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
        assert_eq!(
            old_from_new,
            RegionManifestV1 {
                metadata: metadata.clone(),
                files: HashMap::new(),
                flushed_entry_id: 0,
                flushed_sequence: 0,
                manifest_version: 0,
                truncated_entry_id: None,
                compaction_time_window: None,
            }
        );

        /// Edit layout without `timestamp_ms` and `committed_sequence`.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        pub struct RegionEditV1 {
            pub files_to_add: Vec<FileMeta>,
            pub files_to_remove: Vec<FileMeta>,
            #[serde(with = "humantime_serde")]
            pub compaction_time_window: Option<Duration>,
            pub flushed_entry_id: Option<EntryId>,
            pub flushed_sequence: Option<SequenceNumber>,
        }

        // Old -> new.
        let json = serde_json::to_string(&RegionEditV1 {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        })
        .unwrap();
        let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
        assert_eq!(
            RegionEdit {
                files_to_add: vec![],
                files_to_remove: vec![],
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            },
            new_from_old
        );

        // New -> old: the extra fields are ignored by the old layout.
        let new = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: Some(42),
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        let new_json = serde_json::to_string(&new).unwrap();

        let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
        assert_eq!(
            RegionEditV1 {
                files_to_add: vec![],
                files_to_remove: vec![],
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            old_from_new
        );
    }

    /// A change encoded without `sst_format` decodes to the default format,
    /// and an explicit format round-trips.
    #[test]
    fn test_region_change_backward_compatibility() {
        let region_change_json = r#"{
            "metadata": {
                "column_metadatas": [
                    {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
                    {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
                    {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
                ],
                "primary_key": [
                    1
                ],
                "region_id": 42,
                "schema_version": 0
            }
        }"#;

        let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
        assert_eq!(region_change.sst_format, FormatType::PrimaryKey);

        let region_change = RegionChange {
            metadata: region_change.metadata.clone(),
            sst_format: FormatType::Flat,
            append_mode: None,
        };

        let serialized = serde_json::to_string(&region_change).unwrap();
        let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized.sst_format, FormatType::Flat);
    }

    /// `RemovedFile` accepts both a bare file id and the enum form.
    #[test]
    fn test_removed_file_compatibility() {
        let file_id = FileId::random();

        // Bare file id.
        let json_str = format!("\"{}\"", file_id);
        let removed_file: RemovedFile = serde_json::from_str(&json_str).unwrap();
        assert_eq!(removed_file, RemovedFile::File(file_id, None));

        // Enum form: file with an index version.
        let removed_file_v2 = RemovedFile::File(file_id, Some(10));
        let json_v2 = serde_json::to_string(&removed_file_v2).unwrap();
        let deserialized_v2: RemovedFile = serde_json::from_str(&json_v2).unwrap();
        assert_eq!(removed_file_v2, deserialized_v2);

        // Enum form: index.
        let removed_index = RemovedFile::Index(file_id, 20);
        let json_index = serde_json::to_string(&removed_index).unwrap();
        let deserialized_index: RemovedFile = serde_json::from_str(&json_index).unwrap();
        assert_eq!(removed_index, deserialized_index);

        // Enum form: file without an index version.
        let removed_file = RemovedFile::File(file_id, None);
        let json = serde_json::to_string(&removed_file).unwrap();
        let deserialized: RemovedFile = serde_json::from_str(&json).unwrap();
        assert_eq!(removed_file, deserialized);

        // A set of bare file ids.
        let json_set = format!("[\"{}\"]", file_id);
        let removed_files_set: HashSet<RemovedFile> = serde_json::from_str(&json_set).unwrap();
        assert!(removed_files_set.contains(&RemovedFile::File(file_id, None)));
    }

    /// `RemovedFiles` tolerates the old `file_ids` key (decoded as an empty
    /// `files` set) and accepts bare file ids under `files`.
    #[test]
    fn test_removed_files_backward_compatibility() {
        /// Old layout that used `file_ids` instead of `files`.
        #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
        struct OldRemovedFiles {
            pub removed_at: i64,
            pub file_ids: HashSet<FileId>,
        }

        let mut file_ids = HashSet::new();
        file_ids.insert(FileId::random());
        file_ids.insert(FileId::random());

        let old_removed_files = OldRemovedFiles {
            removed_at: 1234567890,
            file_ids,
        };

        let old_json = serde_json::to_string(&old_removed_files).unwrap();

        let result: Result<RemovedFiles, _> = serde_json::from_str(&old_json);

        // The unknown `file_ids` key is ignored and `files` defaults to empty.
        assert!(result.is_ok(), "{:?}", result);
        let removed_files = result.unwrap();
        assert_eq!(removed_files.removed_at, 1234567890);
        assert!(removed_files.files.is_empty());

        let file_id = FileId::random();
        let new_json = format!(
            r#"{{
                "removed_at": 1234567890,
                "files": ["{}"]
            }}"#,
            file_id
        );

        let result: Result<RemovedFiles, _> = serde_json::from_str(&new_json);
        assert!(result.is_ok());
        let removed_files = result.unwrap();
        assert_eq!(removed_files.removed_at, 1234567890);
        assert_eq!(removed_files.files.len(), 1);
        assert!(
            removed_files
                .files
                .contains(&RemovedFile::File(file_id, None))
        );
    }
}