common_meta/reconciliation/reconcile_database/
reconcile_logical_tables.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17
18use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
19use common_telemetry::info;
20use futures::TryStreamExt;
21use serde::{Deserialize, Serialize};
22use snafu::OptionExt;
23use table::metadata::TableId;
24use table::table_name::TableName;
25use table::table_reference::TableReference;
26
27use crate::error::{Result, TableInfoNotFoundSnafu};
28use crate::key::table_route::TableRouteValue;
29use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd;
30use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
31use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
32use crate::reconciliation::utils::{Context, SubprocedureMeta};
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct ReconcileLogicalTables;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for ReconcileLogicalTables {
40    async fn next(
41        &mut self,
42        ctx: &mut ReconcileDatabaseContext,
43        procedure_ctx: &ProcedureContext,
44    ) -> Result<(Box<dyn State>, Status)> {
45        info!(
46            "Reconcile logical tables in database: {}, catalog: {}, inflight_subprocedures: {}",
47            ctx.persistent_ctx.schema,
48            ctx.persistent_ctx.catalog,
49            ctx.volatile_ctx.inflight_subprocedures.len()
50        );
51        // Waits for inflight subprocedures first.
52        ctx.wait_for_inflight_subprocedures(procedure_ctx).await?;
53
54        let catalog = &ctx.persistent_ctx.catalog;
55        let schema = &ctx.persistent_ctx.schema;
56        let parallelism = ctx.persistent_ctx.parallelism;
57        if ctx.volatile_ctx.tables.as_deref().is_none() {
58            let tables = ctx
59                .table_metadata_manager
60                .table_name_manager()
61                .tables(catalog, schema);
62            ctx.volatile_ctx.tables = Some(tables);
63        }
64
65        let pending_logical_tables = &mut ctx.volatile_ctx.pending_logical_tables;
66        let mut pending_procedures = Vec::with_capacity(parallelism);
67        let context = Context {
68            node_manager: ctx.node_manager.clone(),
69            table_metadata_manager: ctx.table_metadata_manager.clone(),
70            cache_invalidator: ctx.cache_invalidator.clone(),
71        };
72        // Safety: initialized above.
73        while let Some((table_name, table_name_value)) =
74            ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await?
75        {
76            let table_id = table_name_value.table_id();
77            let Some(table_route) = ctx
78                .table_metadata_manager
79                .table_route_manager()
80                .table_route_storage()
81                .get(table_id)
82                .await?
83            else {
84                continue;
85            };
86
87            let table_ref = TableReference::full(catalog, schema, &table_name);
88            Self::enqueue_logical_table(pending_logical_tables, table_id, table_ref, table_route);
89            // Try to build reconcile logical tables procedure.
90            if let Some(procedure) = Self::try_build_reconcile_logical_tables_procedure(
91                &context,
92                pending_logical_tables,
93                parallelism,
94            )
95            .await?
96            {
97                pending_procedures.push(procedure);
98            }
99            // Schedule reconcile logical tables procedures if the number of pending procedures
100            // is greater than or equal to parallelism.
101            if Self::should_schedule_reconcile_logical_tables(&pending_procedures, parallelism) {
102                return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures);
103            }
104        }
105
106        // Build remaining procedures.
107        Self::build_remaining_procedures(
108            &context,
109            pending_logical_tables,
110            &mut pending_procedures,
111            parallelism,
112        )
113        .await?;
114        // If there are remaining procedures, schedule reconcile logical tables procedures.
115        if !pending_procedures.is_empty() {
116            return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures);
117        }
118
119        ctx.volatile_ctx.tables.take();
120        Ok((Box::new(ReconcileDatabaseEnd), Status::executing(true)))
121    }
122
123    fn as_any(&self) -> &dyn Any {
124        self
125    }
126}
127
128impl ReconcileLogicalTables {
129    fn schedule_reconcile_logical_tables(
130        ctx: &mut ReconcileDatabaseContext,
131        buffer: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
132    ) -> Result<(Box<dyn State>, Status)> {
133        let buffer = std::mem::take(buffer);
134        let (procedures, meta): (Vec<_>, Vec<_>) = buffer.into_iter().unzip();
135
136        ctx.volatile_ctx.inflight_subprocedures.extend(meta);
137        Ok((
138            Box::new(ReconcileLogicalTables),
139            Status::suspended(procedures, false),
140        ))
141    }
142
143    fn should_schedule_reconcile_logical_tables(
144        buffer: &[(ProcedureWithId, SubprocedureMeta)],
145        parallelism: usize,
146    ) -> bool {
147        buffer.len() >= parallelism
148    }
149
150    async fn try_build_reconcile_logical_tables_procedure(
151        ctx: &Context,
152        pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
153        parallelism: usize,
154    ) -> Result<Option<(ProcedureWithId, SubprocedureMeta)>> {
155        let mut physical_table_id = None;
156        for (table_id, tables) in pending_logical_tables.iter() {
157            if tables.len() >= parallelism {
158                physical_table_id = Some(*table_id);
159                break;
160            }
161        }
162
163        if let Some(physical_table_id) = physical_table_id {
164            // Safety: Checked above.
165            let tables = pending_logical_tables.remove(&physical_table_id).unwrap();
166            return Ok(Some(
167                Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables)
168                    .await?,
169            ));
170        }
171
172        Ok(None)
173    }
174
175    async fn build_remaining_procedures(
176        ctx: &Context,
177        pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
178        pending_procedures: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
179        parallelism: usize,
180    ) -> Result<()> {
181        if pending_logical_tables.is_empty() {
182            return Ok(());
183        }
184
185        while let Some(physical_table_id) = pending_logical_tables.keys().next().cloned() {
186            if pending_procedures.len() >= parallelism {
187                return Ok(());
188            }
189
190            // Safety: Checked above.
191            let tables = pending_logical_tables.remove(&physical_table_id).unwrap();
192            pending_procedures.push(
193                Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables)
194                    .await?,
195            );
196        }
197
198        Ok(())
199    }
200
201    async fn build_reconcile_logical_tables_procedure(
202        ctx: &Context,
203        physical_table_id: TableId,
204        logical_tables: Vec<(TableId, TableName)>,
205    ) -> Result<(ProcedureWithId, SubprocedureMeta)> {
206        let table_info = ctx
207            .table_metadata_manager
208            .table_info_manager()
209            .get(physical_table_id)
210            .await?
211            .context(TableInfoNotFoundSnafu {
212                table: format!("table_id: {}", physical_table_id),
213            })?;
214
215        let physical_table_name = table_info.table_name();
216        let procedure = ReconcileLogicalTablesProcedure::new(
217            ctx.clone(),
218            physical_table_id,
219            physical_table_name.clone(),
220            logical_tables.clone(),
221            true,
222        );
223        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
224        let subprocedure_meta = SubprocedureMeta::new_logical_table(
225            procedure_with_id.id,
226            physical_table_id,
227            physical_table_name,
228            logical_tables,
229        );
230        Ok((procedure_with_id, subprocedure_meta))
231    }
232
233    fn enqueue_logical_table(
234        tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
235        table_id: TableId,
236        table_ref: TableReference<'_>,
237        table_route: TableRouteValue,
238    ) {
239        if !table_route.is_physical() {
240            let logical_table_route = table_route.into_logical_table_route();
241            let physical_table_id = logical_table_route.physical_table_id();
242            tables
243                .entry(physical_table_id)
244                .or_default()
245                .push((table_id, table_ref.into()));
246        }
247    }
248}