standalone/
information_extension.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
18use client::SendableRecordBatchStream;
19use client::api::v1::meta::RegionRole;
20use common_error::ext::BoxedError;
21use common_meta::cluster::{NodeInfo, NodeStatus};
22use common_meta::datanode::RegionStat;
23use common_meta::key::flow::flow_state::FlowStat;
24use common_meta::peer::Peer;
25use common_procedure::{ProcedureInfo, ProcedureManagerRef};
26use common_query::request::QueryRequest;
27use datanode::region_server::RegionServer;
28use flow::StreamingEngine;
29use snafu::ResultExt;
30use store_api::storage::RegionId;
31use tokio::sync::RwLock;
32
/// [`InformationExtension`] implementation backed by the components of a
/// single standalone process.
///
/// Node, procedure, region and flow information is all answered from the
/// handles stored in this struct.
pub struct StandaloneInformationExtension {
    /// Region server queried for region statistics and used to execute
    /// datanode inspection reads.
    region_server: RegionServer,
    /// Procedure manager used to list procedures.
    procedure_manager: ProcedureManagerRef,
    /// Timestamp in milliseconds captured when this extension is constructed;
    /// reported as the node start time.
    start_time_ms: u64,
    /// Flow streaming engine. Starts as `None` and is filled in later through
    /// `set_flow_streaming_engine`.
    flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}
39
40impl StandaloneInformationExtension {
41    pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
42        Self {
43            region_server,
44            procedure_manager,
45            start_time_ms: common_time::util::current_time_millis() as u64,
46            flow_streaming_engine: RwLock::new(None),
47        }
48    }
49
50    /// Set the flow streaming engine for the standalone instance.
51    pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
52        let mut guard = self.flow_streaming_engine.write().await;
53        *guard = Some(flow_streaming_engine);
54    }
55}
56
57#[async_trait::async_trait]
58impl InformationExtension for StandaloneInformationExtension {
59    type Error = catalog::error::Error;
60
61    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
62        let build_info = common_version::build_info();
63        let node_info = NodeInfo {
64            // For the standalone:
65            // - id always 0
66            // - empty string for peer_addr
67            peer: Peer {
68                id: 0,
69                addr: "".to_string(),
70            },
71            last_activity_ts: -1,
72            status: NodeStatus::Standalone,
73            version: build_info.version.to_string(),
74            git_commit: build_info.commit_short.to_string(),
75            // Use `self.start_time_ms` instead.
76            // It's not precise but enough.
77            start_time_ms: self.start_time_ms,
78            cpus: common_config::utils::get_cpus() as u32,
79            memory_bytes: common_config::utils::get_sys_total_memory()
80                .unwrap_or_default()
81                .as_bytes(),
82        };
83        Ok(vec![node_info])
84    }
85
86    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
87        self.procedure_manager
88            .list_procedures()
89            .await
90            .map_err(BoxedError::new)
91            .map(|procedures| {
92                procedures
93                    .into_iter()
94                    .map(|procedure| {
95                        let status = procedure.state.as_str_name().to_string();
96                        (status, procedure)
97                    })
98                    .collect::<Vec<_>>()
99            })
100            .context(catalog::error::ListProceduresSnafu)
101    }
102
103    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
104        let stats = self
105            .region_server
106            .reportable_regions()
107            .into_iter()
108            .map(|stat| {
109                let region_stat = self
110                    .region_server
111                    .region_statistic(stat.region_id)
112                    .unwrap_or_default();
113                RegionStat {
114                    id: stat.region_id,
115                    rcus: 0,
116                    wcus: 0,
117                    approximate_bytes: region_stat.estimated_disk_size(),
118                    engine: stat.engine,
119                    role: RegionRole::from(stat.role).into(),
120                    num_rows: region_stat.num_rows,
121                    memtable_size: region_stat.memtable_size,
122                    manifest_size: region_stat.manifest_size,
123                    sst_size: region_stat.sst_size,
124                    sst_num: region_stat.sst_num,
125                    index_size: region_stat.index_size,
126                    region_manifest: region_stat.manifest.into(),
127                    data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
128                    metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
129                    written_bytes: region_stat.written_bytes,
130                }
131            })
132            .collect::<Vec<_>>();
133        Ok(stats)
134    }
135
136    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
137        Ok(Some(
138            self.flow_streaming_engine
139                .read()
140                .await
141                .as_ref()
142                .unwrap()
143                .gen_state_report()
144                .await,
145        ))
146    }
147
148    async fn inspect_datanode(
149        &self,
150        request: DatanodeInspectRequest,
151    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
152        let req = QueryRequest {
153            plan: request
154                .build_plan()
155                .context(catalog::error::DatafusionSnafu)?,
156            region_id: RegionId::default(),
157            header: None,
158        };
159
160        self.region_server
161            .handle_read(req)
162            .await
163            .map_err(BoxedError::new)
164            .context(catalog::error::InternalSnafu)
165    }
166}