catalog/system_schema/information_schema/cluster_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16use std::time::Duration;
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
20use common_error::ext::BoxedError;
21use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use common_time::timestamp::Timestamp;
25use common_workload::DatanodeWorkloadType;
26use datafusion::execution::TaskContext;
27use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
28use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
29use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
30use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
31use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
32use datatypes::timestamp::TimestampMillisecond;
33use datatypes::value::Value;
34use datatypes::vectors::{
35    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
36};
37use serde::Serialize;
38use snafu::ResultExt;
39use store_api::storage::{ScanRequest, TableId};
40
41use crate::CatalogManager;
42use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
43use crate::system_schema::information_schema::{CLUSTER_INFO, InformationTable, Predicates};
44use crate::system_schema::utils;
45
// Role names, as returned by `NodeStatus::role_name()`, for which `peer_id`
// reports `-1` instead of the node's own id.
const PEER_TYPE_FRONTEND: &str = "FRONTEND";
const PEER_TYPE_METASRV: &str = "METASRV";

// Column names of the `information_schema.cluster_info` table. They are used both
// when building the schema and when evaluating scan predicates against a row.
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const PEER_HOSTNAME: &str = "peer_hostname";
const TOTAL_CPU_MILLICORES: &str = "total_cpu_millicores";
const TOTAL_MEMORY_BYTES: &str = "total_memory_bytes";
const CPU_USAGE_MILLICORES: &str = "cpu_usage_millicores";
const MEMORY_USAGE_BYTES: &str = "memory_usage_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
const UPTIME: &str = "uptime";
const ACTIVE_TIME: &str = "active_time";
const NODE_STATUS: &str = "node_status";

// Capacity passed to `with_capacity` for every column vector builder.
const INIT_CAPACITY: usize = 42;
65
/// The `CLUSTER_INFO` table describes the current topology of the cluster, one row per node.
///
/// - `peer_id`: the peer server id; always `-1` for frontend and metasrv peers.
/// - `peer_type`: the peer role name as returned by `NodeStatus::role_name()`,
///   e.g. `FRONTEND` or `METASRV`.
/// - `peer_addr`: the peer gRPC address.
/// - `peer_hostname`: the hostname of the peer.
/// - `total_cpu_millicores`: the total CPU millicores of the peer.
/// - `total_memory_bytes`: the total memory bytes of the peer.
/// - `cpu_usage_millicores`: the CPU usage millicores of the peer.
/// - `memory_usage_bytes`: the memory usage bytes of the peer.
/// - `version`: the build package version of the peer.
/// - `git_commit`: the build git commit hash of the peer.
/// - `start_time`: the starting time of the peer; `NULL` when no positive start time is reported.
/// - `uptime`: human-readable time elapsed since `start_time`; `NULL` together with `start_time`.
/// - `active_time`: human-readable time elapsed since the peer's last reported activity;
///   `NULL` when no positive activity timestamp is reported.
/// - `node_status`: JSON-encoded role-specific status (datanode workloads and region
///   counts, or `{"is_leader":true}` for the leader metasrv); `NULL` otherwise.
///
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
    /// Schema of the virtual table, built by `InformationSchemaClusterInfo::schema()`.
    schema: SchemaRef,
    /// Weak handle to the catalog manager; handed to each row builder, which uses it
    /// to obtain the information extension that lists the cluster nodes.
    catalog_manager: Weak<dyn CatalogManager>,
}
88
89impl InformationSchemaClusterInfo {
90    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
91        Self {
92            schema: Self::schema(),
93            catalog_manager,
94        }
95    }
96
97    pub(crate) fn schema() -> SchemaRef {
98        Arc::new(Schema::new(vec![
99            ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
100            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
101            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
102            ColumnSchema::new(PEER_HOSTNAME, ConcreteDataType::string_datatype(), true),
103            ColumnSchema::new(
104                TOTAL_CPU_MILLICORES,
105                ConcreteDataType::int64_datatype(),
106                false,
107            ),
108            ColumnSchema::new(
109                TOTAL_MEMORY_BYTES,
110                ConcreteDataType::int64_datatype(),
111                false,
112            ),
113            ColumnSchema::new(
114                CPU_USAGE_MILLICORES,
115                ConcreteDataType::int64_datatype(),
116                false,
117            ),
118            ColumnSchema::new(
119                MEMORY_USAGE_BYTES,
120                ConcreteDataType::int64_datatype(),
121                false,
122            ),
123            ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
124            ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
125            ColumnSchema::new(
126                START_TIME,
127                ConcreteDataType::timestamp_millisecond_datatype(),
128                true,
129            ),
130            ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
131            ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
132            ColumnSchema::new(NODE_STATUS, ConcreteDataType::string_datatype(), true),
133        ]))
134    }
135
136    fn builder(&self) -> InformationSchemaClusterInfoBuilder {
137        InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
138    }
139}
140
141impl InformationTable for InformationSchemaClusterInfo {
142    fn table_id(&self) -> TableId {
143        INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
144    }
145
146    fn table_name(&self) -> &'static str {
147        CLUSTER_INFO
148    }
149
150    fn schema(&self) -> SchemaRef {
151        self.schema.clone()
152    }
153
154    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
155        let schema = self.schema.arrow_schema().clone();
156        let mut builder = self.builder();
157        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
158            schema,
159            futures::stream::once(async move {
160                builder
161                    .make_cluster_info(Some(request))
162                    .await
163                    .map(|x| x.into_df_record_batch())
164                    .map_err(Into::into)
165            }),
166        ));
167        Ok(Box::pin(
168            RecordBatchStreamAdapter::try_new(stream)
169                .map_err(BoxedError::new)
170                .context(InternalSnafu)?,
171        ))
172    }
173}
174
/// Accumulates `cluster_info` rows column by column and assembles them into a
/// single [`RecordBatch`].
struct InformationSchemaClusterInfoBuilder {
    /// Output schema; its column order matches the vectors assembled in `finish`.
    schema: SchemaRef,
    /// Handle passed to `utils::information_extension` to reach the node listing.
    catalog_manager: Weak<dyn CatalogManager>,

