Skip to main content

cli/data/import_v2/
coordinator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16use std::path::{Path, PathBuf};
17use std::time::Instant;
18
19use async_trait::async_trait;
20use common_telemetry::{info, warn};
21
22use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus};
23use crate::data::import_v2::error::{
24    ImportStateDdlIncompleteSnafu, ImportStateMismatchSnafu, Result,
25};
26use crate::data::import_v2::state::{
27    ImportState, ImportStateLockGuard, ImportTaskKey, ImportTaskStatus, canonical_schema_selection,
28    delete_import_state, load_import_state, save_import_state, try_acquire_import_state_lock,
29};
30use crate::data::path::data_dir_for_schema_chunk;
31
/// Executes the import of a single `(chunk, schema)` task.
///
/// `import_with_resume_session` calls this once for each task that is not
/// already `Completed`, in request order, and never retries: the first error
/// aborts the run and is returned to the caller unchanged.
#[async_trait]
pub(crate) trait ImportTaskExecutor {
    /// Imports the data for `task`. Any `Err` stops the whole import.
    async fn import_task(&self, task: &ImportTaskKey) -> Result<()>;
}
36
/// Identity and work list of an import run, used to create or validate the
/// on-disk resume state.
pub(crate) struct ImportResumeConfig {
    /// Snapshot identifier; must equal the one recorded in an existing state file.
    pub(crate) snapshot_id: String,
    /// Target address; compared literally against the state file (no hostname
    /// normalization is performed here).
    pub(crate) target_addr: String,
    /// Catalog name; must equal the one recorded in an existing state file.
    pub(crate) catalog: String,
    /// Requested schemas; canonicalized with `canonical_schema_selection`
    /// before being compared against the state file.
    pub(crate) schemas: Vec<String>,
    /// Path of the resume state file. The state lock is acquired for this path.
    pub(crate) state_path: PathBuf,
    /// Tasks to import, processed in this order. Must not contain duplicate
    /// `(chunk_id, schema)` keys.
    pub(crate) tasks: Vec<ImportTaskKey>,
}
45
/// A validated import session produced by `prepare_import_resume`.
///
/// Bundles the request configuration, the in-memory import state (loaded or
/// freshly created), and the state-file lock guard obtained for
/// `config.state_path`.
pub(crate) struct ImportResumeSession {
    config: ImportResumeConfig,
    state: ImportState,
    // Kept alive so the state-file lock is held for as long as the session
    // exists; `import_with_resume_session` drops it explicitly after the state
    // file is deleted. (Presumably released on drop - confirm in `state`.)
    lock: ImportStateLockGuard,
}
51
52impl ImportResumeSession {
53    pub(crate) fn should_skip_ddl(&self) -> bool {
54        self.state.ddl_completed
55    }
56
57    /// Marks DDL as completed and persists the state. Must be called after a
58    /// successful DDL run on a fresh session, so that crashes after this point
59    /// resume into the data-import phase instead of replaying DDL.
60    pub(crate) async fn mark_ddl_completed(&mut self) -> Result<()> {
61        self.state.mark_ddl_completed();
62        save_import_state(&self.config.state_path, &self.state).await
63    }
64}
65
66pub(crate) fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool {
67    let prefix = data_dir_for_schema_chunk(schema, chunk.id);
68    chunk.files.iter().any(|path| {
69        let normalized = path.trim_start_matches('/');
70        normalized.starts_with(&prefix)
71    })
72}
73
74pub(crate) fn build_import_tasks(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<ImportTaskKey> {
75    let mut tasks = Vec::new();
76    for chunk in chunks {
77        if chunk.status == ChunkStatus::Skipped {
78            continue;
79        }
80        // TODO: build a per-chunk schema index if chunk file manifests become large.
81        for schema in schemas {
82            if chunk_has_schema_files(chunk, schema) {
83                tasks.push(ImportTaskKey::new(chunk.id, schema.clone()));
84            }
85        }
86    }
87    tasks
88}
89
90pub(crate) async fn prepare_import_resume(
91    config: ImportResumeConfig,
92) -> Result<ImportResumeSession> {
93    // Validate the request before touching the state file or acquiring the
94    // lock. Duplicate task keys would corrupt the resume bookkeeping because
95    // status lookups use linear `find()` and only ever see the first match.
96    validate_config_tasks(&config)?;
97
98    let lock = try_acquire_import_state_lock(&config.state_path)?;
99    let state = match load_import_state(&config.state_path).await? {
100        Some(loaded) => {
101            validate_state_matches(&loaded, &config)?;
102            loaded
103        }
104        None => {
105            // Persist a fresh state immediately so that any crash after this
106            // point is recoverable as a resume. `ddl_completed=false` on a
107            // loaded state therefore means a previous run reached this point
108            // but did not confirm DDL completion - DDL must be (re-)run before
109            // data import is allowed.
110            let fresh = ImportState::new(
111                &config.snapshot_id,
112                &config.target_addr,
113                &config.catalog,
114                &config.schemas,
115                config.tasks.clone(),
116            );
117            save_import_state(&config.state_path, &fresh).await?;
118            fresh
119        }
120    };
121
122    Ok(ImportResumeSession {
123        config,
124        state,
125        lock,
126    })
127}
128
129pub(crate) async fn import_with_resume_session<E>(
130    session: ImportResumeSession,
131    executor: &E,
132) -> Result<()>
133where
134    E: ImportTaskExecutor + Sync,
135{
136    let ImportResumeSession {
137        config,
138        mut state,
139        lock,
140    } = session;
141
142    // The state machine requires DDL to be explicitly marked completed before
143    // data import; otherwise a caller could import data and leave a state that
144    // replays DDL on the next resume. Surface the misuse instead of silently
145    // importing.
146    if !state.ddl_completed {
147        return ImportStateDdlIncompleteSnafu {
148            path: config.state_path.display().to_string(),
149        }
150        .fail();
151    }
152
153    let completed = state
154        .tasks
155        .iter()
156        .filter(|task| task.status == ImportTaskStatus::Completed)
157        .count();
158    info!(
159        "Import resume state: {} completed, {} pending, path: {}",
160        completed,
161        state.tasks.len().saturating_sub(completed),
162        config.state_path.display()
163    );
164
165    let import_start = Instant::now();
166    for (idx, task) in config.tasks.iter().enumerate() {
167        if state.task_status(task.chunk_id, &task.schema) == Some(ImportTaskStatus::Completed) {
168            info!(
169                "[{}/{}] Chunk {} schema {}: already completed, skipped",
170                idx + 1,
171                config.tasks.len(),
172                task.chunk_id,
173                task.schema
174            );
175            continue;
176        }
177
178        info!(
179            "[{}/{}] Chunk {} schema {}: importing...",
180            idx + 1,
181            config.tasks.len(),
182            task.chunk_id,
183            task.schema
184        );
185        state.set_task_status(
186            task.chunk_id,
187            &task.schema,
188            ImportTaskStatus::InProgress,
189            None,
190        )?;
191        save_import_state(&config.state_path, &state).await?;
192
193        let task_start = Instant::now();
194        let result = executor.import_task(task).await;
195
196        match result {
197            Ok(()) => {
198                // The task itself succeeded. If we cannot persist the
199                // Completed marker, the next resume will replay it (potentially
200                // duplicating data depending on engine semantics), but we must
201                // not pretend the import as a whole failed - return the persist
202                // error so the operator notices, after logging the success.
203                update_status_and_save(
204                    &config,
205                    &mut state,
206                    task,
207                    ImportTaskStatus::Completed,
208                    None,
209                )
210                .await?;
211                info!(
212                    "[{}/{}] Chunk {} schema {}: done in {:?}",
213                    idx + 1,
214                    config.tasks.len(),
215                    task.chunk_id,
216                    task.schema,
217                    task_start.elapsed()
218                );
219            }
220            Err(task_error) => {
221                // Persist Failed best-effort, but always surface the original
222                // task error to the caller. State persistence problems are
223                // logged so they are not silently lost.
224                if let Err(persist_error) = update_status_and_save(
225                    &config,
226                    &mut state,
227                    task,
228                    ImportTaskStatus::Failed,
229                    Some(task_error.to_string()),
230                )
231                .await
232                {
233                    warn!(
234                        "Failed to persist Failed status for chunk {} schema {} after task error ({}); state file may be out of date: {}",
235                        task.chunk_id, task.schema, task_error, persist_error
236                    );
237                }
238                return Err(task_error);
239            }
240        }
241    }
242
243    delete_import_state(&config.state_path).await?;
244    info!("Data import finished in {:?}", import_start.elapsed());
245    drop(lock);
246    Ok(())
247}
248
249async fn update_status_and_save(
250    config: &ImportResumeConfig,
251    state: &mut ImportState,
252    task: &ImportTaskKey,
253    status: ImportTaskStatus,
254    error_message: Option<String>,
255) -> Result<()> {
256    // set_task_status only fails if the task isn't in the state; that would
257    // indicate a logic bug since `task` came from the same config. Surface it
258    // instead of swallowing.
259    state.set_task_status(task.chunk_id, &task.schema, status, error_message)?;
260    save_import_state(&config.state_path, state).await
261}
262
263fn validate_state_matches(state: &ImportState, config: &ImportResumeConfig) -> Result<()> {
264    if state.snapshot_id != config.snapshot_id {
265        return state_mismatch(
266            config,
267            format!(
268                "snapshot_id differs (state: {}, requested: {})",
269                state.snapshot_id, config.snapshot_id
270            ),
271        );
272    }
273    // Target addresses are compared literally; hostname normalization is left to the caller.
274    if state.target_addr != config.target_addr {
275        return state_mismatch(
276            config,
277            format!(
278                "target_addr differs (state: {}, requested: {})",
279                state.target_addr, config.target_addr
280            ),
281        );
282    }
283    if state.catalog != config.catalog {
284        return state_mismatch(
285            config,
286            format!(
287                "catalog differs (state: {}, requested: {})",
288                state.catalog, config.catalog
289            ),
290        );
291    }
292
293    let requested_schemas = canonical_schema_selection(&config.schemas);
294    if state.schemas != requested_schemas {
295        return state_mismatch(
296            config,
297            format!(
298                "schemas differ (state: {:?}, requested: {:?})",
299                state.schemas, requested_schemas
300            ),
301        );
302    }
303
304    if task_set_from_state(state, &config.state_path)? != task_set_from_config(config)? {
305        return state_mismatch(config, "task set differs".to_string());
306    }
307
308    Ok(())
309}
310
311fn state_mismatch(config: &ImportResumeConfig, reason: String) -> Result<()> {
312    ImportStateMismatchSnafu {
313        path: config.state_path.display().to_string(),
314        reason,
315    }
316    .fail()
317}
318
319fn task_set_from_state<'a>(
320    state: &'a ImportState,
321    state_path: &Path,
322) -> Result<BTreeSet<(u32, &'a str)>> {
323    let mut tasks = BTreeSet::new();
324    for task in &state.tasks {
325        if !tasks.insert((task.chunk_id, task.schema.as_str())) {
326            return ImportStateMismatchSnafu {
327                path: state_path.display().to_string(),
328                reason: format!(
329                    "duplicate task key in state (chunk_id: {}, schema: {})",
330                    task.chunk_id, task.schema
331                ),
332            }
333            .fail();
334        }
335    }
336    Ok(tasks)
337}
338
339fn task_set_from_config(config: &ImportResumeConfig) -> Result<BTreeSet<(u32, &str)>> {
340    let mut tasks = BTreeSet::new();
341    for task in &config.tasks {
342        if !tasks.insert((task.chunk_id, task.schema.as_str())) {
343            return ImportStateMismatchSnafu {
344                path: config.state_path.display().to_string(),
345                reason: format!(
346                    "duplicate task key in request (chunk_id: {}, schema: {})",
347                    task.chunk_id, task.schema
348                ),
349            }
350            .fail();
351        }
352    }
353    Ok(tasks)
354}
355
356fn validate_config_tasks(config: &ImportResumeConfig) -> Result<()> {
357    task_set_from_config(config).map(|_| ())
358}
359
/// Unit tests for the import resume coordinator: task building, state
/// validation, DDL gating, and per-task status persistence.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::data::export_v2::manifest::{ChunkMeta, TimeRange};
    use crate::data::import_v2::error::TestTaskFailedSnafu;

    /// How `RecordingExecutor` fails when it is asked to import `fail_task`.
    #[derive(Debug, Clone, Copy)]
    enum FailureMode {
        /// Every attempt fails with a non-retryable error.
        Fatal,
        /// Fails with a retryable error while the executor's total attempt
        /// count (across all tasks) is below `failures`, then succeeds.
        RetryableThenSuccess { failures: usize },
    }

    /// Test executor that records successfully imported tasks and can inject
    /// a failure for a single task.
    struct RecordingExecutor {
        // Tasks whose import succeeded, in call order.
        imported: Arc<Mutex<Vec<ImportTaskKey>>>,
        // The task to fail, if any.
        fail_task: Option<ImportTaskKey>,
        // How to fail `fail_task`; `None` means never fail.
        failure_mode: Option<FailureMode>,
        // Number of `import_task` calls, including failed ones.
        attempts: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl ImportTaskExecutor for RecordingExecutor {
        async fn import_task(&self, task: &ImportTaskKey) -> Result<()> {
            // `fetch_add` returns the previous count, i.e. this call's
            // zero-based attempt index.
            let attempt = self.attempts.fetch_add(1, Ordering::SeqCst);
            if self.fail_task.as_ref() == Some(task) {
                match self.failure_mode {
                    Some(FailureMode::Fatal) => {
                        return TestTaskFailedSnafu {
                            message: "fatal failure".to_string(),
                            retryable: false,
                        }
                        .fail();
                    }
                    Some(FailureMode::RetryableThenSuccess { failures }) if attempt < failures => {
                        return TestTaskFailedSnafu {
                            message: "retryable failure".to_string(),
                            retryable: true,
                        }
                        .fail();
                    }
                    _ => {}
                }
            }
            self.imported.lock().unwrap().push(task.clone());
            Ok(())
        }
    }

    /// Builds an executor that never fails and records into `imported`.
    fn recording_executor(imported: Arc<Mutex<Vec<ImportTaskKey>>>) -> RecordingExecutor {
        RecordingExecutor {
            imported,
            fail_task: None,
            failure_mode: None,
            attempts: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Test config whose identity fields match the `ImportState::new(...)`
    /// calls used by the tests below.
    fn config(path: PathBuf, tasks: Vec<ImportTaskKey>) -> ImportResumeConfig {
        ImportResumeConfig {
            snapshot_id: "snapshot-1".to_string(),
            target_addr: "127.0.0.1:4000".to_string(),
            catalog: "greptime".to_string(),
            schemas: vec!["public".to_string(), "analytics".to_string()],
            state_path: path,
            tasks,
        }
    }

    async fn run_import_with_resume<E>(config: ImportResumeConfig, executor: &E) -> Result<()>
    where
        E: ImportTaskExecutor + Sync,
    {
        // Mirror the production caller: mark DDL completed for fresh sessions
        // so the data-import guard is satisfied. Tests that want to exercise
        // the unsafe path drive prepare/import directly.
        let mut session = prepare_import_resume(config).await?;
        if !session.should_skip_ddl() {
            session.mark_ddl_completed().await?;
        }
        import_with_resume_session(session, executor).await
    }

    #[test]
    fn test_build_import_tasks_skips_skipped_chunks_and_missing_schema_files() {
        // Chunk 1 has files only for `public`; chunk 2 is skipped entirely.
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec!["data/public/1/file.parquet".to_string()];
        let mut skipped = ChunkMeta::new(2, TimeRange::unbounded());
        skipped.status = ChunkStatus::Skipped;
        skipped.files = vec!["data/public/2/file.parquet".to_string()];

        let tasks = build_import_tasks(
            &[completed, skipped],
            &["public".to_string(), "analytics".to_string()],
        );

        assert_eq!(tasks, vec![ImportTaskKey::new(1, "public")]);
    }

    #[tokio::test]
    async fn test_import_with_resume_skips_completed_tasks() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![
            ImportTaskKey::new(1, "public"),
            ImportTaskKey::new(2, "analytics"),
        ];
        // Pre-existing state: DDL done and the first task already completed.
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        state.mark_ddl_completed();
        state
            .set_task_status(1, "public", ImportTaskStatus::Completed, None)
            .unwrap();
        save_import_state(&path, &state).await.unwrap();

        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported.clone());

        run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap();

        // Only the pending task runs, and the state file is removed on success.
        assert_eq!(
            imported.lock().unwrap().clone(),
            vec![ImportTaskKey::new(2, "analytics")]
        );
        assert!(load_import_state(&path).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_import_with_resume_persists_failed_task() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let failed_task = ImportTaskKey::new(1, "public");
        let tasks = vec![failed_task.clone()];
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = RecordingExecutor {
            imported,
            fail_task: Some(failed_task.clone()),
            failure_mode: Some(FailureMode::Fatal),
            attempts: Arc::new(AtomicUsize::new(0)),
        };

        let error = run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap_err();
        // The original task error is returned unchanged.
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::TestTaskFailed {
                retryable: false,
                ..
            }
        ));

        // The state file survives and records the failure.
        let state = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(
            state.task_status(failed_task.chunk_id, &failed_task.schema),
            Some(ImportTaskStatus::Failed)
        );
    }

    #[tokio::test]
    async fn test_import_with_resume_rejects_mismatched_state_identity() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];
        // Port 4001 differs from the config's 127.0.0.1:4000.
        let state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4001",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        save_import_state(&path, &state).await.unwrap();

        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported);

        let error = run_import_with_resume(config(path, tasks), &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
    }

    #[tokio::test]
    async fn test_prepare_import_resume_reports_existing_state_before_ddl() {
        let dir = tempfile::tempdir().unwrap();
        let tasks = vec![ImportTaskKey::new(1, "public")];

        // A brand-new state must run DDL.
        let fresh_session =
            prepare_import_resume(config(dir.path().join("fresh_state.json"), tasks.clone()))
                .await
                .unwrap();
        assert!(!fresh_session.should_skip_ddl());
        drop(fresh_session);

        // A state that already recorded DDL completion can skip it.
        let existing_path = dir.path().join("existing_state.json");
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        state.mark_ddl_completed();
        save_import_state(&existing_path, &state).await.unwrap();

        let resume_session = prepare_import_resume(config(existing_path, tasks))
            .await
            .unwrap();
        assert!(resume_session.should_skip_ddl());
    }

    #[tokio::test]
    async fn test_import_with_resume_rejects_duplicate_state_tasks() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        // Corrupt the on-disk state with a duplicated task entry.
        state.tasks.push(state.tasks[0].clone());
        save_import_state(&path, &state).await.unwrap();

        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported);

        let error = run_import_with_resume(config(path, tasks), &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
    }

    #[tokio::test]
    async fn test_import_with_resume_rejects_data_import_when_ddl_incomplete() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];

        // prepare creates fresh state with ddl_completed=false; calling
        // import_with_resume_session directly (without mark_ddl_completed)
        // must be rejected.
        let session = prepare_import_resume(config(path, tasks)).await.unwrap();
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported.clone());

        let error = import_with_resume_session(session, &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateDdlIncomplete { .. }
        ));
        // The guard fires before any task reaches the executor.
        assert!(imported.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_prepare_import_resume_rejects_duplicate_request_tasks_on_fresh_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let task = ImportTaskKey::new(1, "public");
        // No state file yet - duplicate detection must run before the fresh
        // state is persisted, otherwise corrupted bookkeeping would be
        // written to disk and observed only on a later resume.
        let error =
            match prepare_import_resume(config(path.clone(), vec![task.clone(), task])).await {
                Ok(_) => panic!("duplicate request tasks should be rejected"),
                Err(error) => error,
            };

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
        assert!(load_import_state(&path).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_import_with_resume_does_not_retry_retryable_task_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let failed_task = ImportTaskKey::new(1, "public");
        let tasks = vec![failed_task.clone()];
        let imported = Arc::new(Mutex::new(Vec::new()));
        let attempts = Arc::new(AtomicUsize::new(0));
        let executor = RecordingExecutor {
            imported: imported.clone(),
            fail_task: Some(failed_task.clone()),
            // If task import were retried, the second attempt would succeed.
            // COPY DATABASE FROM failures are ambiguous, so retryable errors
            // must still stop immediately to avoid duplicate rows.
            failure_mode: Some(FailureMode::RetryableThenSuccess { failures: 1 }),
            attempts: attempts.clone(),
        };

        let error = run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::TestTaskFailed {
                retryable: true,
                ..
            }
        ));
        // Exactly one attempt: the coordinator must not retry.
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
        assert!(imported.lock().unwrap().is_empty());

        let state = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(
            state.task_status(failed_task.chunk_id, &failed_task.schema),
            Some(ImportTaskStatus::Failed)
        );
    }
}