common_meta/
ddl.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::meta::ProcedureDetailResponse;
19use common_telemetry::tracing_context::W3cTrace;
20use store_api::storage::{RegionId, RegionNumber, TableId};
21
22use crate::cache_invalidator::CacheInvalidatorRef;
23use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
24use crate::ddl::table_meta::TableMetadataAllocatorRef;
25use crate::error::{Result, UnsupportedSnafu};
26use crate::key::flow::FlowMetadataManagerRef;
27use crate::key::table_route::PhysicalTableRouteValue;
28use crate::key::TableMetadataManagerRef;
29use crate::node_manager::NodeManagerRef;
30use crate::region_keeper::MemoryRegionKeeperRef;
31use crate::region_registry::LeaderRegionRegistryRef;
32use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
33use crate::rpc::procedure::{
34    AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
35    RemoveRegionFollowerRequest,
36};
37use crate::DatanodeId;
38
39pub mod alter_database;
40pub mod alter_logical_tables;
41pub mod alter_table;
42pub mod create_database;
43pub mod create_flow;
44pub mod create_logical_tables;
45pub mod create_table;
46mod create_table_template;
47pub mod create_view;
48pub mod drop_database;
49pub mod drop_flow;
50pub mod drop_table;
51pub mod drop_view;
52pub mod flow_meta;
53mod physical_table_metadata;
54pub mod table_meta;
55#[cfg(any(test, feature = "testing"))]
56pub mod test_util;
57#[cfg(test)]
58pub(crate) mod tests;
59pub mod truncate_table;
60pub mod utils;
61
62#[derive(Debug, Default)]
63pub struct ExecutorContext {
64    pub tracing_context: Option<W3cTrace>,
65}
66
67/// The procedure executor that accepts ddl, region migration task etc.
68#[async_trait::async_trait]
69pub trait ProcedureExecutor: Send + Sync {
70    /// Submit a ddl task
71    async fn submit_ddl_task(
72        &self,
73        ctx: &ExecutorContext,
74        request: SubmitDdlTaskRequest,
75    ) -> Result<SubmitDdlTaskResponse>;
76
77    /// Add a region follower
78    async fn add_region_follower(
79        &self,
80        _ctx: &ExecutorContext,
81        _request: AddRegionFollowerRequest,
82    ) -> Result<()> {
83        UnsupportedSnafu {
84            operation: "add_region_follower",
85        }
86        .fail()
87    }
88
89    /// Remove a region follower
90    async fn remove_region_follower(
91        &self,
92        _ctx: &ExecutorContext,
93        _request: RemoveRegionFollowerRequest,
94    ) -> Result<()> {
95        UnsupportedSnafu {
96            operation: "remove_region_follower",
97        }
98        .fail()
99    }
100
101    /// Submit a region migration task
102    async fn migrate_region(
103        &self,
104        ctx: &ExecutorContext,
105        request: MigrateRegionRequest,
106    ) -> Result<MigrateRegionResponse>;
107
108    /// Query the procedure state by its id
109    async fn query_procedure_state(
110        &self,
111        ctx: &ExecutorContext,
112        pid: &str,
113    ) -> Result<ProcedureStateResponse>;
114
115    async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
116}
117
118pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
119
120/// Metadata allocated to a table.
121#[derive(Default)]
122pub struct TableMetadata {
123    /// Table id.
124    pub table_id: TableId,
125    /// Route information for each region of the table.
126    pub table_route: PhysicalTableRouteValue,
127    /// The encoded wal options for regions of the table.
128    // If a region does not have an associated wal options, no key for the region would be found in the map.
129    pub region_wal_options: HashMap<RegionNumber, String>,
130}
131
132pub type RegionFailureDetectorControllerRef = Arc<dyn RegionFailureDetectorController>;
133
134pub type DetectingRegion = (DatanodeId, RegionId);
135
136/// Used for actively registering Region failure detectors.
137///
138/// Ensuring the Region Supervisor can detect Region failures without relying on the first heartbeat from the datanode.
139#[async_trait::async_trait]
140pub trait RegionFailureDetectorController: Send + Sync {
141    /// Registers failure detectors for the given identifiers.
142    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
143
144    /// Deregisters failure detectors for the given identifiers.
145    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
146}
147
148/// A noop implementation of [`RegionFailureDetectorController`].
149#[derive(Debug, Clone)]
150pub struct NoopRegionFailureDetectorControl;
151
152#[async_trait::async_trait]
153impl RegionFailureDetectorController for NoopRegionFailureDetectorControl {
154    async fn register_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
155
156    async fn deregister_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
157}
158
159/// The context of ddl.
160#[derive(Clone)]
161pub struct DdlContext {
162    /// Sends querying and requests to nodes.
163    pub node_manager: NodeManagerRef,
164    /// Cache invalidation.
165    pub cache_invalidator: CacheInvalidatorRef,
166    /// Keep tracking operating regions.
167    pub memory_region_keeper: MemoryRegionKeeperRef,
168    /// The leader region registry.
169    pub leader_region_registry: LeaderRegionRegistryRef,
170    /// Table metadata manager.
171    pub table_metadata_manager: TableMetadataManagerRef,
172    /// Allocator for table metadata.
173    pub table_metadata_allocator: TableMetadataAllocatorRef,
174    /// Flow metadata manager.
175    pub flow_metadata_manager: FlowMetadataManagerRef,
176    /// Allocator for flow metadata.
177    pub flow_metadata_allocator: FlowMetadataAllocatorRef,
178    /// controller of region failure detector.
179    pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
180}
181
182impl DdlContext {
183    /// Notifies the RegionSupervisor to register failure detector of new created regions.
184    ///
185    /// The datanode may crash without sending a heartbeat that contains information about newly created regions,
186    /// which may prevent the RegionSupervisor from detecting failures in these newly created regions.
187    pub async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
188        self.region_failure_detector_controller
189            .register_failure_detectors(detecting_regions)
190            .await;
191    }
192
193    /// Notifies the RegionSupervisor to remove failure detectors.
194    ///
195    /// Once the regions were dropped, subsequent heartbeats no longer include these regions.
196    /// Therefore, we should remove the failure detectors for these dropped regions.
197    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
198        self.region_failure_detector_controller
199            .deregister_failure_detectors(detecting_regions)
200            .await;
201    }
202}