    // One vector builder per table column, declared in schema order.
    peer_ids: Int64VectorBuilder,
    peer_types: StringVectorBuilder,
    peer_addrs: StringVectorBuilder,
    peer_hostnames: StringVectorBuilder,
    total_cpu_millicores: Int64VectorBuilder,
    total_memory_bytes: Int64VectorBuilder,
    cpu_usage_millicores: Int64VectorBuilder,
    memory_usage_bytes: Int64VectorBuilder,
    versions: StringVectorBuilder,
    git_commits: StringVectorBuilder,
    start_times: TimestampMillisecondVectorBuilder,
    uptimes: StringVectorBuilder,
    active_times: StringVectorBuilder,
    node_status: StringVectorBuilder,
}
194
195impl InformationSchemaClusterInfoBuilder {
196    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
197        Self {
198            schema,
199            catalog_manager,
200            peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
201            peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
202            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
203            peer_hostnames: StringVectorBuilder::with_capacity(INIT_CAPACITY),
204            total_cpu_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
205            total_memory_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
206            cpu_usage_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
207            memory_usage_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
208            versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
209            git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
210            start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
211            uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
212            active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
213            node_status: StringVectorBuilder::with_capacity(INIT_CAPACITY),
214        }
215    }
216
217    /// Construct the `information_schema.cluster_info` virtual table
218    async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
219        let predicates = Predicates::from_scan_request(&request);
220        let information_extension = utils::information_extension(&self.catalog_manager)?;
221        let node_infos = information_extension.nodes().await?;
222        for node_info in node_infos {
223            self.add_node_info(&predicates, node_info);
224        }
225        self.finish()
226    }
227
228    fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
229        let peer_type = node_info.status.role_name();
230        let peer_id = peer_id(peer_type, node_info.peer.id);
231
232        let row = [
233            (PEER_ID, &Value::from(peer_id)),
234            (PEER_TYPE, &Value::from(peer_type)),
235            (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
236            (PEER_HOSTNAME, &Value::from(node_info.hostname.as_str())),
237            (VERSION, &Value::from(node_info.version.as_str())),
238            (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
239        ];
240
241        if !predicates.eval(&row) {
242            return;
243        }
244
245        self.peer_ids.push(Some(peer_id));
246        self.peer_types.push(Some(peer_type));
247        self.peer_addrs.push(Some(&node_info.peer.addr));
248        self.peer_hostnames.push(Some(&node_info.hostname));
249        self.versions.push(Some(&node_info.version));
250        self.git_commits.push(Some(&node_info.git_commit));
251        if node_info.start_time_ms > 0 {
252            self.start_times
253                .push(Some(TimestampMillisecond(Timestamp::new_millisecond(
254                    node_info.start_time_ms as i64,
255                ))));
256            self.uptimes.push(Some(
257                Self::format_duration_since(node_info.start_time_ms).as_str(),
258            ));
259        } else {
260            self.start_times.push(None);
261            self.uptimes.push(None);
262        }
263        self.total_cpu_millicores
264            .push(Some(node_info.total_cpu_millicores));
265        self.total_memory_bytes
266            .push(Some(node_info.total_memory_bytes));
267        self.cpu_usage_millicores
268            .push(Some(node_info.cpu_usage_millicores));
269        self.memory_usage_bytes
270            .push(Some(node_info.memory_usage_bytes));
271
272        if node_info.last_activity_ts > 0 {
273            self.active_times.push(Some(
274                Self::format_duration_since(node_info.last_activity_ts as u64).as_str(),
275            ));
276        } else {
277            self.active_times.push(None);
278        }
279        self.node_status
280            .push(format_node_status(&node_info).as_deref());
281    }
282
283    fn format_duration_since(ts: u64) -> String {
284        let now = common_time::util::current_time_millis() as u64;
285        let duration_since = now - ts;
286        humantime::format_duration(Duration::from_millis(duration_since)).to_string()
287    }
288
289    fn finish(&mut self) -> Result<RecordBatch> {
290        let columns: Vec<VectorRef> = vec![
291            Arc::new(self.peer_ids.finish()),
292            Arc::new(self.peer_types.finish()),
293            Arc::new(self.peer_addrs.finish()),
294            Arc::new(self.peer_hostnames.finish()),
295            Arc::new(self.total_cpu_millicores.finish()),
296            Arc::new(self.total_memory_bytes.finish()),
297            Arc::new(self.cpu_usage_millicores.finish()),
298            Arc::new(self.memory_usage_bytes.finish()),
299            Arc::new(self.versions.finish()),
300            Arc::new(self.git_commits.finish()),
301            Arc::new(self.start_times.finish()),
302            Arc::new(self.uptimes.finish()),
303            Arc::new(self.active_times.finish()),
304            Arc::new(self.node_status.finish()),
305        ];
306        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
307    }
308}
309
310impl DfPartitionStream for InformationSchemaClusterInfo {
311    fn schema(&self) -> &ArrowSchemaRef {
312        self.schema.arrow_schema()
313    }
314
315    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
316        let schema = self.schema.arrow_schema().clone();
317        let mut builder = self.builder();
318        Box::pin(DfRecordBatchStreamAdapter::new(
319            schema,
320            futures::stream::once(async move {
321                builder
322                    .make_cluster_info(None)
323                    .await
324                    .map(|x| x.into_df_record_batch())
325                    .map_err(Into::into)
326            }),
327        ))
328    }
329}
330
331fn peer_id(peer_type: &str, peer_id: u64) -> i64 {
332    if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
333        -1
334    } else {
335        peer_id as i64
336    }
337}
338
/// JSON payload written to the `node_status` column for a metasrv peer.
#[derive(Serialize)]
struct DisplayMetasrvStatus {
    /// Leadership flag; `format_node_status` only builds this payload for the leader.
    is_leader: bool,
}
343
/// JSON payload written to the `node_status` column for a datanode peer.
///
/// Field order determines the key order in the serialized JSON.
#[derive(Serialize)]
struct DisplayDatanodeStatus {
    /// Workload types decoded from the raw codes in `DatanodeStatus::workloads`.
    workloads: Vec<DatanodeWorkloadType>,
    /// Copied from `DatanodeStatus::leader_regions`.
    leader_regions: usize,
    /// Copied from `DatanodeStatus::follower_regions`.
    follower_regions: usize,
}
350
351impl From<&DatanodeStatus> for DisplayDatanodeStatus {
352    fn from(status: &DatanodeStatus) -> Self {
353        Self {
354            workloads: status
355                .workloads
356                .types
357                .iter()
358                .flat_map(|w| DatanodeWorkloadType::from_i32(*w))
359                .collect(),
360            leader_regions: status.leader_regions,
361            follower_regions: status.follower_regions,
362        }
363    }
364}
365
366fn format_node_status(node_info: &NodeInfo) -> Option<String> {
367    match &node_info.status {
368        NodeStatus::Datanode(datanode_status) => {
369            serde_json::to_string(&DisplayDatanodeStatus::from(datanode_status)).ok()
370        }
371        NodeStatus::Frontend(_) => None,
372        NodeStatus::Flownode(_) => None,
373        NodeStatus::Metasrv(metasrv_status) => {
374            if metasrv_status.is_leader {
375                serde_json::to_string(&DisplayMetasrvStatus { is_leader: true }).ok()
376            } else {
377                None
378            }
379        }
380        NodeStatus::Standalone => None,
381    }
382}