common_meta/reconciliation/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_procedure::{
18    watcher, BoxedProcedure, ProcedureId, ProcedureManagerRef, ProcedureWithId,
19};
20use common_telemetry::{error, info, warn};
21use snafu::{OptionExt, ResultExt};
22use store_api::storage::TableId;
23use table::table_name::TableName;
24use table::table_reference::TableReference;
25
26use crate::cache_invalidator::CacheInvalidatorRef;
27use crate::error::{self, Result, TableNotFoundSnafu};
28use crate::key::table_name::TableNameKey;
29use crate::key::TableMetadataManagerRef;
30use crate::node_manager::NodeManagerRef;
31use crate::reconciliation::reconcile_catalog::ReconcileCatalogProcedure;
32use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM};
33use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
34use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
35use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
36use crate::reconciliation::utils::Context;
37
38pub type ReconciliationManagerRef = Arc<ReconciliationManager>;
39
40/// The manager for reconciliation procedures.
41pub struct ReconciliationManager {
42    procedure_manager: ProcedureManagerRef,
43    context: Context,
44}
45
46macro_rules! register_reconcile_loader {
47    ($self:ident, $procedure:ty) => {{
48        let context = $self.context.clone();
49        $self
50            .procedure_manager
51            .register_loader(
52                <$procedure>::TYPE_NAME,
53                Box::new(move |json| {
54                    let context = context.clone();
55                    let procedure = <$procedure>::from_json(context, json)?;
56                    Ok(Box::new(procedure))
57                }),
58            )
59            .context(error::RegisterProcedureLoaderSnafu {
60                type_name: <$procedure>::TYPE_NAME,
61            })?;
62    }};
63}
64
65impl ReconciliationManager {
66    pub fn new(
67        node_manager: NodeManagerRef,
68        table_metadata_manager: TableMetadataManagerRef,
69        cache_invalidator: CacheInvalidatorRef,
70        procedure_manager: ProcedureManagerRef,
71    ) -> Self {
72        Self {
73            procedure_manager,
74            context: Context {
75                node_manager,
76                table_metadata_manager,
77                cache_invalidator,
78            },
79        }
80    }
81
82    /// Try to start the reconciliation manager.
83    ///
84    /// This function will register the procedure loaders for the reconciliation procedures.
85    /// Returns an error if the procedure loaders are already registered.
86    pub fn try_start(&self) -> Result<()> {
87        register_reconcile_loader!(self, ReconcileLogicalTablesProcedure);
88        register_reconcile_loader!(self, ReconcileTableProcedure);
89        register_reconcile_loader!(self, ReconcileDatabaseProcedure);
90        register_reconcile_loader!(self, ReconcileCatalogProcedure);
91
92        Ok(())
93    }
94
95    /// Reconcile a table.
96    ///
97    /// Returns the procedure id of the reconciliation procedure.
98    pub async fn reconcile_table(
99        &self,
100        table_ref: TableReference<'_>,
101        resolve_strategy: ResolveStrategy,
102    ) -> Result<ProcedureId> {
103        let table_name_key =
104            TableNameKey::new(table_ref.catalog, table_ref.schema, table_ref.table);
105        let table_metadata_manager = &self.context.table_metadata_manager;
106        let table_id = table_metadata_manager
107            .table_name_manager()
108            .get(table_name_key)
109            .await?
110            .with_context(|| TableNotFoundSnafu {
111                table_name: table_ref.to_string(),
112            })?
113            .table_id();
114        let (physical_table_id, _) = table_metadata_manager
115            .table_route_manager()
116            .get_physical_table_route(table_id)
117            .await?;
118
119        if physical_table_id == table_id {
120            self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy)
121                .await
122        } else {
123            let physical_table_info = table_metadata_manager
124                .table_info_manager()
125                .get(physical_table_id)
126                .await?
127                .with_context(|| TableNotFoundSnafu {
128                    table_name: format!("table_id: {}", physical_table_id),
129                })?;
130
131            self.reconcile_logical_tables(
132                physical_table_id,
133                physical_table_info.table_name(),
134                vec![(table_id, table_ref.into())],
135            )
136            .await
137        }
138    }
139
140    /// Reconcile a database.
141    ///
142    /// Returns the procedure id of the reconciliation procedure.
143    pub async fn reconcile_database(
144        &self,
145        catalog: String,
146        schema: String,
147        resolve_strategy: ResolveStrategy,
148        parallelism: usize,
149    ) -> Result<ProcedureId> {
150        let parallelism = normalize_parallelism(parallelism);
151        let procedure = ReconcileDatabaseProcedure::new(
152            self.context.clone(),
153            catalog,
154            schema,
155            false,
156            parallelism,
157            resolve_strategy,
158            false,
159        );
160        self.spawn_procedure(Box::new(procedure)).await
161    }
162
163    async fn reconcile_physical_table(
164        &self,
165        table_id: TableId,
166        table_name: TableName,
167        resolve_strategy: ResolveStrategy,
168    ) -> Result<ProcedureId> {
169        let procedure = ReconcileTableProcedure::new(
170            self.context.clone(),
171            table_id,
172            table_name,
173            resolve_strategy,
174            false,
175        );
176        self.spawn_procedure(Box::new(procedure)).await
177    }
178
179    async fn reconcile_logical_tables(
180        &self,
181        physical_table_id: TableId,
182        physical_table_name: TableName,
183        logical_tables: Vec<(TableId, TableName)>,
184    ) -> Result<ProcedureId> {
185        let procedure = ReconcileLogicalTablesProcedure::new(
186            self.context.clone(),
187            physical_table_id,
188            physical_table_name,
189            logical_tables,
190            false,
191        );
192        self.spawn_procedure(Box::new(procedure)).await
193    }
194
195    /// Reconcile a catalog.
196    ///
197    /// Returns the procedure id of the reconciliation procedure.
198    pub async fn reconcile_catalog(
199        &self,
200        catalog: String,
201        resolve_strategy: ResolveStrategy,
202        parallelism: usize,
203    ) -> Result<ProcedureId> {
204        let parallelism = normalize_parallelism(parallelism);
205        let procedure = ReconcileCatalogProcedure::new(
206            self.context.clone(),
207            catalog,
208            false,
209            resolve_strategy,
210            parallelism,
211        );
212        self.spawn_procedure(Box::new(procedure)).await
213    }
214
215    async fn spawn_procedure(&self, procedure: BoxedProcedure) -> Result<ProcedureId> {
216        let procedure_manager = self.procedure_manager.clone();
217        let procedure_with_id = ProcedureWithId::with_random_id(procedure);
218        let procedure_id = procedure_with_id.id;
219        let mut watcher = procedure_manager
220            .submit(procedure_with_id)
221            .await
222            .context(error::SubmitProcedureSnafu)?;
223        common_runtime::spawn_global(async move {
224            if let Err(e) = watcher::wait(&mut watcher).await {
225                error!(e; "Failed to wait reconciliation procedure {procedure_id}");
226                return;
227            }
228
229            info!("Reconciliation procedure {procedure_id} is finished successfully!");
230        });
231        Ok(procedure_id)
232    }
233}
234
235fn normalize_parallelism(parallelism: usize) -> usize {
236    if parallelism == 0 {
237        warn!(
238            "Parallelism is 0, using default parallelism: {}",
239            DEFAULT_PARALLELISM
240        );
241        DEFAULT_PARALLELISM
242    } else {
243        parallelism
244    }
245}