catalog/
information_extension.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use api::v1::meta::ProcedureStatus;
16use common_error::ext::BoxedError;
17use common_meta::cluster::{ClusterInfo, NodeInfo};
18use common_meta::datanode::RegionStat;
19use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
20use common_meta::key::flow::flow_state::FlowStat;
21use common_meta::rpc::procedure;
22use common_procedure::{ProcedureInfo, ProcedureState};
23use meta_client::MetaClientRef;
24use snafu::ResultExt;
25
26use crate::error;
27use crate::information_schema::InformationExtension;
28
29pub struct DistributedInformationExtension {
30    meta_client: MetaClientRef,
31}
32
33impl DistributedInformationExtension {
34    pub fn new(meta_client: MetaClientRef) -> Self {
35        Self { meta_client }
36    }
37}
38
39#[async_trait::async_trait]
40impl InformationExtension for DistributedInformationExtension {
41    type Error = crate::error::Error;
42
43    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
44        self.meta_client
45            .list_nodes(None)
46            .await
47            .map_err(BoxedError::new)
48            .context(error::ListNodesSnafu)
49    }
50
51    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
52        let procedures = self
53            .meta_client
54            .list_procedures(&ExecutorContext::default())
55            .await
56            .map_err(BoxedError::new)
57            .context(error::ListProceduresSnafu)?
58            .procedures;
59        let mut result = Vec::with_capacity(procedures.len());
60        for procedure in procedures {
61            let pid = match procedure.id {
62                Some(pid) => pid,
63                None => return error::ProcedureIdNotFoundSnafu {}.fail(),
64            };
65            let pid = procedure::pb_pid_to_pid(&pid)
66                .map_err(BoxedError::new)
67                .context(error::ConvertProtoDataSnafu)?;
68            let status = ProcedureStatus::try_from(procedure.status)
69                .map(|v| v.as_str_name())
70                .unwrap_or("Unknown")
71                .to_string();
72            let procedure_info = ProcedureInfo {
73                id: pid,
74                type_name: procedure.type_name,
75                start_time_ms: procedure.start_time_ms,
76                end_time_ms: procedure.end_time_ms,
77                state: ProcedureState::Running,
78                lock_keys: procedure.lock_keys,
79            };
80            result.push((status, procedure_info));
81        }
82
83        Ok(result)
84    }
85
86    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
87        self.meta_client
88            .list_region_stats()
89            .await
90            .map_err(BoxedError::new)
91            .context(error::ListRegionStatsSnafu)
92    }
93
94    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
95        self.meta_client
96            .list_flow_stats()
97            .await
98            .map_err(BoxedError::new)
99            .context(crate::error::ListFlowStatsSnafu)
100    }
101}