Skip to main content

cli/data/import_v2/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(dead_code)]
16
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicU64, Ordering};
19
20use chrono::{DateTime, Utc};
21use fs2::FileExt;
22use serde::{Deserialize, Serialize};
23use snafu::{IntoError, OptionExt, ResultExt};
24use tokio::io::AsyncWriteExt;
25
26use crate::data::import_v2::error::{
27    ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu,
28    ImportStateUnknownChunkSnafu, Result,
29};
30use crate::data::path::encode_path_segment;
31
/// Directory placed under the user's home directory that holds the import state directory.
const IMPORT_STATE_ROOT: &str = ".greptime";
/// Sub-directory of `IMPORT_STATE_ROOT` in which import state files are stored.
const IMPORT_STATE_DIR: &str = "import_state";
/// Process-wide counter used to give every temporary state file a distinct name.
static IMPORT_STATE_TMP_ID: AtomicU64 = AtomicU64::new(0);
35
/// Progress marker recorded for a single chunk of an import.
///
/// Variants are (de)serialized using their snake_case names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ImportChunkStatus {
    /// Status given to every chunk by `ImportState::new`.
    Pending,
    /// Rewritten to `Pending` whenever a state file is loaded from disk.
    InProgress,
    Completed,
    Failed,
}
44
/// Persisted record for one chunk: its id, current status and optional error text.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportChunkState {
    /// Identifier used to look the chunk up inside `ImportState::chunks`.
    pub(crate) id: u32,
    /// Current status of the chunk.
    pub(crate) status: ImportChunkStatus,
    /// Error text attached to the chunk; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) error: Option<String>,
}
52
/// Serializable progress record for importing one snapshot into one target address.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportState {
    /// Identifier of the snapshot being imported.
    pub(crate) snapshot_id: String,
    /// Address of the import target.
    pub(crate) target_addr: String,
    /// Set on construction and refreshed by every successful `set_chunk_status` call.
    pub(crate) updated_at: DateTime<Utc>,
    // Chunk counts are expected to stay below ~1000, so linear scans are acceptable here.
    pub(crate) chunks: Vec<ImportChunkState>,
}
61
62impl ImportState {
63    pub(crate) fn new<I>(
64        snapshot_id: impl Into<String>,
65        target_addr: impl Into<String>,
66        chunk_ids: I,
67    ) -> Self
68    where
69        I: IntoIterator<Item = u32>,
70    {
71        Self {
72            snapshot_id: snapshot_id.into(),
73            target_addr: target_addr.into(),
74            updated_at: Utc::now(),
75            chunks: chunk_ids
76                .into_iter()
77                .map(|id| ImportChunkState {
78                    id,
79                    status: ImportChunkStatus::Pending,
80                    error: None,
81                })
82                .collect(),
83        }
84    }
85
86    pub(crate) fn chunk_status(&self, chunk_id: u32) -> Option<ImportChunkStatus> {
87        self.chunks
88            .iter()
89            .find(|chunk| chunk.id == chunk_id)
90            .map(|chunk| chunk.status.clone())
91    }
92
93    pub(crate) fn set_chunk_status(
94        &mut self,
95        chunk_id: u32,
96        status: ImportChunkStatus,
97        error: Option<String>,
98    ) -> Result<()> {
99        let chunk = self
100            .chunks
101            .iter_mut()
102            .find(|chunk| chunk.id == chunk_id)
103            .context(ImportStateUnknownChunkSnafu { chunk_id })?;
104        chunk.status = status;
105        chunk.error = error;
106        self.updated_at = Utc::now();
107        Ok(())
108    }
109}
110
/// Guard returned by `try_acquire_import_state_lock`.
///
/// It owns the open lock file; the `Drop` implementation unlocks that file.
#[derive(Debug)]
pub(crate) struct ImportStateLockGuard {
    /// Handle of the lock file on which the exclusive lock was taken.
    file: std::fs::File,
}
115
116impl Drop for ImportStateLockGuard {
117    fn drop(&mut self) {
118        let _ = self.file.unlock();
119    }
120}
121
122pub(crate) fn default_state_path(snapshot_id: &str, target_addr: &str) -> Option<PathBuf> {
123    let home = default_home_dir_with(|key| std::env::var_os(key));
124    let cwd = std::env::current_dir().ok();
125    default_state_path_with(home.as_deref(), cwd.as_deref(), snapshot_id, target_addr)
126}
127
/// Detects the user's home directory using `get` as the environment lookup.
///
/// Candidates are tried in order: `HOME`, then `USERPROFILE`, then `HOMEDRIVE` joined
/// with `HOMEPATH` (both must be present). A variable that is set to the empty string is
/// treated as unset: an empty value would otherwise yield an empty, relative "home"
/// path and prevent the remaining candidates from being considered.
///
/// Returns `None` when no candidate produces a value.
fn default_home_dir_with<F>(get: F) -> Option<PathBuf>
where
    F: for<'a> Fn(&'a str) -> Option<std::ffi::OsString>,
{
    // Wrap the lookup so that empty values are indistinguishable from missing ones.
    let lookup = |key: &str| get(key).filter(|value| !value.is_empty());

    lookup("HOME")
        .or_else(|| lookup("USERPROFILE"))
        .map(PathBuf::from)
        .or_else(|| {
            let drive = lookup("HOMEDRIVE")?;
            let path = lookup("HOMEPATH")?;
            Some(PathBuf::from(drive).join(path))
        })
}
141
142fn default_state_path_with(
143    home: Option<&Path>,
144    cwd: Option<&Path>,
145    snapshot_id: &str,
146    target_addr: &str,
147) -> Option<PathBuf> {
148    let file_name = import_state_file_name(snapshot_id, target_addr);
149    match (home, cwd) {
150        (Some(home), _) => Some(
151            home.join(IMPORT_STATE_ROOT)
152                .join(IMPORT_STATE_DIR)
153                .join(file_name),
154        ),
155        (None, Some(cwd)) => Some(cwd.join(file_name)),
156        (None, None) => None,
157    }
158}
159
160fn import_state_file_name(snapshot_id: &str, target_addr: &str) -> String {
161    format!(
162        ".import_state_{}_{}.json",
163        encode_path_segment(snapshot_id),
164        encode_path_segment(target_addr)
165    )
166}
167
168pub(crate) async fn load_import_state(path: &Path) -> Result<Option<ImportState>> {
169    match tokio::fs::read(path).await {
170        Ok(bytes) => {
171            let mut state: ImportState =
172                serde_json::from_slice(&bytes).context(ImportStateParseSnafu)?;
173            normalize_import_state_for_resume(&mut state);
174            Ok(Some(state))
175        }
176        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
177        Err(source) => Err(source).context(ImportStateIoSnafu {
178            path: path.display().to_string(),
179        }),
180    }
181}
182
183/// Caller must hold the lock acquired via `try_acquire_import_state_lock`.
184pub(crate) async fn save_import_state(path: &Path, state: &ImportState) -> Result<()> {
185    if let Some(parent) = path.parent() {
186        tokio::fs::create_dir_all(parent)
187            .await
188            .context(ImportStateIoSnafu {
189                path: parent.display().to_string(),
190            })?;
191    }
192
193    let bytes =
194        serde_json::to_vec_pretty(state).expect("ImportState should always be serializable");
195    let tmp_path = unique_tmp_path(path);
196    let mut file = tokio::fs::File::create(&tmp_path)
197        .await
198        .context(ImportStateIoSnafu {
199            path: tmp_path.display().to_string(),
200        })?;
201    file.write_all(&bytes).await.context(ImportStateIoSnafu {
202        path: tmp_path.display().to_string(),
203    })?;
204    file.sync_all().await.context(ImportStateIoSnafu {
205        path: tmp_path.display().to_string(),
206    })?;
207    // Close before rename; Windows forbids renaming an open file.
208    drop(file);
209
210    tokio::fs::rename(&tmp_path, path)
211        .await
212        .context(ImportStateIoSnafu {
213            path: path.display().to_string(),
214        })?;
215    sync_parent_dir(path).await?;
216    Ok(())
217}
218
219pub(crate) fn try_acquire_import_state_lock(path: &Path) -> Result<ImportStateLockGuard> {
220    if let Some(parent) = path.parent() {
221        std::fs::create_dir_all(parent).context(ImportStateIoSnafu {
222            path: parent.display().to_string(),
223        })?;
224    }
225
226    let lock_path = import_state_lock_path(path);
227    let file = std::fs::OpenOptions::new()
228        .create(true)
229        .read(true)
230        .write(true)
231        .truncate(false)
232        .open(&lock_path)
233        .context(ImportStateIoSnafu {
234            path: lock_path.display().to_string(),
235        })?;
236    file.try_lock_exclusive().map_err(|error| {
237        if is_lock_contention(&error) {
238            ImportStateLockedSnafu {
239                path: lock_path.display().to_string(),
240            }
241            .build()
242        } else {
243            ImportStateIoSnafu {
244                path: lock_path.display().to_string(),
245            }
246            .into_error(error)
247        }
248    })?;
249
250    Ok(ImportStateLockGuard { file })
251}
252
253fn is_lock_contention(error: &std::io::Error) -> bool {
254    error.kind() == std::io::ErrorKind::WouldBlock
255        || error.raw_os_error() == fs2::lock_contended_error().raw_os_error()
256}
257
258fn unique_tmp_path(path: &Path) -> PathBuf {
259    let pid = std::process::id();
260    let seq = IMPORT_STATE_TMP_ID.fetch_add(1, Ordering::Relaxed);
261    let file_name = path.file_name().unwrap_or_default().to_string_lossy();
262    path.with_file_name(format!("{file_name}.{pid}.{seq}.tmp"))
263}
264
/// Returns the path of the lock file for `path`: the same location with `.lock` appended
/// to the (lossily decoded) file name. A missing file name is treated as empty.
fn import_state_lock_path(path: &Path) -> PathBuf {
    let mut lock_name = path
        .file_name()
        .map(|name| name.to_string_lossy())
        .unwrap_or_default()
        .into_owned();
    lock_name.push_str(".lock");
    path.with_file_name(lock_name)
}
269
270fn normalize_import_state_for_resume(state: &mut ImportState) {
271    for chunk in &mut state.chunks {
272        if chunk.status == ImportChunkStatus::InProgress {
273            chunk.status = ImportChunkStatus::Pending;
274            chunk.error = None;
275        }
276    }
277}
278
279pub(crate) async fn delete_import_state(path: &Path) -> Result<()> {
280    match tokio::fs::remove_file(path).await {
281        Ok(()) => {
282            sync_parent_dir(path).await?;
283            Ok(())
284        }
285        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
286        Err(source) => Err(source).context(ImportStateIoSnafu {
287            path: path.display().to_string(),
288        }),
289    }
290}
291
292#[cfg(unix)]
293async fn sync_parent_dir(path: &Path) -> Result<()> {
294    let Some(parent) = path.parent() else {
295        return Ok(());
296    };
297
298    let dir = tokio::fs::File::open(parent)
299        .await
300        .context(ImportStateIoSnafu {
301            path: parent.display().to_string(),
302        })?;
303    dir.sync_all().await.context(ImportStateIoSnafu {
304        path: parent.display().to_string(),
305    })?;
306    Ok(())
307}
308
309#[cfg(not(unix))]
310async fn sync_parent_dir(_path: &Path) -> Result<()> {
311    Ok(())
312}
313
/// Unit tests for import state bookkeeping, persistence, locking and path resolution.
#[cfg(test)]
mod tests {
    use std::process::Command;

