1#![allow(dead_code)]
16
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicU64, Ordering};
19
20use chrono::{DateTime, Utc};
21use fs2::FileExt;
22use serde::{Deserialize, Serialize};
23use snafu::{IntoError, OptionExt, ResultExt};
24use tokio::io::AsyncWriteExt;
25
26use crate::data::import_v2::error::{
27 ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu,
28 ImportStateUnknownChunkSnafu, Result,
29};
30use crate::data::path::encode_path_segment;
31
/// Hidden directory, joined onto the home directory, under which import state
/// files are kept (see `default_state_path_with`).
const IMPORT_STATE_ROOT: &str = ".greptime";
/// Sub-directory of `IMPORT_STATE_ROOT` that holds the per-snapshot/target
/// state files.
const IMPORT_STATE_DIR: &str = "import_state";
/// Process-wide sequence number used by `unique_tmp_path`; combined with the
/// process id it keeps temporary file names distinct.
static IMPORT_STATE_TMP_ID: AtomicU64 = AtomicU64::new(0);
35
/// Status of a single import chunk, serialized in `snake_case`
/// (e.g. `InProgress` is stored as `"in_progress"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ImportChunkStatus {
    /// Not processed yet; the status every chunk gets from `ImportState::new`.
    Pending,
    /// Being processed. `load_import_state` resets this back to `Pending`
    /// (presumably because work in flight when the previous run stopped must
    /// be redone).
    InProgress,
    /// Finished; left untouched when state is reloaded.
    Completed,
    /// Failed; left untouched when state is reloaded. The chunk's `error`
    /// field may carry a message.
    Failed,
}
44
/// Progress record for one chunk of an import.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportChunkState {
    /// Chunk identifier, used as the lookup key by `ImportState::chunk_status`
    /// and `ImportState::set_chunk_status`.
    pub(crate) id: u32,
    /// Current status of the chunk.
    pub(crate) status: ImportChunkStatus,
    /// Message recorded with the latest status change, if any; omitted from
    /// the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) error: Option<String>,
}
52
/// Resumable progress of importing one snapshot into one target. It is written
/// as JSON by `save_import_state` and read back by `load_import_state`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportState {
    /// Identifier of the snapshot being imported.
    pub(crate) snapshot_id: String,
    /// Address of the import target (e.g. `127.0.0.1:4000`).
    pub(crate) target_addr: String,
    /// Time the state was created or last changed via `set_chunk_status`.
    pub(crate) updated_at: DateTime<Utc>,
    /// Per-chunk progress; `new` keeps the order in which chunk ids were given.
    pub(crate) chunks: Vec<ImportChunkState>,
}
61
62impl ImportState {
63 pub(crate) fn new<I>(
64 snapshot_id: impl Into<String>,
65 target_addr: impl Into<String>,
66 chunk_ids: I,
67 ) -> Self
68 where
69 I: IntoIterator<Item = u32>,
70 {
71 Self {
72 snapshot_id: snapshot_id.into(),
73 target_addr: target_addr.into(),
74 updated_at: Utc::now(),
75 chunks: chunk_ids
76 .into_iter()
77 .map(|id| ImportChunkState {
78 id,
79 status: ImportChunkStatus::Pending,
80 error: None,
81 })
82 .collect(),
83 }
84 }
85
86 pub(crate) fn chunk_status(&self, chunk_id: u32) -> Option<ImportChunkStatus> {
87 self.chunks
88 .iter()
89 .find(|chunk| chunk.id == chunk_id)
90 .map(|chunk| chunk.status.clone())
91 }
92
93 pub(crate) fn set_chunk_status(
94 &mut self,
95 chunk_id: u32,
96 status: ImportChunkStatus,
97 error: Option<String>,
98 ) -> Result<()> {
99 let chunk = self
100 .chunks
101 .iter_mut()
102 .find(|chunk| chunk.id == chunk_id)
103 .context(ImportStateUnknownChunkSnafu { chunk_id })?;
104 chunk.status = status;
105 chunk.error = error;
106 self.updated_at = Utc::now();
107 Ok(())
108 }
109}
110
111#[derive(Debug)]
112pub(crate) struct ImportStateLockGuard {
113 file: std::fs::File,
114}
115
116impl Drop for ImportStateLockGuard {
117 fn drop(&mut self) {
118 let _ = self.file.unlock();
119 }
120}
121
122pub(crate) fn default_state_path(snapshot_id: &str, target_addr: &str) -> Option<PathBuf> {
123 let home = default_home_dir_with(|key| std::env::var_os(key));
124 let cwd = std::env::current_dir().ok();
125 default_state_path_with(home.as_deref(), cwd.as_deref(), snapshot_id, target_addr)
126}
127
/// Resolves the user's home directory through the lookup function `get`.
///
/// Candidates are tried in order:
/// 1. `HOME`
/// 2. `USERPROFILE`
/// 3. `HOMEDRIVE` joined with `HOMEPATH` (both must be present)
///
/// Returns `None` when no candidate yields a value.
///
/// Variables that are set but empty count as unset. Otherwise an empty `HOME`
/// would produce a relative path and silently place state under whatever the
/// current directory happens to be.
fn default_home_dir_with<F>(get: F) -> Option<PathBuf>
where
    F: for<'a> Fn(&'a str) -> Option<std::ffi::OsString>,
{
    let non_empty = |key: &str| get(key).filter(|value| !value.is_empty());

    non_empty("HOME")
        .or_else(|| non_empty("USERPROFILE"))
        .map(PathBuf::from)
        .or_else(|| {
            let drive = non_empty("HOMEDRIVE")?;
            let path = non_empty("HOMEPATH")?;
            Some(PathBuf::from(drive).join(path))
        })
}
141
142fn default_state_path_with(
143 home: Option<&Path>,
144 cwd: Option<&Path>,
145 snapshot_id: &str,
146 target_addr: &str,
147) -> Option<PathBuf> {
148 let file_name = import_state_file_name(snapshot_id, target_addr);
149 match (home, cwd) {
150 (Some(home), _) => Some(
151 home.join(IMPORT_STATE_ROOT)
152 .join(IMPORT_STATE_DIR)
153 .join(file_name),
154 ),
155 (None, Some(cwd)) => Some(cwd.join(file_name)),
156 (None, None) => None,
157 }
158}
159
160fn import_state_file_name(snapshot_id: &str, target_addr: &str) -> String {
161 format!(
162 ".import_state_{}_{}.json",
163 encode_path_segment(snapshot_id),
164 encode_path_segment(target_addr)
165 )
166}
167
168pub(crate) async fn load_import_state(path: &Path) -> Result<Option<ImportState>> {
169 match tokio::fs::read(path).await {
170 Ok(bytes) => {
171 let mut state: ImportState =
172 serde_json::from_slice(&bytes).context(ImportStateParseSnafu)?;
173 normalize_import_state_for_resume(&mut state);
174 Ok(Some(state))
175 }
176 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
177 Err(source) => Err(source).context(ImportStateIoSnafu {
178 path: path.display().to_string(),
179 }),
180 }
181}
182
183pub(crate) async fn save_import_state(path: &Path, state: &ImportState) -> Result<()> {
185 if let Some(parent) = path.parent() {
186 tokio::fs::create_dir_all(parent)
187 .await
188 .context(ImportStateIoSnafu {
189 path: parent.display().to_string(),
190 })?;
191 }
192
193 let bytes =
194 serde_json::to_vec_pretty(state).expect("ImportState should always be serializable");
195 let tmp_path = unique_tmp_path(path);
196 let mut file = tokio::fs::File::create(&tmp_path)
197 .await
198 .context(ImportStateIoSnafu {
199 path: tmp_path.display().to_string(),
200 })?;
201 file.write_all(&bytes).await.context(ImportStateIoSnafu {
202 path: tmp_path.display().to_string(),
203 })?;
204 file.sync_all().await.context(ImportStateIoSnafu {
205 path: tmp_path.display().to_string(),
206 })?;
207 drop(file);
209
210 tokio::fs::rename(&tmp_path, path)
211 .await
212 .context(ImportStateIoSnafu {
213 path: path.display().to_string(),
214 })?;
215 sync_parent_dir(path).await?;
216 Ok(())
217}
218
219pub(crate) fn try_acquire_import_state_lock(path: &Path) -> Result<ImportStateLockGuard> {
220 if let Some(parent) = path.parent() {
221 std::fs::create_dir_all(parent).context(ImportStateIoSnafu {
222 path: parent.display().to_string(),
223 })?;
224 }
225
226 let lock_path = import_state_lock_path(path);
227 let file = std::fs::OpenOptions::new()
228 .create(true)
229 .read(true)
230 .write(true)
231 .truncate(false)
232 .open(&lock_path)
233 .context(ImportStateIoSnafu {
234 path: lock_path.display().to_string(),
235 })?;
236 file.try_lock_exclusive().map_err(|error| {
237 if is_lock_contention(&error) {
238 ImportStateLockedSnafu {
239 path: lock_path.display().to_string(),
240 }
241 .build()
242 } else {
243 ImportStateIoSnafu {
244 path: lock_path.display().to_string(),
245 }
246 .into_error(error)
247 }
248 })?;
249
250 Ok(ImportStateLockGuard { file })
251}
252
253fn is_lock_contention(error: &std::io::Error) -> bool {
254 error.kind() == std::io::ErrorKind::WouldBlock
255 || error.raw_os_error() == fs2::lock_contended_error().raw_os_error()
256}
257
258fn unique_tmp_path(path: &Path) -> PathBuf {
259 let pid = std::process::id();
260 let seq = IMPORT_STATE_TMP_ID.fetch_add(1, Ordering::Relaxed);
261 let file_name = path.file_name().unwrap_or_default().to_string_lossy();
262 path.with_file_name(format!("{file_name}.{pid}.{seq}.tmp"))
263}
264
/// Returns the lock file path for `path`: the same location with `.lock`
/// appended to the file name (e.g. `state.json` -> `state.json.lock`).
///
/// The name is built as an `OsString`, so non-UTF-8 file names are preserved
/// byte-for-byte. A lossy conversion could map two distinct state files onto
/// the same lock file.
fn import_state_lock_path(path: &Path) -> PathBuf {
    let mut lock_name = path.file_name().unwrap_or_default().to_os_string();
    lock_name.push(".lock");
    path.with_file_name(lock_name)
}
269
270fn normalize_import_state_for_resume(state: &mut ImportState) {
271 for chunk in &mut state.chunks {
272 if chunk.status == ImportChunkStatus::InProgress {
273 chunk.status = ImportChunkStatus::Pending;
274 chunk.error = None;
275 }
276 }
277}
278
279pub(crate) async fn delete_import_state(path: &Path) -> Result<()> {
280 match tokio::fs::remove_file(path).await {
281 Ok(()) => {
282 sync_parent_dir(path).await?;
283 Ok(())
284 }
285 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
286 Err(source) => Err(source).context(ImportStateIoSnafu {
287 path: path.display().to_string(),
288 }),
289 }
290}
291
292#[cfg(unix)]
293async fn sync_parent_dir(path: &Path) -> Result<()> {
294 let Some(parent) = path.parent() else {
295 return Ok(());
296 };
297
298 let dir = tokio::fs::File::open(parent)
299 .await
300 .context(ImportStateIoSnafu {
301 path: parent.display().to_string(),
302 })?;
303 dir.sync_all().await.context(ImportStateIoSnafu {
304 path: parent.display().to_string(),
305 })?;
306 Ok(())
307}
308
309#[cfg(not(unix))]
310async fn sync_parent_dir(_path: &Path) -> Result<()> {
311 Ok(())
312}
313
#[cfg(test)]
mod tests {
    use std::process::Command;

