1use std::collections::BTreeSet;
16use std::path::{Path, PathBuf};
17use std::time::Instant;
18
19use async_trait::async_trait;
20use common_telemetry::{info, warn};
21
22use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus};
23use crate::data::import_v2::error::{
24 ImportStateDdlIncompleteSnafu, ImportStateMismatchSnafu, Result,
25};
26use crate::data::import_v2::state::{
27 ImportState, ImportStateLockGuard, ImportTaskKey, ImportTaskStatus, canonical_schema_selection,
28 delete_import_state, load_import_state, save_import_state, try_acquire_import_state_lock,
29};
30use crate::data::path::data_dir_for_schema_chunk;
31
/// Executes a single import task, i.e. the data of one chunk for one schema
/// as described by an [`ImportTaskKey`].
///
/// Implementations are driven by [`import_with_resume_session`], which
/// persists the outcome of every call into the resume state file.
#[async_trait]
pub(crate) trait ImportTaskExecutor {
    /// Imports the data described by `task`.
    ///
    /// Returning `Err` makes the caller record the task as failed and abort
    /// the whole import; the caller does not retry the task itself.
    async fn import_task(&self, task: &ImportTaskKey) -> Result<()>;
}
36
/// Parameters identifying one import run and the tasks it should execute.
///
/// When a resume state file already exists at `state_path`, its identity
/// fields and task set must match this configuration exactly, otherwise
/// resuming is refused.
pub(crate) struct ImportResumeConfig {
    /// Snapshot being imported; must equal the value stored in the state file.
    pub(crate) snapshot_id: String,
    /// Address of the import target; must equal the value stored in the state file.
    pub(crate) target_addr: String,
    /// Catalog being imported into; must equal the value stored in the state file.
    pub(crate) catalog: String,
    /// Requested schema selection. It is compared against the state file in
    /// the form produced by `canonical_schema_selection`.
    pub(crate) schemas: Vec<String>,
    /// Location of the resume state file. The import lock is acquired on
    /// this same path.
    pub(crate) state_path: PathBuf,
    /// Tasks to execute, in execution order. Duplicate `(chunk_id, schema)`
    /// keys are rejected.
    pub(crate) tasks: Vec<ImportTaskKey>,
}
45
/// A prepared import run, produced by [`prepare_import_resume`] and consumed
/// by [`import_with_resume_session`].
pub(crate) struct ImportResumeSession {
    /// The configuration the session was prepared with.
    config: ImportResumeConfig,
    /// The resume state: either loaded and validated, or freshly created and saved.
    state: ImportState,
    /// Lock obtained from `try_acquire_import_state_lock` on the state path.
    /// It is held until the session is consumed; the import loop drops it
    /// explicitly after deleting the state file.
    lock: ImportStateLockGuard,
}
51
52impl ImportResumeSession {
53 pub(crate) fn should_skip_ddl(&self) -> bool {
54 self.state.ddl_completed
55 }
56
57 pub(crate) async fn mark_ddl_completed(&mut self) -> Result<()> {
61 self.state.mark_ddl_completed();
62 save_import_state(&self.config.state_path, &self.state).await
63 }
64}
65
66pub(crate) fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool {
67 let prefix = data_dir_for_schema_chunk(schema, chunk.id);
68 chunk.files.iter().any(|path| {
69 let normalized = path.trim_start_matches('/');
70 normalized.starts_with(&prefix)
71 })
72}
73
74pub(crate) fn build_import_tasks(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<ImportTaskKey> {
75 let mut tasks = Vec::new();
76 for chunk in chunks {
77 if chunk.status == ChunkStatus::Skipped {
78 continue;
79 }
80 for schema in schemas {
82 if chunk_has_schema_files(chunk, schema) {
83 tasks.push(ImportTaskKey::new(chunk.id, schema.clone()));
84 }
85 }
86 }
87 tasks
88}
89
90pub(crate) async fn prepare_import_resume(
91 config: ImportResumeConfig,
92) -> Result<ImportResumeSession> {
93 validate_config_tasks(&config)?;
97
98 let lock = try_acquire_import_state_lock(&config.state_path)?;
99 let state = match load_import_state(&config.state_path).await? {
100 Some(loaded) => {
101 validate_state_matches(&loaded, &config)?;
102 loaded
103 }
104 None => {
105 let fresh = ImportState::new(
111 &config.snapshot_id,
112 &config.target_addr,
113 &config.catalog,
114 &config.schemas,
115 config.tasks.clone(),
116 );
117 save_import_state(&config.state_path, &fresh).await?;
118 fresh
119 }
120 };
121
122 Ok(ImportResumeSession {
123 config,
124 state,
125 lock,
126 })
127}
128
129pub(crate) async fn import_with_resume_session<E>(
130 session: ImportResumeSession,
131 executor: &E,
132) -> Result<()>
133where
134 E: ImportTaskExecutor + Sync,
135{
136 let ImportResumeSession {
137 config,
138 mut state,
139 lock,
140 } = session;
141
142 if !state.ddl_completed {
147 return ImportStateDdlIncompleteSnafu {
148 path: config.state_path.display().to_string(),
149 }
150 .fail();
151 }
152
153 let completed = state
154 .tasks
155 .iter()
156 .filter(|task| task.status == ImportTaskStatus::Completed)
157 .count();
158 info!(
159 "Import resume state: {} completed, {} pending, path: {}",
160 completed,
161 state.tasks.len().saturating_sub(completed),
162 config.state_path.display()
163 );
164
165 let import_start = Instant::now();
166 for (idx, task) in config.tasks.iter().enumerate() {
167 if state.task_status(task.chunk_id, &task.schema) == Some(ImportTaskStatus::Completed) {
168 info!(
169 "[{}/{}] Chunk {} schema {}: already completed, skipped",
170 idx + 1,
171 config.tasks.len(),
172 task.chunk_id,
173 task.schema
174 );
175 continue;
176 }
177
178 info!(
179 "[{}/{}] Chunk {} schema {}: importing...",
180 idx + 1,
181 config.tasks.len(),
182 task.chunk_id,
183 task.schema
184 );
185 state.set_task_status(
186 task.chunk_id,
187 &task.schema,
188 ImportTaskStatus::InProgress,
189 None,
190 )?;
191 save_import_state(&config.state_path, &state).await?;
192
193 let task_start = Instant::now();
194 let result = executor.import_task(task).await;
195
196 match result {
197 Ok(()) => {
198 update_status_and_save(
204 &config,
205 &mut state,
206 task,
207 ImportTaskStatus::Completed,
208 None,
209 )
210 .await?;
211 info!(
212 "[{}/{}] Chunk {} schema {}: done in {:?}",
213 idx + 1,
214 config.tasks.len(),
215 task.chunk_id,
216 task.schema,
217 task_start.elapsed()
218 );
219 }
220 Err(task_error) => {
221 if let Err(persist_error) = update_status_and_save(
225 &config,
226 &mut state,
227 task,
228 ImportTaskStatus::Failed,
229 Some(task_error.to_string()),
230 )
231 .await
232 {
233 warn!(
234 "Failed to persist Failed status for chunk {} schema {} after task error ({}); state file may be out of date: {}",
235 task.chunk_id, task.schema, task_error, persist_error
236 );
237 }
238 return Err(task_error);
239 }
240 }
241 }
242
243 delete_import_state(&config.state_path).await?;
244 info!("Data import finished in {:?}", import_start.elapsed());
245 drop(lock);
246 Ok(())
247}
248
249async fn update_status_and_save(
250 config: &ImportResumeConfig,
251 state: &mut ImportState,
252 task: &ImportTaskKey,
253 status: ImportTaskStatus,
254 error_message: Option<String>,
255) -> Result<()> {
256 state.set_task_status(task.chunk_id, &task.schema, status, error_message)?;
260 save_import_state(&config.state_path, state).await
261}
262
263fn validate_state_matches(state: &ImportState, config: &ImportResumeConfig) -> Result<()> {
264 if state.snapshot_id != config.snapshot_id {
265 return state_mismatch(
266 config,
267 format!(
268 "snapshot_id differs (state: {}, requested: {})",
269 state.snapshot_id, config.snapshot_id
270 ),
271 );
272 }
273 if state.target_addr != config.target_addr {
275 return state_mismatch(
276 config,
277 format!(
278 "target_addr differs (state: {}, requested: {})",
279 state.target_addr, config.target_addr
280 ),
281 );
282 }
283 if state.catalog != config.catalog {
284 return state_mismatch(
285 config,
286 format!(
287 "catalog differs (state: {}, requested: {})",
288 state.catalog, config.catalog
289 ),
290 );
291 }
292
293 let requested_schemas = canonical_schema_selection(&config.schemas);
294 if state.schemas != requested_schemas {
295 return state_mismatch(
296 config,
297 format!(
298 "schemas differ (state: {:?}, requested: {:?})",
299 state.schemas, requested_schemas
300 ),
301 );
302 }
303
304 if task_set_from_state(state, &config.state_path)? != task_set_from_config(config)? {
305 return state_mismatch(config, "task set differs".to_string());
306 }
307
308 Ok(())
309}
310
311fn state_mismatch(config: &ImportResumeConfig, reason: String) -> Result<()> {
312 ImportStateMismatchSnafu {
313 path: config.state_path.display().to_string(),
314 reason,
315 }
316 .fail()
317}
318
319fn task_set_from_state<'a>(
320 state: &'a ImportState,
321 state_path: &Path,
322) -> Result<BTreeSet<(u32, &'a str)>> {
323 let mut tasks = BTreeSet::new();
324 for task in &state.tasks {
325 if !tasks.insert((task.chunk_id, task.schema.as_str())) {
326 return ImportStateMismatchSnafu {
327 path: state_path.display().to_string(),
328 reason: format!(
329 "duplicate task key in state (chunk_id: {}, schema: {})",
330 task.chunk_id, task.schema
331 ),
332 }
333 .fail();
334 }
335 }
336 Ok(tasks)
337}
338
339fn task_set_from_config(config: &ImportResumeConfig) -> Result<BTreeSet<(u32, &str)>> {
340 let mut tasks = BTreeSet::new();
341 for task in &config.tasks {
342 if !tasks.insert((task.chunk_id, task.schema.as_str())) {
343 return ImportStateMismatchSnafu {
344 path: config.state_path.display().to_string(),
345 reason: format!(
346 "duplicate task key in request (chunk_id: {}, schema: {})",
347 task.chunk_id, task.schema
348 ),
349 }
350 .fail();
351 }
352 }
353 Ok(tasks)
354}
355
356fn validate_config_tasks(config: &ImportResumeConfig) -> Result<()> {
357 task_set_from_config(config).map(|_| ())
358}
359
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::data::export_v2::manifest::{ChunkMeta, TimeRange};
    use crate::data::import_v2::error::TestTaskFailedSnafu;

    /// How [`RecordingExecutor`] treats its configured `fail_task`.
    #[derive(Debug, Clone, Copy)]
    enum FailureMode {
        /// Always fail with a non-retryable error.
        Fatal,
        /// Fail with a retryable error while the executor's attempt counter
        /// (which counts every `import_task` call, not just calls for this
        /// task) is below `failures`; succeed afterwards.
        RetryableThenSuccess { failures: usize },
    }

    /// Test executor that records every task it imports successfully and can
    /// be configured to fail one specific task.
    struct RecordingExecutor {
        // Tasks that reached the success path, in call order.
        imported: Arc<Mutex<Vec<ImportTaskKey>>>,
        // Task that should fail; `None` means every task succeeds.
        fail_task: Option<ImportTaskKey>,
        // How `fail_task` fails; `None` means it succeeds after all.
        failure_mode: Option<FailureMode>,
        // Incremented on every `import_task` call.
        attempts: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl ImportTaskExecutor for RecordingExecutor {
        async fn import_task(&self, task: &ImportTaskKey) -> Result<()> {
            // `fetch_add` returns the previous value, so the first call sees 0.
            let attempt = self.attempts.fetch_add(1, Ordering::SeqCst);
            if self.fail_task.as_ref() == Some(task) {
                match self.failure_mode {
                    Some(FailureMode::Fatal) => {
                        return TestTaskFailedSnafu {
                            message: "fatal failure".to_string(),
                            retryable: false,
                        }
                        .fail();
                    }
                    Some(FailureMode::RetryableThenSuccess { failures }) if attempt < failures => {
                        return TestTaskFailedSnafu {
                            message: "retryable failure".to_string(),
                            retryable: true,
                        }
                        .fail();
                    }
                    _ => {}
                }
            }
            self.imported.lock().unwrap().push(task.clone());
            Ok(())
        }
    }

    /// Builds an executor that never fails and records into `imported`.
    fn recording_executor(imported: Arc<Mutex<Vec<ImportTaskKey>>>) -> RecordingExecutor {
        RecordingExecutor {
            imported,
            fail_task: None,
            failure_mode: None,
            attempts: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Fixed test configuration. Its identity fields match the `ImportState`
    /// values the tests below write by hand.
    fn config(path: PathBuf, tasks: Vec<ImportTaskKey>) -> ImportResumeConfig {
        ImportResumeConfig {
            snapshot_id: "snapshot-1".to_string(),
            target_addr: "127.0.0.1:4000".to_string(),
            catalog: "greptime".to_string(),
            schemas: vec!["public".to_string(), "analytics".to_string()],
            state_path: path,
            tasks,
        }
    }

    /// Drives the full prepare → DDL → import flow. DDL is simulated by
    /// marking it completed without executing anything.
    async fn run_import_with_resume<E>(config: ImportResumeConfig, executor: &E) -> Result<()>
    where
        E: ImportTaskExecutor + Sync,
    {
        let mut session = prepare_import_resume(config).await?;
        if !session.should_skip_ddl() {
            session.mark_ddl_completed().await?;
        }
        import_with_resume_session(session, executor).await
    }

    // Skipped chunks and schemas without files in a chunk produce no tasks.
    #[test]
    fn test_build_import_tasks_skips_skipped_chunks_and_missing_schema_files() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec!["data/public/1/file.parquet".to_string()];
        let mut skipped = ChunkMeta::new(2, TimeRange::unbounded());
        skipped.status = ChunkStatus::Skipped;
        skipped.files = vec!["data/public/2/file.parquet".to_string()];

        let tasks = build_import_tasks(
            &[completed, skipped],
            &["public".to_string(), "analytics".to_string()],
        );

        assert_eq!(tasks, vec![ImportTaskKey::new(1, "public")]);
    }

    // A task already recorded as Completed is not re-executed, and the state
    // file is removed after a successful run.
    #[tokio::test]
    async fn test_import_with_resume_skips_completed_tasks() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![
            ImportTaskKey::new(1, "public"),
            ImportTaskKey::new(2, "analytics"),
        ];
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        state.mark_ddl_completed();
        state
            .set_task_status(1, "public", ImportTaskStatus::Completed, None)
            .unwrap();
        save_import_state(&path, &state).await.unwrap();

        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported.clone());

        run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap();

        assert_eq!(
            imported.lock().unwrap().clone(),
            vec![ImportTaskKey::new(2, "analytics")]
        );
        assert!(load_import_state(&path).await.unwrap().is_none());
    }

    // A failing task surfaces its own error and is recorded as Failed on disk.
    #[tokio::test]
    async fn test_import_with_resume_persists_failed_task() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let failed_task = ImportTaskKey::new(1, "public");
        let tasks = vec![failed_task.clone()];
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = RecordingExecutor {
            imported,
            fail_task: Some(failed_task.clone()),
            failure_mode: Some(FailureMode::Fatal),
            attempts: Arc::new(AtomicUsize::new(0)),
        };

        let error = run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap_err();
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::TestTaskFailed {
                retryable: false,
                ..
            }
        ));

        let state = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(
            state.task_status(failed_task.chunk_id, &failed_task.schema),
            Some(ImportTaskStatus::Failed)
        );
    }

    // An existing state whose target_addr differs from the request is rejected.
    #[tokio::test]
    async fn test_import_with_resume_rejects_mismatched_state_identity() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];
        let state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4001",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        save_import_state(&path, &state).await.unwrap();

        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported);

        let error = run_import_with_resume(config(path, tasks), &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
    }

    // A fresh state reports DDL as not done; a saved state with DDL marked
    // reports it as done, so the caller can skip DDL.
    #[tokio::test]
    async fn test_prepare_import_resume_reports_existing_state_before_ddl() {
        let dir = tempfile::tempdir().unwrap();
        let tasks = vec![ImportTaskKey::new(1, "public")];

        let fresh_session =
            prepare_import_resume(config(dir.path().join("fresh_state.json"), tasks.clone()))
                .await
                .unwrap();
        assert!(!fresh_session.should_skip_ddl());
        drop(fresh_session);

        let existing_path = dir.path().join("existing_state.json");
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        state.mark_ddl_completed();
        save_import_state(&existing_path, &state).await.unwrap();

        let resume_session = prepare_import_resume(config(existing_path, tasks))
            .await
            .unwrap();
        assert!(resume_session.should_skip_ddl());
    }

    // A state file containing the same task key twice is rejected.
    #[tokio::test]
    async fn test_import_with_resume_rejects_duplicate_state_tasks() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        state.tasks.push(state.tasks[0].clone());
        save_import_state(&path, &state).await.unwrap();

        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported);

        let error = run_import_with_resume(config(path, tasks), &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
    }

    // Importing data from a session whose DDL was never marked complete is
    // refused before any task runs.
    #[tokio::test]
    async fn test_import_with_resume_rejects_data_import_when_ddl_incomplete() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];

        let session = prepare_import_resume(config(path, tasks)).await.unwrap();
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported.clone());

        let error = import_with_resume_session(session, &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateDdlIncomplete { .. }
        ));
        assert!(imported.lock().unwrap().is_empty());
    }

    // Duplicate request tasks are rejected before any state file is written.
    #[tokio::test]
    async fn test_prepare_import_resume_rejects_duplicate_request_tasks_on_fresh_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let task = ImportTaskKey::new(1, "public");
        // Matched by hand rather than with `unwrap_err`, presumably because
        // `ImportResumeSession` does not implement `Debug` — TODO confirm.
        let error =
            match prepare_import_resume(config(path.clone(), vec![task.clone(), task])).await {
                Ok(_) => panic!("duplicate request tasks should be rejected"),
                Err(error) => error,
            };

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
        assert!(load_import_state(&path).await.unwrap().is_none());
    }

    // Even a retryable task error is not retried by the session: one attempt,
    // the error is returned, and the task is recorded as Failed.
    #[tokio::test]
    async fn test_import_with_resume_does_not_retry_retryable_task_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let failed_task = ImportTaskKey::new(1, "public");
        let tasks = vec![failed_task.clone()];
        let imported = Arc::new(Mutex::new(Vec::new()));
        let attempts = Arc::new(AtomicUsize::new(0));
        let executor = RecordingExecutor {
            imported: imported.clone(),
            fail_task: Some(failed_task.clone()),
            failure_mode: Some(FailureMode::RetryableThenSuccess { failures: 1 }),
            attempts: attempts.clone(),
        };

        let error = run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::TestTaskFailed {
                retryable: true,
                ..
            }
        ));
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
        assert!(imported.lock().unwrap().is_empty());

        let state = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(
            state.task_status(failed_task.chunk_id, &failed_task.schema),
            Some(ImportTaskStatus::Failed)
        );
    }
}