    use chrono::Utc;
    use tempfile::tempdir;

    use super::*;

    /// Environment variable through which the parent lock test hands the state path to
    /// the child test process.
    const CHILD_LOCK_PATH_ENV: &str = "GREPTIME_IMPORT_STATE_LOCK_PATH";
    /// Test filter passed to the child process to select the ignored child-side test.
    const CHILD_LOCK_TEST: &str =
        "data::import_v2::state::tests::test_try_acquire_import_state_lock_child_process";

    #[test]
    fn test_import_state_new_initializes_pending_chunks() {
        let state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);

        assert_eq!(state.snapshot_id, "snapshot-1");
        assert_eq!(state.target_addr, "127.0.0.1:4000");
        assert_eq!(state.chunks.len(), 2);
        assert_eq!(state.chunks[0].status, ImportChunkStatus::Pending);
        assert_eq!(state.chunks[1].status, ImportChunkStatus::Pending);
    }

    #[test]
    fn test_set_chunk_status_updates_timestamp_and_error() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
        // Rewind the timestamp first and capture it afterwards, so that the strict
        // comparison below does not depend on the clock ticking between two calls.
        state.updated_at = Utc::now() - chrono::Duration::seconds(10);
        let before = state.updated_at;

        state
            .set_chunk_status(1, ImportChunkStatus::Failed, Some("timeout".to_string()))
            .unwrap();
        assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Failed));
        assert_eq!(state.chunks[0].error.as_deref(), Some("timeout"));
        assert!(state.updated_at > before);
    }

    #[test]
    fn test_set_chunk_status_rejects_unknown_chunk_id() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);

        let error = state
            .set_chunk_status(99, ImportChunkStatus::Completed, None)
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateUnknownChunk { chunk_id, .. } if chunk_id == 99
        ));
    }

    #[tokio::test]
    async fn test_save_and_load_import_state_round_trip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
        state
            .set_chunk_status(2, ImportChunkStatus::Completed, None)
            .unwrap();

        save_import_state(&path, &state).await.unwrap();
        let loaded = load_import_state(&path).await.unwrap().unwrap();

        assert_eq!(loaded.snapshot_id, state.snapshot_id);
        assert_eq!(loaded.target_addr, state.target_addr);
        assert_eq!(loaded.chunks, state.chunks);
    }

    #[tokio::test]
    async fn test_save_import_state_overwrites_existing_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
        save_import_state(&path, &state).await.unwrap();

        state
            .set_chunk_status(1, ImportChunkStatus::Completed, None)
            .unwrap();
        save_import_state(&path, &state).await.unwrap();

        let loaded = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(loaded.chunk_status(1), Some(ImportChunkStatus::Completed));
    }

    #[test]
    fn test_load_import_state_resets_in_progress_to_pending() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
        state
            .set_chunk_status(
                2,
                ImportChunkStatus::InProgress,
                Some("running".to_string()),
            )
            .unwrap();

        normalize_import_state_for_resume(&mut state);

        assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Pending));
        assert_eq!(state.chunk_status(2), Some(ImportChunkStatus::Pending));
        assert_eq!(state.chunks[1].error, None);
    }

    #[test]
    fn test_unique_tmp_path_generates_distinct_paths() {
        let path = Path::new("/tmp/import_state.json");

        let first = unique_tmp_path(path);
        let second = unique_tmp_path(path);

        assert_ne!(first, second);
        assert!(first.starts_with("/tmp"));
        assert!(second.starts_with("/tmp"));
        assert!(
            first
                .file_name()
                .unwrap()
                .to_string_lossy()
                .ends_with(".tmp")
        );
        assert!(
            second
                .file_name()
                .unwrap()
                .to_string_lossy()
                .ends_with(".tmp")
        );
    }

    #[test]
    fn test_lock_contention_detection_accepts_platform_error() {
        let error = fs2::lock_contended_error();

        assert!(is_lock_contention(&error));
    }

    #[test]
    fn test_try_acquire_import_state_lock_rejects_second_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");

        let _first = try_acquire_import_state_lock(&path).unwrap();
        // Import state locking guards concurrent CLI processes, so validate cross-process exclusion.
        let output = Command::new(std::env::current_exe().unwrap())
            .arg(CHILD_LOCK_TEST)
            .arg("--ignored")
            .arg("--exact")
            .env(CHILD_LOCK_PATH_ENV, &path)
            .output()
            .unwrap();

        assert!(
            output.status.success(),
            "child lock test failed\nstdout:\n{}\nstderr:\n{}",
            String::from_utf8_lossy(&output.stdout),
            String::from_utf8_lossy(&output.stderr)
        );
        // Guard against the filter silently matching nothing in the child process.
        let stdout = String::from_utf8_lossy(&output.stdout);
        assert!(
            stdout.contains("1 passed"),
            "child lock test did not run the expected ignored test\nstdout:\n{stdout}"
        );
    }

    #[test]
    #[ignore = "spawned by test_try_acquire_import_state_lock_rejects_second_holder"]
    fn test_try_acquire_import_state_lock_child_process() {
        let path = std::env::var_os(CHILD_LOCK_PATH_ENV)
            .expect("child lock path must be set by the parent test");
        let path = PathBuf::from(path);
        let error = try_acquire_import_state_lock(&path).unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateLocked { .. }
        ));
    }

    #[tokio::test]
    async fn test_delete_import_state_ignores_missing_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("missing.json");

        delete_import_state(&path).await.unwrap();
    }

    #[test]
    fn test_default_state_path_prefers_home_and_encodes_snapshot_id() {
        let home = tempdir().unwrap();
        let cwd = tempdir().unwrap();

        let path = default_state_path_with(
            Some(home.path()),
            Some(cwd.path()),
            "../snapshot",
            "127.0.0.1:4000",
        )
        .unwrap();

        assert_eq!(
            path,
            home.path()
                .join(IMPORT_STATE_ROOT)
                .join(IMPORT_STATE_DIR)
                .join(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000.json")
        );
    }

    #[test]
    fn test_default_state_path_falls_back_to_cwd_when_home_missing() {
        let cwd = tempdir().unwrap();

        let path =
            default_state_path_with(None, Some(cwd.path()), "snapshot-1", "target-a").unwrap();

        assert_eq!(
            path,
            cwd.path().join(".import_state_snapshot-1_target-a.json")
        );
    }

    #[test]
    fn test_default_state_path_isolated_by_target_addr() {
        let cwd = tempdir().unwrap();

        let first = default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4000")
            .unwrap();
        let second =
            default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4001")
                .unwrap();

        assert_ne!(first, second);
    }

    #[test]
    fn test_default_home_dir_prefers_home() {
        let detected = default_home_dir_with(|key| match key {
            "HOME" => Some(std::ffi::OsString::from("/tmp/home")),
            "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp/home")));
    }

    #[test]
    fn test_default_home_dir_falls_back_to_userprofile() {
        let detected = default_home_dir_with(|key| match key {
            "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp/userprofile")));
    }

    #[test]
    fn test_default_home_dir_falls_back_to_home_drive_and_path() {
        let detected = default_home_dir_with(|key| match key {
            "HOMEDRIVE" => Some(std::ffi::OsString::from("/tmp")),
            "HOMEPATH" => Some(std::ffi::OsString::from("windows-home")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp").join("windows-home")));
    }
}