catalog/
information_extension.rs1use api::v1::meta::ProcedureStatus;
16use common_error::ext::BoxedError;
17use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
18use common_meta::datanode::RegionStat;
19use common_meta::key::flow::flow_state::FlowStat;
20use common_meta::node_manager::DatanodeManagerRef;
21use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
22use common_meta::rpc::procedure;
23use common_procedure::{ProcedureInfo, ProcedureState};
24use common_query::request::QueryRequest;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::util::ChainedRecordBatchStream;
27use meta_client::MetaClientRef;
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30
31use crate::error;
32use crate::information_schema::{DatanodeInspectRequest, InformationExtension};
33
34pub struct DistributedInformationExtension {
35 meta_client: MetaClientRef,
36 datanode_manager: DatanodeManagerRef,
37}
38
39impl DistributedInformationExtension {
40 pub fn new(meta_client: MetaClientRef, datanode_manager: DatanodeManagerRef) -> Self {
41 Self {
42 meta_client,
43 datanode_manager,
44 }
45 }
46}
47
48#[async_trait::async_trait]
49impl InformationExtension for DistributedInformationExtension {
50 type Error = crate::error::Error;
51
52 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
53 self.meta_client
54 .list_nodes(None)
55 .await
56 .map_err(BoxedError::new)
57 .context(error::ListNodesSnafu)
58 }
59
60 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
61 let procedures = self
62 .meta_client
63 .list_procedures(&ExecutorContext::default())
64 .await
65 .map_err(BoxedError::new)
66 .context(error::ListProceduresSnafu)?
67 .procedures;
68 let mut result = Vec::with_capacity(procedures.len());
69 for procedure in procedures {
70 let pid = match procedure.id {
71 Some(pid) => pid,
72 None => return error::ProcedureIdNotFoundSnafu {}.fail(),
73 };
74 let pid = procedure::pb_pid_to_pid(&pid)
75 .map_err(BoxedError::new)
76 .context(error::ConvertProtoDataSnafu)?;
77 let status = ProcedureStatus::try_from(procedure.status)
78 .map(|v| v.as_str_name())
79 .unwrap_or("Unknown")
80 .to_string();
81 let procedure_info = ProcedureInfo {
82 id: pid,
83 type_name: procedure.type_name,
84 start_time_ms: procedure.start_time_ms,
85 end_time_ms: procedure.end_time_ms,
86 state: ProcedureState::Running,
87 lock_keys: procedure.lock_keys,
88 };
89 result.push((status, procedure_info));
90 }
91
92 Ok(result)
93 }
94
95 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
96 self.meta_client
97 .list_region_stats()
98 .await
99 .map_err(BoxedError::new)
100 .context(error::ListRegionStatsSnafu)
101 }
102
103 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
104 self.meta_client
105 .list_flow_stats()
106 .await
107 .map_err(BoxedError::new)
108 .context(crate::error::ListFlowStatsSnafu)
109 }
110
111 async fn inspect_datanode(
112 &self,
113 request: DatanodeInspectRequest,
114 ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
115 let nodes = self
117 .meta_client
118 .list_nodes(Some(Role::Datanode))
119 .await
120 .map_err(BoxedError::new)
121 .context(crate::error::ListNodesSnafu)?;
122
123 let plan = request
124 .build_plan()
125 .context(crate::error::DatafusionSnafu)?;
126
127 let mut streams = Vec::with_capacity(nodes.len());
128 for node in nodes {
129 let client = self.datanode_manager.datanode(&node.peer).await;
130 let stream = client
131 .handle_query(QueryRequest {
132 plan: plan.clone(),
133 region_id: RegionId::default(),
134 header: None,
135 })
136 .await
137 .context(crate::error::HandleQuerySnafu)?;
138 streams.push(stream);
139 }
140
141 let chained =
142 ChainedRecordBatchStream::new(streams).context(crate::error::CreateRecordBatchSnafu)?;
143 Ok(Box::pin(chained))
144 }
145}