common_meta/reconciliation/reconcile_database/reconcile_tables.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
18use common_telemetry::info;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use store_api::storage::TableId;
22use table::table_name::TableName;
23use table::table_reference::TableReference;
24
25use crate::error::Result;
26use crate::key::table_route::TableRouteValue;
27use crate::reconciliation::reconcile_database::reconcile_logical_tables::ReconcileLogicalTables;
28use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
29use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
30use crate::reconciliation::utils::{Context, SubprocedureMeta};
31
32#[derive(Debug, Serialize, Deserialize)]
33pub(crate) struct ReconcileTables;
34
35#[async_trait::async_trait]
36#[typetag::serde]
37impl State for ReconcileTables {
38    async fn next(
39        &mut self,
40        ctx: &mut ReconcileDatabaseContext,
41        procedure_ctx: &ProcedureContext,
42    ) -> Result<(Box<dyn State>, Status)> {
43        info!(
44            "Reconcile tables in database: {}, catalog: {}, inflight_subprocedures: {}",
45            ctx.persistent_ctx.schema,
46            ctx.persistent_ctx.catalog,
47            ctx.volatile_ctx.inflight_subprocedures.len()
48        );
49        // Waits for inflight subprocedures first.
50        ctx.wait_for_inflight_subprocedures(procedure_ctx).await?;
51
52        let catalog = &ctx.persistent_ctx.catalog;
53        let schema = &ctx.persistent_ctx.schema;
54        let parallelism = ctx.persistent_ctx.parallelism;
55        if ctx.volatile_ctx.tables.as_deref().is_none() {
56            let tables = ctx
57                .table_metadata_manager
58                .table_name_manager()
59                .tables(catalog, schema);
60            ctx.volatile_ctx.tables = Some(tables);
61        }
62
63        let pending_tables = &mut ctx.volatile_ctx.pending_tables;
64        // Safety: must exists.
65        while let Some((table_name, table_name_value)) =
66            ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await?
67        {
68            let table_id = table_name_value.table_id();
69            let Some(table_route) = ctx
70                .table_metadata_manager
71                .table_route_manager()
72                .table_route_storage()
73                .get(table_id)
74                .await?
75            else {
76                continue;
77            };
78
79            let table_ref = TableReference::full(catalog, schema, &table_name);
80            // Enqueue table.
81            Self::enqueue_table(pending_tables, table_id, table_ref, table_route);
82            // Schedule reconcile table procedures if the number of pending procedures
83            // is greater than or equal to parallelism.
84            if Self::should_schedule_reconcile_tables(pending_tables, parallelism) {
85                return Self::schedule_reconcile_tables(ctx);
86            }
87        }
88
89        // If there are remaining tables, schedule reconcile table procedures.
90        if !pending_tables.is_empty() {
91            return Self::schedule_reconcile_tables(ctx);
92        }
93        ctx.volatile_ctx.tables.take();
94        Ok((Box::new(ReconcileLogicalTables), Status::executing(true)))
95    }
96
97    fn as_any(&self) -> &dyn Any {
98        self
99    }
100}
101
102impl ReconcileTables {
103    fn schedule_reconcile_tables(
104        ctx: &mut ReconcileDatabaseContext,
105    ) -> Result<(Box<dyn State>, Status)> {
106        let tables = std::mem::take(&mut ctx.volatile_ctx.pending_tables);
107        let (procedures, meta): (Vec<_>, Vec<_>) =
108            Self::build_reconcile_table_procedures(ctx, tables)
109                .into_iter()
110                .unzip();
111        ctx.volatile_ctx.inflight_subprocedures.extend(meta);
112        Ok((
113            Box::new(ReconcileTables),
114            Status::suspended(procedures, false),
115        ))
116    }
117
118    fn should_schedule_reconcile_tables(
119        pending_tables: &[(TableId, TableName)],
120        parallelism: usize,
121    ) -> bool {
122        pending_tables.len() >= parallelism
123    }
124
125    fn build_reconcile_table_procedures(
126        ctx: &ReconcileDatabaseContext,
127        tables: Vec<(TableId, TableName)>,
128    ) -> Vec<(ProcedureWithId, SubprocedureMeta)> {
129        let mut procedures = Vec::with_capacity(tables.len());
130        for (table_id, table_name) in tables {
131            let context = Context {
132                node_manager: ctx.node_manager.clone(),
133                table_metadata_manager: ctx.table_metadata_manager.clone(),
134                cache_invalidator: ctx.cache_invalidator.clone(),
135            };
136            let procedure = ReconcileTableProcedure::new(
137                context,
138                table_id,
139                table_name.clone(),
140                ctx.persistent_ctx.resolve_strategy,
141                true,
142            );
143            let procedure = ProcedureWithId::with_random_id(Box::new(procedure));
144            let meta =
145                SubprocedureMeta::new_physical_table(procedure.id, table_id, table_name.clone());
146            info!(
147                "Reconcile table: {}, table_id: {}, procedure_id: {}",
148                table_name, table_id, procedure.id
149            );
150            procedures.push((procedure, meta));
151        }
152
153        procedures
154    }
155
156    fn enqueue_table(
157        tables: &mut Vec<(TableId, TableName)>,
158        table_id: TableId,
159        table_ref: TableReference<'_>,
160        table_route: TableRouteValue,
161    ) {
162        if table_route.is_physical() {
163            tables.push((table_id, table_ref.into()));
164        }
165    }
166}