Skip to main content

cli/data/export_v2/
manifest.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Manifest data structures for Export/Import V2.

17use std::time::Duration;
18use std::{fmt, str};
19
20use chrono::{DateTime, Utc};
21use serde::{Deserialize, Serialize};
22use uuid::Uuid;
23
24use crate::data::export_v2::chunker::generate_chunks;
25use crate::data::export_v2::error::{
26    ChunkTimeWindowRequiresBoundsSnafu, Result as ExportResult, TimeParseEndBeforeStartSnafu,
27    TimeParseInvalidFormatSnafu,
28};
29
/// Version of the manifest format written by this build.
///
/// Stored in every manifest so that readers can check compatibility.
pub const MANIFEST_VERSION: u32 = 1;

/// Name of the manifest file at the root of a snapshot directory.
pub const MANIFEST_FILE: &str = "manifest.json";

36/// Time range for data export (half-open interval: [start, end)).
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38pub struct TimeRange {
39    /// Start time (inclusive). None means earliest available data.
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub start: Option<DateTime<Utc>>,
42    /// End time (exclusive). None means current time.
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub end: Option<DateTime<Utc>>,
45}
46
47impl TimeRange {
48    /// Creates a new time range with specified bounds.
49    pub fn new(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> Self {
50        Self { start, end }
51    }
52
53    /// Creates an unbounded time range (all data).
54    pub fn unbounded() -> Self {
55        Self {
56            start: None,
57            end: None,
58        }
59    }
60
61    /// Returns true if this time range is unbounded.
62    pub fn is_unbounded(&self) -> bool {
63        self.start.is_none() && self.end.is_none()
64    }
65
66    /// Returns true if both bounds are specified.
67    pub fn is_bounded(&self) -> bool {
68        self.start.is_some() && self.end.is_some()
69    }
70
71    /// Parses a time range from optional RFC3339 strings.
72    pub fn parse(start: Option<&str>, end: Option<&str>) -> ExportResult<Self> {
73        let start = start.map(parse_time).transpose()?;
74        let end = end.map(parse_time).transpose()?;
75
76        if let (Some(start), Some(end)) = (start, end)
77            && end < start
78        {
79            return TimeParseEndBeforeStartSnafu.fail();
80        }
81
82        Ok(Self::new(start, end))
83    }
84}
85
86fn parse_time(input: &str) -> ExportResult<DateTime<Utc>> {
87    DateTime::parse_from_rfc3339(input)
88        .map(|dt| dt.with_timezone(&Utc))
89        .map_err(|_| TimeParseInvalidFormatSnafu { input }.build())
90}
91
92impl Default for TimeRange {
93    fn default() -> Self {
94        Self::unbounded()
95    }
96}
97
98/// Status of a chunk during export/import.
99#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
100#[serde(rename_all = "snake_case")]
101pub enum ChunkStatus {
102    /// Chunk is pending export.
103    #[default]
104    Pending,
105    /// Chunk export is in progress.
106    InProgress,
107    /// Chunk export completed successfully.
108    Completed,
109    /// Chunk had no data to export.
110    Skipped,
111    /// Chunk export failed.
112    Failed,
113}
114
115/// Metadata for a single chunk of exported data.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct ChunkMeta {
118    /// Chunk identifier (sequential number starting from 1).
119    pub id: u32,
120    /// Time range covered by this chunk.
121    pub time_range: TimeRange,
122    /// Export status.
123    pub status: ChunkStatus,
124    /// List of data files in this chunk (relative paths from snapshot root).
125    #[serde(default)]
126    pub files: Vec<String>,
127    /// SHA256 checksum of all files in this chunk (aggregated).
128    #[serde(skip_serializing_if = "Option::is_none")]
129    pub checksum: Option<String>,
130    /// Error message if status is Failed.
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub error: Option<String>,
133}
134
135impl ChunkMeta {
136    /// Creates a new pending chunk with the given id and time range.
137    pub fn new(id: u32, time_range: TimeRange) -> Self {
138        Self {
139            id,
140            time_range,
141            status: ChunkStatus::Pending,
142            files: vec![],
143            checksum: None,
144            error: None,
145        }
146    }
147
148    /// Creates a skipped chunk with the given id and time range.
149    pub fn skipped(id: u32, time_range: TimeRange) -> Self {
150        let mut chunk = Self::new(id, time_range);
151        chunk.mark_skipped();
152        chunk
153    }
154
155    /// Marks this chunk as in progress.
156    pub fn mark_in_progress(&mut self) {
157        self.status = ChunkStatus::InProgress;
158        self.error = None;
159    }
160
161    /// Marks this chunk as completed with the given files and checksum.
162    pub fn mark_completed(&mut self, files: Vec<String>, checksum: Option<String>) {
163        self.status = ChunkStatus::Completed;
164        self.files = files;
165        self.checksum = checksum;
166        self.error = None;
167    }
168
169    /// Marks this chunk as skipped because no data files were produced.
170    pub fn mark_skipped(&mut self) {
171        self.status = ChunkStatus::Skipped;
172        self.files.clear();
173        self.checksum = None;
174        self.error = None;
175    }
176
177    /// Marks this chunk as failed with the given error message.
178    pub fn mark_failed(&mut self, error: String) {
179        self.status = ChunkStatus::Failed;
180        self.error = Some(error);
181    }
182}
183
184/// Supported data formats for export.
185#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, clap::ValueEnum)]
186#[serde(rename_all = "lowercase")]
187#[value(rename_all = "lowercase")]
188pub enum DataFormat {
189    /// Apache Parquet format (default, recommended for production).
190    #[default]
191    Parquet,
192    /// CSV format (human-readable).
193    Csv,
194    /// JSON format (structured text).
195    Json,
196}
197
198impl fmt::Display for DataFormat {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        match self {
201            DataFormat::Parquet => write!(f, "parquet"),
202            DataFormat::Csv => write!(f, "csv"),
203            DataFormat::Json => write!(f, "json"),
204        }
205    }
206}
207
208impl str::FromStr for DataFormat {
209    type Err = String;
210
211    fn from_str(s: &str) -> Result<Self, Self::Err> {
212        match s.to_lowercase().as_str() {
213            "parquet" => Ok(DataFormat::Parquet),
214            "csv" => Ok(DataFormat::Csv),
215            "json" => Ok(DataFormat::Json),
216            _ => Err(format!(
217                "invalid format '{}': expected one of parquet, csv, json",
218                s
219            )),
220        }
221    }
222}
223
224/// Snapshot manifest containing all metadata.
225///
226/// The manifest is stored as `manifest.json` in the snapshot root directory.
227/// It contains:
228/// - Snapshot identification (UUID, timestamps)
229/// - Scope (catalog, schemas, time range)
230/// - Export configuration (format, schema_only)
231/// - Chunk metadata for resume support
232/// - Integrity checksums
233#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct Manifest {
235    /// Manifest format version for compatibility checking.
236    pub version: u32,
237    /// Unique snapshot identifier.
238    pub snapshot_id: Uuid,
239    /// Catalog name.
240    pub catalog: String,
241    /// List of schemas included in this snapshot.
242    pub schemas: Vec<String>,
243    /// Overall time range covered by this snapshot.
244    pub time_range: TimeRange,
245    /// Whether this is a schema-only snapshot (no data).
246    pub schema_only: bool,
247    /// Data format used for export.
248    pub format: DataFormat,
249    /// Chunk metadata (empty for schema-only snapshots).
250    #[serde(default)]
251    pub chunks: Vec<ChunkMeta>,
252    /// Snapshot-level SHA256 checksum (aggregated from all chunks).
253    #[serde(skip_serializing_if = "Option::is_none")]
254    pub checksum: Option<String>,
255    /// Creation timestamp.
256    pub created_at: DateTime<Utc>,
257    /// Last updated timestamp.
258    pub updated_at: DateTime<Utc>,
259}
260
261impl Manifest {
262    pub fn new_for_export(
263        catalog: String,
264        schemas: Vec<String>,
265        schema_only: bool,
266        time_range: TimeRange,
267        format: DataFormat,
268        chunk_time_window: Option<Duration>,
269    ) -> ExportResult<Self> {
270        if chunk_time_window.is_some() && !time_range.is_bounded() {
271            return ChunkTimeWindowRequiresBoundsSnafu.fail();
272        }
273
274        let mut manifest = if schema_only {
275            Self::new_schema_only(catalog, schemas)
276        } else {
277            Self::new_full(catalog, schemas, time_range, format)
278        };
279
280        if !schema_only {
281            manifest.chunks = match chunk_time_window {
282                Some(window) => generate_chunks(&manifest.time_range, window),
283                None => generate_single_chunk(&manifest.time_range),
284            };
285            manifest.touch();
286        }
287
288        Ok(manifest)
289    }
290
291    /// Creates a new manifest for schema-only export.
292    pub fn new_schema_only(catalog: String, schemas: Vec<String>) -> Self {
293        let now = Utc::now();
294        Self {
295            version: MANIFEST_VERSION,
296            snapshot_id: Uuid::new_v4(),
297            catalog,
298            schemas,
299            time_range: TimeRange::unbounded(),
300            schema_only: true,
301            format: DataFormat::Parquet,
302            chunks: vec![],
303            checksum: None,
304            created_at: now,
305            updated_at: now,
306        }
307    }
308
309    /// Creates a new manifest for full export with time range and format.
310    pub fn new_full(
311        catalog: String,
312        schemas: Vec<String>,
313        time_range: TimeRange,
314        format: DataFormat,
315    ) -> Self {
316        let now = Utc::now();
317        Self {
318            version: MANIFEST_VERSION,
319            snapshot_id: Uuid::new_v4(),
320            catalog,
321            schemas,
322            time_range,
323            schema_only: false,
324            format,
325            chunks: vec![],
326            checksum: None,
327            created_at: now,
328            updated_at: now,
329        }
330    }
331
332    /// Returns true if all chunks are completed (or if schema-only).
333    pub fn is_complete(&self) -> bool {
334        self.schema_only
335            || (!self.chunks.is_empty()
336                && self
337                    .chunks
338                    .iter()
339                    .all(|c| matches!(c.status, ChunkStatus::Completed | ChunkStatus::Skipped)))
340    }
341
342    /// Returns the number of pending chunks.
343    pub fn pending_count(&self) -> usize {
344        self.chunks
345            .iter()
346            .filter(|c| c.status == ChunkStatus::Pending)
347            .count()
348    }
349
350    /// Returns the number of in-progress chunks.
351    pub fn in_progress_count(&self) -> usize {
352        self.chunks
353            .iter()
354            .filter(|c| c.status == ChunkStatus::InProgress)
355            .count()
356    }
357
358    /// Returns the number of completed chunks.
359    pub fn completed_count(&self) -> usize {
360        self.chunks
361            .iter()
362            .filter(|c| c.status == ChunkStatus::Completed)
363            .count()
364    }
365
366    /// Returns the number of skipped chunks.
367    pub fn skipped_count(&self) -> usize {
368        self.chunks
369            .iter()
370            .filter(|c| c.status == ChunkStatus::Skipped)
371            .count()
372    }
373
374    /// Returns the number of failed chunks.
375    pub fn failed_count(&self) -> usize {
376        self.chunks
377            .iter()
378            .filter(|c| c.status == ChunkStatus::Failed)
379            .count()
380    }
381
382    /// Updates the `updated_at` timestamp to now.
383    pub fn touch(&mut self) {
384        self.updated_at = Utc::now();
385    }
386
387    /// Adds a chunk to the manifest.
388    pub fn add_chunk(&mut self, chunk: ChunkMeta) {
389        self.chunks.push(chunk);
390        self.touch();
391    }
392
393    /// Updates a chunk by id.
394    pub fn update_chunk(&mut self, id: u32, updater: impl FnOnce(&mut ChunkMeta)) {
395        if let Some(chunk) = self.chunks.iter_mut().find(|c| c.id == id) {
396            updater(chunk);
397            self.touch();
398        }
399    }
400}
401
402fn generate_single_chunk(time_range: &TimeRange) -> Vec<ChunkMeta> {
403    if let (Some(start), Some(end)) = (time_range.start, time_range.end) {
404        if start == end {
405            return vec![ChunkMeta::skipped(1, time_range.clone())];
406        }
407        if start > end {
408            return Vec::new();
409        }
410    }
411    vec![ChunkMeta::new(1, time_range.clone())]
412}
413
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    /// Builds a UTC timestamp at the start of the given hour.
    fn at(year: i32, month: u32, day: u32, hour: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, 0, 0).unwrap()
    }

    /// Catalog and schema list shared by the manifest tests.
    fn scope() -> (String, Vec<String>) {
        ("greptime".to_string(), vec!["public".to_string()])
    }

    #[test]
    fn test_time_range_serialization() {
        let encoded = serde_json::to_string(&TimeRange::unbounded()).unwrap();
        assert_eq!(encoded, "{}");

        let decoded: TimeRange = serde_json::from_str("{}").unwrap();
        assert!(decoded.is_unbounded());
    }

    #[test]
    fn test_manifest_schema_only() {
        let (catalog, schemas) = scope();
        let manifest = Manifest::new_schema_only(catalog, schemas);

        assert_eq!(manifest.version, MANIFEST_VERSION);
        assert!(manifest.schema_only);
        assert!(manifest.chunks.is_empty());
        assert!(manifest.is_complete());
    }

    #[test]
    fn test_generate_single_chunk_zero_width_range_is_skipped() {
        let instant = at(2025, 1, 1, 0);
        let chunks = generate_single_chunk(&TimeRange::new(Some(instant), Some(instant)));

        assert_eq!(chunks.len(), 1);
        let only = &chunks[0];
        assert_eq!(only.status, ChunkStatus::Skipped);
        assert_eq!(only.time_range.start, Some(instant));
        assert_eq!(only.time_range.end, Some(instant));
    }

    #[test]
    fn test_generate_single_chunk_invalid_range_is_empty() {
        let inverted = TimeRange::new(Some(at(2025, 1, 1, 1)), Some(at(2025, 1, 1, 0)));
        assert!(generate_single_chunk(&inverted).is_empty());
    }

    #[test]
    fn test_manifest_full() {
        let (catalog, schemas) = scope();
        let manifest =
            Manifest::new_full(catalog, schemas, TimeRange::unbounded(), DataFormat::Parquet);

        assert!(!manifest.schema_only);
        assert!(manifest.chunks.is_empty());
        assert!(!manifest.is_complete());
    }

    #[test]
    fn test_data_format_parsing() {
        let cases = [
            ("parquet", DataFormat::Parquet),
            ("CSV", DataFormat::Csv),
            ("JSON", DataFormat::Json),
        ];
        for (input, expected) in cases {
            assert_eq!(input.parse::<DataFormat>().unwrap(), expected);
        }
        assert!("invalid".parse::<DataFormat>().is_err());
    }

    #[test]
    fn test_chunk_status_transitions() {
        let mut chunk = ChunkMeta::new(1, TimeRange::unbounded());
        assert_eq!(chunk.status, ChunkStatus::Pending);

        chunk.mark_in_progress();
        assert_eq!(chunk.status, ChunkStatus::InProgress);

        let files = vec!["file1.parquet".to_string()];
        chunk.mark_completed(files, Some("abc123".to_string()));
        assert_eq!(chunk.status, ChunkStatus::Completed);
        assert_eq!(chunk.files.len(), 1);

        chunk.mark_skipped();
        assert_eq!(chunk.status, ChunkStatus::Skipped);
        assert!(chunk.files.is_empty());
    }

    #[test]
    fn test_manifest_is_complete_when_chunks_are_completed_or_skipped() {
        let (catalog, schemas) = scope();
        let mut manifest =
            Manifest::new_full(catalog, schemas, TimeRange::unbounded(), DataFormat::Parquet);
        for id in 1..=2 {
            manifest.add_chunk(ChunkMeta::new(id, TimeRange::unbounded()));
        }

        manifest.update_chunk(1, |c| c.mark_completed(vec!["a.parquet".to_string()], None));
        manifest.update_chunk(2, ChunkMeta::mark_skipped);

        assert!(manifest.is_complete());
        assert_eq!(manifest.completed_count(), 1);
        assert_eq!(manifest.skipped_count(), 1);
    }

    #[test]
    fn test_manifest_chunk_time_window_none_single_chunk() {
        let range = TimeRange::new(Some(at(2025, 1, 1, 0)), Some(at(2025, 1, 2, 0)));
        let (catalog, schemas) = scope();
        let manifest = Manifest::new_for_export(
            catalog,
            schemas,
            false,
            range.clone(),
            DataFormat::Parquet,
            None,
        )
        .unwrap();

        assert_eq!(manifest.chunks.len(), 1);
        assert_eq!(manifest.chunks[0].time_range, range);
    }

    #[test]
    fn test_time_range_parse_requires_order() {
        let later = "2025-01-02T00:00:00Z";
        let earlier = "2025-01-01T00:00:00Z";
        assert!(TimeRange::parse(Some(later), Some(earlier)).is_err());
    }

    #[test]
    fn test_new_for_export_with_chunk_window_requires_bounded_range() {
        let open_start = TimeRange::new(None, Some(at(2025, 1, 2, 0)));
        let (catalog, schemas) = scope();
        let outcome = Manifest::new_for_export(
            catalog,
            schemas,
            false,
            open_start,
            DataFormat::Parquet,
            Some(Duration::from_secs(3600)),
        );
        assert!(outcome.is_err());
    }
}