    use chrono::Utc;
    use tempfile::tempdir;

    use super::*;

    /// Environment variable through which the parent test hands the state
    /// path to the child test process.
    const CHILD_LOCK_PATH_ENV: &str = "GREPTIME_IMPORT_STATE_LOCK_PATH";
    /// Fully-qualified name of the ignored test that the parent re-runs in a
    /// separate process, so the second lock attempt comes from another process.
    const CHILD_LOCK_TEST: &str =
        "data::import_v2::state::tests::test_try_acquire_import_state_lock_child_process";

    #[test]
    fn test_import_state_new_initializes_pending_chunks() {
        let state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);

        assert_eq!(state.snapshot_id, "snapshot-1");
        assert_eq!(state.target_addr, "127.0.0.1:4000");
        assert_eq!(state.chunks.len(), 2);
        assert_eq!(state.chunks[0].status, ImportChunkStatus::Pending);
        assert_eq!(state.chunks[1].status, ImportChunkStatus::Pending);
    }

    #[test]
    fn test_set_chunk_status_updates_timestamp_and_error() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
        // Backdate first and only then capture `before`. The refresh is then
        // observable even if the clock has not ticked since `ImportState::new`.
        state.updated_at = Utc::now() - chrono::Duration::seconds(10);
        let before = state.updated_at;

        state
            .set_chunk_status(1, ImportChunkStatus::Failed, Some("timeout".to_string()))
            .unwrap();
        assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Failed));
        assert_eq!(state.chunks[0].error.as_deref(), Some("timeout"));
        assert!(state.updated_at > before);
    }

    #[test]
    fn test_set_chunk_status_rejects_unknown_chunk_id() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);

        let error = state
            .set_chunk_status(99, ImportChunkStatus::Completed, None)
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateUnknownChunk { chunk_id, .. } if chunk_id == 99
        ));
    }

    #[tokio::test]
    async fn test_save_and_load_import_state_round_trip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
        state
            .set_chunk_status(2, ImportChunkStatus::Completed, None)
            .unwrap();

        save_import_state(&path, &state).await.unwrap();
        let loaded = load_import_state(&path).await.unwrap().unwrap();

        assert_eq!(loaded.snapshot_id, state.snapshot_id);
        assert_eq!(loaded.target_addr, state.target_addr);
        assert_eq!(loaded.chunks, state.chunks);
    }

    #[tokio::test]
    async fn test_save_import_state_overwrites_existing_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
        save_import_state(&path, &state).await.unwrap();

        state
            .set_chunk_status(1, ImportChunkStatus::Completed, None)
            .unwrap();
        save_import_state(&path, &state).await.unwrap();

        let loaded = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(loaded.chunk_status(1), Some(ImportChunkStatus::Completed));
    }

    #[tokio::test]
    async fn test_load_import_state_resets_in_progress_to_pending() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
        state
            .set_chunk_status(
                2,
                ImportChunkStatus::InProgress,
                Some("running".to_string()),
            )
            .unwrap();
        save_import_state(&path, &state).await.unwrap();

        // Go through the real load path rather than calling the normalizer directly.
        let loaded = load_import_state(&path).await.unwrap().unwrap();

        assert_eq!(loaded.chunk_status(1), Some(ImportChunkStatus::Pending));
        assert_eq!(loaded.chunk_status(2), Some(ImportChunkStatus::Pending));
        assert_eq!(loaded.chunks[1].error, None);
    }

    #[test]
    fn test_normalize_import_state_keeps_terminal_statuses() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2, 3]);
        state
            .set_chunk_status(1, ImportChunkStatus::Completed, None)
            .unwrap();
        state
            .set_chunk_status(2, ImportChunkStatus::Failed, Some("timeout".to_string()))
            .unwrap();

        normalize_import_state_for_resume(&mut state);

        assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Completed));
        assert_eq!(state.chunk_status(2), Some(ImportChunkStatus::Failed));
        assert_eq!(state.chunks[1].error.as_deref(), Some("timeout"));
        assert_eq!(state.chunk_status(3), Some(ImportChunkStatus::Pending));
    }

    #[test]
    fn test_unique_tmp_path_generates_distinct_paths() {
        let path = Path::new("/tmp/import_state.json");

        let first = unique_tmp_path(path);
        let second = unique_tmp_path(path);

        assert_ne!(first, second);
        assert!(first.starts_with("/tmp"));
        assert!(second.starts_with("/tmp"));
        assert!(
            first
                .file_name()
                .unwrap()
                .to_string_lossy()
                .ends_with(".tmp")
        );
        assert!(
            second
                .file_name()
                .unwrap()
                .to_string_lossy()
                .ends_with(".tmp")
        );
    }

    #[test]
    fn test_lock_contention_detection_accepts_platform_error() {
        let error = fs2::lock_contended_error();

        assert!(is_lock_contention(&error));
    }

    #[test]
    fn test_try_acquire_import_state_lock_rejects_second_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");

        let _first = try_acquire_import_state_lock(&path).unwrap();
        let output = Command::new(std::env::current_exe().unwrap())
            .arg(CHILD_LOCK_TEST)
            .arg("--ignored")
            .arg("--exact")
            .env(CHILD_LOCK_PATH_ENV, &path)
            .output()
            .unwrap();

        assert!(
            output.status.success(),
            "child lock test failed\nstdout:\n{}\nstderr:\n{}",
            String::from_utf8_lossy(&output.stdout),
            String::from_utf8_lossy(&output.stderr)
        );
        let stdout = String::from_utf8_lossy(&output.stdout);
        assert!(
            stdout.contains("1 passed"),
            "child lock test did not run the expected ignored test\nstdout:\n{stdout}"
        );
    }

    #[test]
    #[ignore = "spawned by test_try_acquire_import_state_lock_rejects_second_holder"]
    fn test_try_acquire_import_state_lock_child_process() {
        let path = std::env::var_os(CHILD_LOCK_PATH_ENV)
            .expect("child lock path must be set by the parent test");
        let path = PathBuf::from(path);
        let error = try_acquire_import_state_lock(&path).unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateLocked { .. }
        ));
    }

    #[tokio::test]
    async fn test_delete_import_state_ignores_missing_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("missing.json");

        delete_import_state(&path).await.unwrap();
    }

    #[test]
    fn test_default_state_path_prefers_home_and_encodes_snapshot_id() {
        let home = tempdir().unwrap();
        let cwd = tempdir().unwrap();

        let path = default_state_path_with(
            Some(home.path()),
            Some(cwd.path()),
            "../snapshot",
            "127.0.0.1:4000",
        )
        .unwrap();

        assert_eq!(
            path,
            home.path()
                .join(IMPORT_STATE_ROOT)
                .join(IMPORT_STATE_DIR)
                .join(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000.json")
        );
    }

    #[test]
    fn test_default_state_path_falls_back_to_cwd_when_home_missing() {
        let cwd = tempdir().unwrap();

        let path =
            default_state_path_with(None, Some(cwd.path()), "snapshot-1", "target-a").unwrap();

        assert_eq!(
            path,
            cwd.path().join(".import_state_snapshot-1_target-a.json")
        );
    }

    #[test]
    fn test_default_state_path_isolated_by_target_addr() {
        let cwd = tempdir().unwrap();

        let first = default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4000")
            .unwrap();
        let second =
            default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4001")
                .unwrap();

        assert_ne!(first, second);
    }

    #[test]
    fn test_default_home_dir_prefers_home() {
        let detected = default_home_dir_with(|key| match key {
            "HOME" => Some(std::ffi::OsString::from("/tmp/home")),
            "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp/home")));
    }

    #[test]
    fn test_default_home_dir_falls_back_to_userprofile() {
        let detected = default_home_dir_with(|key| match key {
            "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp/userprofile")));
    }

    #[test]
    fn test_default_home_dir_falls_back_to_home_drive_and_path() {
        let detected = default_home_dir_with(|key| match key {
            "HOMEDRIVE" => Some(std::ffi::OsString::from("/tmp")),
            "HOMEPATH" => Some(std::ffi::OsString::from("windows-home")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp").join("windows-home")));
    }
}