standalone/
information_extension.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
18use client::SendableRecordBatchStream;
19use client::api::v1::meta::RegionRole;
20use common_error::ext::BoxedError;
21use common_meta::cluster::{NodeInfo, NodeStatus};
22use common_meta::datanode::RegionStat;
23use common_meta::key::flow::flow_state::FlowStat;
24use common_meta::peer::Peer;
25use common_procedure::{ProcedureInfo, ProcedureManagerRef};
26use common_query::request::QueryRequest;
27use common_stat::{ResourceStatImpl, ResourceStatRef};
28use datanode::region_server::RegionServer;
29use flow::StreamingEngine;
30use snafu::ResultExt;
31use store_api::storage::RegionId;
32use tokio::sync::RwLock;
33
/// [`InformationExtension`] implementation backed by the components of a
/// single standalone process.
pub struct StandaloneInformationExtension {
    // Queried for reportable regions and their statistics, and used to run
    // datanode inspection reads.
    region_server: RegionServer,
    // Source of the procedure list returned by `procedures()`.
    procedure_manager: ProcedureManagerRef,
    // Wall-clock time in milliseconds captured when this value was built by
    // `new`; reported as the node start time.
    start_time_ms: u64,
    // Flow streaming engine; `None` until `set_flow_streaming_engine` is called.
    flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
    // Provides the CPU and memory totals/usages reported by `nodes()`.
    resource_stat: ResourceStatRef,
}
41
42impl StandaloneInformationExtension {
43    pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
44        let mut resource_stat = ResourceStatImpl::default();
45        resource_stat.start_collect_cpu_usage();
46        Self {
47            region_server,
48            procedure_manager,
49            start_time_ms: common_time::util::current_time_millis() as u64,
50            flow_streaming_engine: RwLock::new(None),
51            resource_stat: Arc::new(resource_stat),
52        }
53    }
54
55    /// Set the flow streaming engine for the standalone instance.
56    pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
57        let mut guard = self.flow_streaming_engine.write().await;
58        *guard = Some(flow_streaming_engine);
59    }
60}
61
62#[async_trait::async_trait]
63impl InformationExtension for StandaloneInformationExtension {
64    type Error = catalog::error::Error;
65
66    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
67        let build_info = common_version::build_info();
68        let node_info = NodeInfo {
69            // For the standalone:
70            // - id always 0
71            // - empty string for peer_addr
72            peer: Peer {
73                id: 0,
74                addr: "".to_string(),
75            },
76            last_activity_ts: -1,
77            status: NodeStatus::Standalone,
78            version: build_info.version.to_string(),
79            git_commit: build_info.commit_short.to_string(),
80            // Use `self.start_time_ms` instead.
81            // It's not precise but enough.
82            start_time_ms: self.start_time_ms,
83            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
84            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
85            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
86            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
87            hostname: hostname::get()
88                .unwrap_or_default()
89                .to_string_lossy()
90                .to_string(),
91        };
92        Ok(vec![node_info])
93    }
94
95    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
96        self.procedure_manager
97            .list_procedures()
98            .await
99            .map_err(BoxedError::new)
100            .map(|procedures| {
101                procedures
102                    .into_iter()
103                    .map(|procedure| {
104                        let status = procedure.state.as_str_name().to_string();
105                        (status, procedure)
106                    })
107                    .collect::<Vec<_>>()
108            })
109            .context(catalog::error::ListProceduresSnafu)
110    }
111
112    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
113        let stats = self
114            .region_server
115            .reportable_regions()
116            .into_iter()
117            .map(|stat| {
118                let region_stat = self
119                    .region_server
120                    .region_statistic(stat.region_id)
121                    .unwrap_or_default();
122                RegionStat {
123                    id: stat.region_id,
124                    rcus: 0,
125                    wcus: 0,
126                    approximate_bytes: region_stat.estimated_disk_size(),
127                    engine: stat.engine,
128                    role: RegionRole::from(stat.role).into(),
129                    num_rows: region_stat.num_rows,
130                    memtable_size: region_stat.memtable_size,
131                    manifest_size: region_stat.manifest_size,
132                    sst_size: region_stat.sst_size,
133                    sst_num: region_stat.sst_num,
134                    index_size: region_stat.index_size,
135                    region_manifest: region_stat.manifest.into(),
136                    data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
137                    metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
138                    written_bytes: region_stat.written_bytes,
139                }
140            })
141            .collect::<Vec<_>>();
142        Ok(stats)
143    }
144
145    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
146        Ok(Some(
147            self.flow_streaming_engine
148                .read()
149                .await
150                .as_ref()
151                .unwrap()
152                .gen_state_report()
153                .await,
154        ))
155    }
156
157    async fn inspect_datanode(
158        &self,
159        request: DatanodeInspectRequest,
160    ) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
161        let req = QueryRequest {
162            plan: request
163                .build_plan()
164                .context(catalog::error::DatafusionSnafu)?,
165            region_id: RegionId::default(),
166            header: None,
167        };
168
169        self.region_server
170            .handle_read(req)
171            .await
172            .map_err(BoxedError::new)
173            .context(catalog::error::InternalSnafu)
174    }
175}