Skip to main content

cli/data/export_v2/
coordinator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::info;
16
17use crate::common::ObjectStoreConfig;
18use crate::data::export_v2::data::{CopyOptions, build_copy_target, execute_copy_database};
19use crate::data::export_v2::error::Result;
20use crate::data::export_v2::manifest::{ChunkStatus, DataFormat, Manifest, TimeRange};
21use crate::data::path::data_dir_for_schema_chunk;
22use crate::data::snapshot_storage::{SnapshotStorage, StorageScheme};
23use crate::database::DatabaseClient;
24
/// Borrowed, per-chunk bundle of everything `export_chunk` needs.
///
/// Built afresh by `export_data` for each chunk so the individual values do
/// not have to be threaded through as separate arguments.
struct ExportContext<'a> {
    /// Snapshot storage used to create chunk directories and list exported files.
    storage: &'a dyn SnapshotStorage,
    /// Client handed to `execute_copy_database` to run the copy.
    database_client: &'a DatabaseClient,
    /// Snapshot URI; parsed for its storage scheme and used to build the copy target.
    snapshot_uri: &'a str,
    /// Object-store configuration passed to `build_copy_target`.
    storage_config: &'a ObjectStoreConfig,
    /// Catalog name passed to `execute_copy_database`.
    catalog: &'a str,
    /// Schemas to export; one copy is executed per schema.
    schemas: &'a [String],
    /// Output data format, forwarded in `CopyOptions`.
    format: DataFormat,
    /// Parallelism setting, forwarded in `CopyOptions`.
    parallelism: usize,
}
35
36pub async fn export_data(
37    storage: &dyn SnapshotStorage,
38    database_client: &DatabaseClient,
39    snapshot_uri: &str,
40    storage_config: &ObjectStoreConfig,
41    manifest: &mut Manifest,
42    parallelism: usize,
43) -> Result<()> {
44    if manifest.chunks.is_empty() {
45        return Ok(());
46    }
47
48    for idx in 0..manifest.chunks.len() {
49        if matches!(
50            manifest.chunks[idx].status,
51            ChunkStatus::Completed | ChunkStatus::Skipped
52        ) {
53            continue;
54        }
55
56        let (chunk_id, time_range) = mark_chunk_in_progress(manifest, idx);
57        manifest.touch();
58        storage.write_manifest(manifest).await?;
59
60        let context = ExportContext {
61            storage,
62            database_client,
63            snapshot_uri,
64            storage_config,
65            catalog: &manifest.catalog,
66            schemas: &manifest.schemas,
67            format: manifest.format,
68            parallelism,
69        };
70        let export_result = export_chunk(&context, chunk_id, time_range).await;
71
72        let result = match export_result {
73            Ok(files) => {
74                mark_chunk_completed(manifest, idx, files);
75                Ok(())
76            }
77            Err(err) => {
78                mark_chunk_failed(manifest, idx, err.to_string());
79                Err(err)
80            }
81        };
82
83        manifest.touch();
84        storage.write_manifest(manifest).await?;
85
86        result?;
87    }
88
89    Ok(())
90}
91
92fn mark_chunk_in_progress(manifest: &mut Manifest, idx: usize) -> (u32, TimeRange) {
93    let chunk = &mut manifest.chunks[idx];
94    chunk.mark_in_progress();
95    (chunk.id, chunk.time_range.clone())
96}
97
98fn mark_chunk_completed(manifest: &mut Manifest, idx: usize, files: Vec<String>) {
99    let chunk = &mut manifest.chunks[idx];
100    if files.is_empty() {
101        chunk.mark_skipped();
102    } else {
103        chunk.mark_completed(files, None);
104    }
105}
106
107fn mark_chunk_failed(manifest: &mut Manifest, idx: usize, error: String) {
108    let chunk = &mut manifest.chunks[idx];
109    chunk.mark_failed(error);
110}
111
112async fn export_chunk(
113    context: &ExportContext<'_>,
114    chunk_id: u32,
115    time_range: TimeRange,
116) -> Result<Vec<String>> {
117    let scheme = StorageScheme::from_uri(context.snapshot_uri)?;
118    let needs_dir = matches!(scheme, StorageScheme::File);
119    let copy_options = CopyOptions {
120        format: context.format,
121        time_range,
122        parallelism: context.parallelism,
123    };
124
125    for schema in context.schemas {
126        let prefix = data_dir_for_schema_chunk(schema, chunk_id);
127        if needs_dir {
128            context.storage.create_dir_all(&prefix).await?;
129        }
130
131        let target = build_copy_target(
132            context.snapshot_uri,
133            context.storage_config,
134            schema,
135            chunk_id,
136        )?;
137        execute_copy_database(
138            context.database_client,
139            context.catalog,
140            schema,
141            &target,
142            &copy_options,
143        )
144        .await?;
145    }
146
147    let files = list_chunk_files(context.storage, context.schemas, chunk_id).await?;
148    info!("Collected {} files for chunk {}", files.len(), chunk_id);
149    Ok(files)
150}
151
152async fn list_chunk_files(
153    storage: &dyn SnapshotStorage,
154    schemas: &[String],
155    chunk_id: u32,
156) -> Result<Vec<String>> {
157    let mut files = Vec::new();
158
159    for schema in schemas {
160        let prefix = data_dir_for_schema_chunk(schema, chunk_id);
161        files.extend(storage.list_files_recursive(&prefix).await?);
162    }
163
164    files.sort();
165    Ok(files)
166}