catalog/
information_extension.rs1use api::v1::meta::ProcedureStatus;
16use common_error::ext::BoxedError;
17use common_meta::cluster::{ClusterInfo, NodeInfo};
18use common_meta::datanode::RegionStat;
19use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
20use common_meta::key::flow::flow_state::FlowStat;
21use common_meta::rpc::procedure;
22use common_procedure::{ProcedureInfo, ProcedureState};
23use meta_client::MetaClientRef;
24use snafu::ResultExt;
25
26use crate::error;
27use crate::information_schema::InformationExtension;
28
29pub struct DistributedInformationExtension {
30 meta_client: MetaClientRef,
31}
32
33impl DistributedInformationExtension {
34 pub fn new(meta_client: MetaClientRef) -> Self {
35 Self { meta_client }
36 }
37}
38
39#[async_trait::async_trait]
40impl InformationExtension for DistributedInformationExtension {
41 type Error = crate::error::Error;
42
43 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
44 self.meta_client
45 .list_nodes(None)
46 .await
47 .map_err(BoxedError::new)
48 .context(error::ListNodesSnafu)
49 }
50
51 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
52 let procedures = self
53 .meta_client
54 .list_procedures(&ExecutorContext::default())
55 .await
56 .map_err(BoxedError::new)
57 .context(error::ListProceduresSnafu)?
58 .procedures;
59 let mut result = Vec::with_capacity(procedures.len());
60 for procedure in procedures {
61 let pid = match procedure.id {
62 Some(pid) => pid,
63 None => return error::ProcedureIdNotFoundSnafu {}.fail(),
64 };
65 let pid = procedure::pb_pid_to_pid(&pid)
66 .map_err(BoxedError::new)
67 .context(error::ConvertProtoDataSnafu)?;
68 let status = ProcedureStatus::try_from(procedure.status)
69 .map(|v| v.as_str_name())
70 .unwrap_or("Unknown")
71 .to_string();
72 let procedure_info = ProcedureInfo {
73 id: pid,
74 type_name: procedure.type_name,
75 start_time_ms: procedure.start_time_ms,
76 end_time_ms: procedure.end_time_ms,
77 state: ProcedureState::Running,
78 lock_keys: procedure.lock_keys,
79 };
80 result.push((status, procedure_info));
81 }
82
83 Ok(result)
84 }
85
86 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
87 self.meta_client
88 .list_region_stats()
89 .await
90 .map_err(BoxedError::new)
91 .context(error::ListRegionStatsSnafu)
92 }
93
94 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
95 self.meta_client
96 .list_flow_stats()
97 .await
98 .map_err(BoxedError::new)
99 .context(crate::error::ListFlowStatsSnafu)
100 }
101}