// cli/data/export_v2/coordinator.rs
use common_telemetry::{info, warn};

use crate::common::ObjectStoreConfig;
use crate::data::export_v2::data::{CopyOptions, build_copy_target, execute_copy_database};
use crate::data::export_v2::error::Result;
use crate::data::export_v2::manifest::{ChunkStatus, DataFormat, Manifest, TimeRange};
use crate::data::path::data_dir_for_schema_chunk;
use crate::data::snapshot_storage::{SnapshotStorage, StorageScheme};
use crate::database::DatabaseClient;

25struct ExportContext<'a> {
26 storage: &'a dyn SnapshotStorage,
27 database_client: &'a DatabaseClient,
28 snapshot_uri: &'a str,
29 storage_config: &'a ObjectStoreConfig,
30 catalog: &'a str,
31 schemas: &'a [String],
32 format: DataFormat,
33 parallelism: usize,
34}
35
36pub async fn export_data(
37 storage: &dyn SnapshotStorage,
38 database_client: &DatabaseClient,
39 snapshot_uri: &str,
40 storage_config: &ObjectStoreConfig,
41 manifest: &mut Manifest,
42 parallelism: usize,
43) -> Result<()> {
44 if manifest.chunks.is_empty() {
45 return Ok(());
46 }
47
48 for idx in 0..manifest.chunks.len() {
49 if matches!(
50 manifest.chunks[idx].status,
51 ChunkStatus::Completed | ChunkStatus::Skipped
52 ) {
53 continue;
54 }
55
56 let (chunk_id, time_range) = mark_chunk_in_progress(manifest, idx);
57 manifest.touch();
58 storage.write_manifest(manifest).await?;
59
60 let context = ExportContext {
61 storage,
62 database_client,
63 snapshot_uri,
64 storage_config,
65 catalog: &manifest.catalog,
66 schemas: &manifest.schemas,
67 format: manifest.format,
68 parallelism,
69 };
70 let export_result = export_chunk(&context, chunk_id, time_range).await;
71
72 let result = match export_result {
73 Ok(files) => {
74 mark_chunk_completed(manifest, idx, files);
75 Ok(())
76 }
77 Err(err) => {
78 mark_chunk_failed(manifest, idx, err.to_string());
79 Err(err)
80 }
81 };
82
83 manifest.touch();
84 storage.write_manifest(manifest).await?;
85
86 result?;
87 }
88
89 Ok(())
90}
91
92fn mark_chunk_in_progress(manifest: &mut Manifest, idx: usize) -> (u32, TimeRange) {
93 let chunk = &mut manifest.chunks[idx];
94 chunk.mark_in_progress();
95 (chunk.id, chunk.time_range.clone())
96}
97
98fn mark_chunk_completed(manifest: &mut Manifest, idx: usize, files: Vec<String>) {
99 let chunk = &mut manifest.chunks[idx];
100 if files.is_empty() {
101 chunk.mark_skipped();
102 } else {
103 chunk.mark_completed(files, None);
104 }
105}
106
107fn mark_chunk_failed(manifest: &mut Manifest, idx: usize, error: String) {
108 let chunk = &mut manifest.chunks[idx];
109 chunk.mark_failed(error);
110}
111
112async fn export_chunk(
113 context: &ExportContext<'_>,
114 chunk_id: u32,
115 time_range: TimeRange,
116) -> Result<Vec<String>> {
117 let scheme = StorageScheme::from_uri(context.snapshot_uri)?;
118 let needs_dir = matches!(scheme, StorageScheme::File);
119 let copy_options = CopyOptions {
120 format: context.format,
121 time_range,
122 parallelism: context.parallelism,
123 };
124
125 for schema in context.schemas {
126 let prefix = data_dir_for_schema_chunk(schema, chunk_id);
127 if needs_dir {
128 context.storage.create_dir_all(&prefix).await?;
129 }
130
131 let target = build_copy_target(
132 context.snapshot_uri,
133 context.storage_config,
134 schema,
135 chunk_id,
136 )?;
137 execute_copy_database(
138 context.database_client,
139 context.catalog,
140 schema,
141 &target,
142 ©_options,
143 )
144 .await?;
145 }
146
147 let files = list_chunk_files(context.storage, context.schemas, chunk_id).await?;
148 info!("Collected {} files for chunk {}", files.len(), chunk_id);
149 Ok(files)
150}
151
152async fn list_chunk_files(
153 storage: &dyn SnapshotStorage,
154 schemas: &[String],
155 chunk_id: u32,
156) -> Result<Vec<String>> {
157 let mut files = Vec::new();
158
159 for schema in schemas {
160 let prefix = data_dir_for_schema_chunk(schema, chunk_id);
161 files.extend(storage.list_files_recursive(&prefix).await?);
162 }
163
164 files.sort();
165 Ok(files)
166}