1use std::time::Duration;
18use std::{fmt, str};
19
20use chrono::{DateTime, Utc};
21use serde::{Deserialize, Serialize};
22use uuid::Uuid;
23
24use crate::data::export_v2::chunker::generate_chunks;
25use crate::data::export_v2::error::{
26 ChunkTimeWindowRequiresBoundsSnafu, Result as ExportResult, TimeParseEndBeforeStartSnafu,
27 TimeParseInvalidFormatSnafu,
28};
29
/// Manifest format version; the `Manifest` constructors store it in `Manifest::version`.
pub const MANIFEST_VERSION: u32 = 1;

/// File name of the manifest (`manifest.json`).
// NOTE(review): where this name is joined to a directory is not visible in this file.
pub const MANIFEST_FILE: &str = "manifest.json";
35
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38pub struct TimeRange {
39 #[serde(skip_serializing_if = "Option::is_none")]
41 pub start: Option<DateTime<Utc>>,
42 #[serde(skip_serializing_if = "Option::is_none")]
44 pub end: Option<DateTime<Utc>>,
45}
46
47impl TimeRange {
48 pub fn new(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> Self {
50 Self { start, end }
51 }
52
53 pub fn unbounded() -> Self {
55 Self {
56 start: None,
57 end: None,
58 }
59 }
60
61 pub fn is_unbounded(&self) -> bool {
63 self.start.is_none() && self.end.is_none()
64 }
65
66 pub fn is_bounded(&self) -> bool {
68 self.start.is_some() && self.end.is_some()
69 }
70
71 pub fn parse(start: Option<&str>, end: Option<&str>) -> ExportResult<Self> {
73 let start = start.map(parse_time).transpose()?;
74 let end = end.map(parse_time).transpose()?;
75
76 if let (Some(start), Some(end)) = (start, end)
77 && end < start
78 {
79 return TimeParseEndBeforeStartSnafu.fail();
80 }
81
82 Ok(Self::new(start, end))
83 }
84}
85
86fn parse_time(input: &str) -> ExportResult<DateTime<Utc>> {
87 DateTime::parse_from_rfc3339(input)
88 .map(|dt| dt.with_timezone(&Utc))
89 .map_err(|_| TimeParseInvalidFormatSnafu { input }.build())
90}
91
92impl Default for TimeRange {
93 fn default() -> Self {
94 Self::unbounded()
95 }
96}
97
98#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
100#[serde(rename_all = "snake_case")]
101pub enum ChunkStatus {
102 #[default]
104 Pending,
105 InProgress,
107 Completed,
109 Skipped,
111 Failed,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct ChunkMeta {
118 pub id: u32,
120 pub time_range: TimeRange,
122 pub status: ChunkStatus,
124 #[serde(default)]
126 pub files: Vec<String>,
127 #[serde(skip_serializing_if = "Option::is_none")]
129 pub checksum: Option<String>,
130 #[serde(skip_serializing_if = "Option::is_none")]
132 pub error: Option<String>,
133}
134
135impl ChunkMeta {
136 pub fn new(id: u32, time_range: TimeRange) -> Self {
138 Self {
139 id,
140 time_range,
141 status: ChunkStatus::Pending,
142 files: vec![],
143 checksum: None,
144 error: None,
145 }
146 }
147
148 pub fn skipped(id: u32, time_range: TimeRange) -> Self {
150 let mut chunk = Self::new(id, time_range);
151 chunk.mark_skipped();
152 chunk
153 }
154
155 pub fn mark_in_progress(&mut self) {
157 self.status = ChunkStatus::InProgress;
158 self.error = None;
159 }
160
161 pub fn mark_completed(&mut self, files: Vec<String>, checksum: Option<String>) {
163 self.status = ChunkStatus::Completed;
164 self.files = files;
165 self.checksum = checksum;
166 self.error = None;
167 }
168
169 pub fn mark_skipped(&mut self) {
171 self.status = ChunkStatus::Skipped;
172 self.files.clear();
173 self.checksum = None;
174 self.error = None;
175 }
176
177 pub fn mark_failed(&mut self, error: String) {
179 self.status = ChunkStatus::Failed;
180 self.error = Some(error);
181 }
182}
183
184#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default, clap::ValueEnum)]
186#[serde(rename_all = "lowercase")]
187#[value(rename_all = "lowercase")]
188pub enum DataFormat {
189 #[default]
191 Parquet,
192 Csv,
194 Json,
196}
197
198impl fmt::Display for DataFormat {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 match self {
201 DataFormat::Parquet => write!(f, "parquet"),
202 DataFormat::Csv => write!(f, "csv"),
203 DataFormat::Json => write!(f, "json"),
204 }
205 }
206}
207
208impl str::FromStr for DataFormat {
209 type Err = String;
210
211 fn from_str(s: &str) -> Result<Self, Self::Err> {
212 match s.to_lowercase().as_str() {
213 "parquet" => Ok(DataFormat::Parquet),
214 "csv" => Ok(DataFormat::Csv),
215 "json" => Ok(DataFormat::Json),
216 _ => Err(format!(
217 "invalid format '{}': expected one of parquet, csv, json",
218 s
219 )),
220 }
221 }
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct Manifest {
235 pub version: u32,
237 pub snapshot_id: Uuid,
239 pub catalog: String,
241 pub schemas: Vec<String>,
243 pub time_range: TimeRange,
245 pub schema_only: bool,
247 pub format: DataFormat,
249 #[serde(default)]
251 pub chunks: Vec<ChunkMeta>,
252 #[serde(skip_serializing_if = "Option::is_none")]
254 pub checksum: Option<String>,
255 pub created_at: DateTime<Utc>,
257 pub updated_at: DateTime<Utc>,
259}
260
261impl Manifest {
262 pub fn new_for_export(
263 catalog: String,
264 schemas: Vec<String>,
265 schema_only: bool,
266 time_range: TimeRange,
267 format: DataFormat,
268 chunk_time_window: Option<Duration>,
269 ) -> ExportResult<Self> {
270 if chunk_time_window.is_some() && !time_range.is_bounded() {
271 return ChunkTimeWindowRequiresBoundsSnafu.fail();
272 }
273
274 let mut manifest = if schema_only {
275 Self::new_schema_only(catalog, schemas)
276 } else {
277 Self::new_full(catalog, schemas, time_range, format)
278 };
279
280 if !schema_only {
281 manifest.chunks = match chunk_time_window {
282 Some(window) => generate_chunks(&manifest.time_range, window),
283 None => generate_single_chunk(&manifest.time_range),
284 };
285 manifest.touch();
286 }
287
288 Ok(manifest)
289 }
290
291 pub fn new_schema_only(catalog: String, schemas: Vec<String>) -> Self {
293 let now = Utc::now();
294 Self {
295 version: MANIFEST_VERSION,
296 snapshot_id: Uuid::new_v4(),
297 catalog,
298 schemas,
299 time_range: TimeRange::unbounded(),
300 schema_only: true,
301 format: DataFormat::Parquet,
302 chunks: vec![],
303 checksum: None,
304 created_at: now,
305 updated_at: now,
306 }
307 }
308
309 pub fn new_full(
311 catalog: String,
312 schemas: Vec<String>,
313 time_range: TimeRange,
314 format: DataFormat,
315 ) -> Self {
316 let now = Utc::now();
317 Self {
318 version: MANIFEST_VERSION,
319 snapshot_id: Uuid::new_v4(),
320 catalog,
321 schemas,
322 time_range,
323 schema_only: false,
324 format,
325 chunks: vec![],
326 checksum: None,
327 created_at: now,
328 updated_at: now,
329 }
330 }
331
332 pub fn is_complete(&self) -> bool {
334 self.schema_only
335 || (!self.chunks.is_empty()
336 && self
337 .chunks
338 .iter()
339 .all(|c| matches!(c.status, ChunkStatus::Completed | ChunkStatus::Skipped)))
340 }
341
342 pub fn pending_count(&self) -> usize {
344 self.chunks
345 .iter()
346 .filter(|c| c.status == ChunkStatus::Pending)
347 .count()
348 }
349
350 pub fn in_progress_count(&self) -> usize {
352 self.chunks
353 .iter()
354 .filter(|c| c.status == ChunkStatus::InProgress)
355 .count()
356 }
357
358 pub fn completed_count(&self) -> usize {
360 self.chunks
361 .iter()
362 .filter(|c| c.status == ChunkStatus::Completed)
363 .count()
364 }
365
366 pub fn skipped_count(&self) -> usize {
368 self.chunks
369 .iter()
370 .filter(|c| c.status == ChunkStatus::Skipped)
371 .count()
372 }
373
374 pub fn failed_count(&self) -> usize {
376 self.chunks
377 .iter()
378 .filter(|c| c.status == ChunkStatus::Failed)
379 .count()
380 }
381
382 pub fn touch(&mut self) {
384 self.updated_at = Utc::now();
385 }
386
387 pub fn add_chunk(&mut self, chunk: ChunkMeta) {
389 self.chunks.push(chunk);
390 self.touch();
391 }
392
393 pub fn update_chunk(&mut self, id: u32, updater: impl FnOnce(&mut ChunkMeta)) {
395 if let Some(chunk) = self.chunks.iter_mut().find(|c| c.id == id) {
396 updater(chunk);
397 self.touch();
398 }
399 }
400}
401
402fn generate_single_chunk(time_range: &TimeRange) -> Vec<ChunkMeta> {
403 if let (Some(start), Some(end)) = (time_range.start, time_range.end) {
404 if start == end {
405 return vec![ChunkMeta::skipped(1, time_range.clone())];
406 }
407 if start > end {
408 return Vec::new();
409 }
410 }
411 vec![ChunkMeta::new(1, time_range.clone())]
412}
413
414#[cfg(test)]
415mod tests {
416 use std::time::Duration;
417
418 use chrono::{TimeZone, Utc};
419
420 use super::*;
421
422 #[test]
423 fn test_time_range_serialization() {
424 let range = TimeRange::unbounded();
425 let json = serde_json::to_string(&range).unwrap();
426 assert_eq!(json, "{}");
427
428 let range: TimeRange = serde_json::from_str("{}").unwrap();
429 assert!(range.is_unbounded());
430 }
431
432 #[test]
433 fn test_manifest_schema_only() {
434 let manifest =
435 Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]);
436
437 assert_eq!(manifest.version, MANIFEST_VERSION);
438 assert!(manifest.schema_only);
439 assert!(manifest.chunks.is_empty());
440 assert!(manifest.is_complete());
441 }
442
443 #[test]
444 fn test_generate_single_chunk_zero_width_range_is_skipped() {
445 let ts = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
446 let chunks = generate_single_chunk(&TimeRange::new(Some(ts), Some(ts)));
447
448 assert_eq!(chunks.len(), 1);
449 assert_eq!(chunks[0].status, ChunkStatus::Skipped);
450 assert_eq!(chunks[0].time_range.start, Some(ts));
451 assert_eq!(chunks[0].time_range.end, Some(ts));
452 }
453
454 #[test]
455 fn test_generate_single_chunk_invalid_range_is_empty() {
456 let start = Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
457 let end = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
458 let chunks = generate_single_chunk(&TimeRange::new(Some(start), Some(end)));
459
460 assert!(chunks.is_empty());
461 }
462
463 #[test]
464 fn test_manifest_full() {
465 let manifest = Manifest::new_full(
466 "greptime".to_string(),
467 vec!["public".to_string()],
468 TimeRange::unbounded(),
469 DataFormat::Parquet,
470 );
471
472 assert!(!manifest.schema_only);
473 assert!(manifest.chunks.is_empty());
474 assert!(!manifest.is_complete());
475 }
476
477 #[test]
478 fn test_data_format_parsing() {
479 assert_eq!(
480 "parquet".parse::<DataFormat>().unwrap(),
481 DataFormat::Parquet
482 );
483 assert_eq!("CSV".parse::<DataFormat>().unwrap(), DataFormat::Csv);
484 assert_eq!("JSON".parse::<DataFormat>().unwrap(), DataFormat::Json);
485 assert!("invalid".parse::<DataFormat>().is_err());
486 }
487
488 #[test]
489 fn test_chunk_status_transitions() {
490 let mut chunk = ChunkMeta::new(1, TimeRange::unbounded());
491 assert_eq!(chunk.status, ChunkStatus::Pending);
492
493 chunk.mark_in_progress();
494 assert_eq!(chunk.status, ChunkStatus::InProgress);
495
496 chunk.mark_completed(
497 vec!["file1.parquet".to_string()],
498 Some("abc123".to_string()),
499 );
500 assert_eq!(chunk.status, ChunkStatus::Completed);
501 assert_eq!(chunk.files.len(), 1);
502
503 chunk.mark_skipped();
504 assert_eq!(chunk.status, ChunkStatus::Skipped);
505 assert!(chunk.files.is_empty());
506 }
507
508 #[test]
509 fn test_manifest_is_complete_when_chunks_are_completed_or_skipped() {
510 let mut manifest = Manifest::new_full(
511 "greptime".to_string(),
512 vec!["public".to_string()],
513 TimeRange::unbounded(),
514 DataFormat::Parquet,
515 );
516 manifest.add_chunk(ChunkMeta::new(1, TimeRange::unbounded()));
517 manifest.add_chunk(ChunkMeta::new(2, TimeRange::unbounded()));
518
519 manifest.update_chunk(1, |chunk| {
520 chunk.mark_completed(vec!["a.parquet".to_string()], None)
521 });
522 manifest.update_chunk(2, |chunk| chunk.mark_skipped());
523
524 assert!(manifest.is_complete());
525 assert_eq!(manifest.completed_count(), 1);
526 assert_eq!(manifest.skipped_count(), 1);
527 }
528
529 #[test]
530 fn test_manifest_chunk_time_window_none_single_chunk() {
531 let start = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
532 let end = Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap();
533 let range = TimeRange::new(Some(start), Some(end));
534 let manifest = Manifest::new_for_export(
535 "greptime".to_string(),
536 vec!["public".to_string()],
537 false,
538 range.clone(),
539 DataFormat::Parquet,
540 None,
541 )
542 .unwrap();
543
544 assert_eq!(manifest.chunks.len(), 1);
545 assert_eq!(manifest.chunks[0].time_range, range);
546 }
547
548 #[test]
549 fn test_time_range_parse_requires_order() {
550 let result = TimeRange::parse(Some("2025-01-02T00:00:00Z"), Some("2025-01-01T00:00:00Z"));
551 assert!(result.is_err());
552 }
553
554 #[test]
555 fn test_new_for_export_with_chunk_window_requires_bounded_range() {
556 let result = Manifest::new_for_export(
557 "greptime".to_string(),
558 vec!["public".to_string()],
559 false,
560 TimeRange::new(
561 None,
562 Some(Utc.with_ymd_and_hms(2025, 1, 2, 0, 0, 0).unwrap()),
563 ),
564 DataFormat::Parquet,
565 Some(Duration::from_secs(3600)),
566 );
567 assert!(result.is_err());
568 }
569}