catalog/
information_extension.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::ProcedureStatus;
16use common_error::ext::BoxedError;
17use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
18use common_meta::datanode::RegionStat;
19use common_meta::key::flow::flow_state::FlowStat;
20use common_meta::node_manager::DatanodeManagerRef;
21use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
22use common_meta::rpc::procedure;
23use common_procedure::{ProcedureInfo, ProcedureState};
24use common_query::request::QueryRequest;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::util::ChainedRecordBatchStream;
27use meta_client::MetaClientRef;
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30
31use crate::error;
32use crate::information_schema::{DatanodeInspectRequest, InformationExtension};
33
34pub struct DistributedInformationExtension {
35    meta_client: MetaClientRef,
36    datanode_manager: DatanodeManagerRef,
37}
38
39impl DistributedInformationExtension {
40    pub fn new(meta_client: MetaClientRef, datanode_manager: DatanodeManagerRef) -> Self {
41        Self {
42            meta_client,
43            datanode_manager,
44        }
45    }
46}
47
48#[async_trait::async_trait]
49impl InformationExtension for DistributedInformationExtension {
50    type Error = crate::error::Error;
51
52    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
53        self.meta_client
54            .list_nodes(None)
55            .await
56            .map_err(BoxedError::new)
57            .context(error::ListNodesSnafu)
58    }
59
60    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
61        let procedures = self
62            .meta_client
63            .list_procedures(&ExecutorContext::default())
64            .await
65            .map_err(BoxedError::new)
66            .context(error::ListProceduresSnafu)?
67            .procedures;
68        let mut result = Vec::with_capacity(procedures.len());
69        for procedure in procedures {
70            let pid = match procedure.id {
71                Some(pid) => pid,
72                None => return error::ProcedureIdNotFoundSnafu {}.fail(),
73            };
74            let pid = procedure::pb_pid_to_pid(&pid)
75                .map_err(BoxedError::new)
76                .context(error::ConvertProtoDataSnafu)?;
77            let status = ProcedureStatus::try_from(procedure.status)
78                .map(|v| v.as_str_name())
79                .unwrap_or("Unknown")
80                .to_string();
81            let procedure_info = ProcedureInfo {
82                id: pid,
83                type_name: procedure.type_name,
84                start_time_ms: procedure.start_time_ms,
85                end_time_ms: procedure.end_time_ms,
86                state: ProcedureState::Running,
87                lock_keys: procedure.lock_keys,
88            };
89            result.push((status, procedure_info));
90        }
91
92        Ok(result)
93    }
94
95    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
96        self.meta_client
97            .list_region_stats()
98            .await
99            .map_err(BoxedError::new)
100            .context(error::ListRegionStatsSnafu)
101    }
102
103    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
104        self.meta_client
105            .list_flow_stats()
106            .await
107            .map_err(BoxedError::new)
108            .context(crate::error::ListFlowStatsSnafu)
109    }
110
111    async fn inspect_datanode(
112        &self,
113        request: DatanodeInspectRequest,
114    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
115        // Aggregate results from all datanodes
116        let nodes = self
117            .meta_client
118            .list_nodes(Some(Role::Datanode))
119            .await
120            .map_err(BoxedError::new)
121            .context(crate::error::ListNodesSnafu)?;
122
123        let plan = request
124            .build_plan()
125            .context(crate::error::DatafusionSnafu)?;
126
127        let mut streams = Vec::with_capacity(nodes.len());
128        for node in nodes {
129            let client = self.datanode_manager.datanode(&node.peer).await;
130            let stream = client
131                .handle_query(QueryRequest {
132                    plan: plan.clone(),
133                    region_id: RegionId::default(),
134                    header: None,
135                })
136                .await
137                .context(crate::error::HandleQuerySnafu)?;
138            streams.push(stream);
139        }
140
141        let chained =
142            ChainedRecordBatchStream::new(streams).context(crate::error::CreateRecordBatchSnafu)?;
143        Ok(Box::pin(chained))
144    }
145}