catalog/system_schema/information_schema/
cluster_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16use std::time::Duration;
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
20use common_error::ext::BoxedError;
21use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use common_time::timestamp::Timestamp;
25use common_workload::DatanodeWorkloadType;
26use datafusion::execution::TaskContext;
27use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
28use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
29use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
30use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
31use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
32use datatypes::timestamp::TimestampMillisecond;
33use datatypes::value::Value;
34use datatypes::vectors::{
35    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
36    UInt32VectorBuilder, UInt64VectorBuilder,
37};
38use serde::Serialize;
39use snafu::ResultExt;
40use store_api::storage::{ScanRequest, TableId};
41
42use crate::CatalogManager;
43use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
44use crate::system_schema::information_schema::{CLUSTER_INFO, InformationTable, Predicates};
45use crate::system_schema::utils;
46
// Peer types whose `peer_id` column is reported as `-1` (see `peer_id`).
const PEER_TYPE_FRONTEND: &str = "FRONTEND";
const PEER_TYPE_METASRV: &str = "METASRV";

// Column names of `information_schema.cluster_info`, listed in schema order.
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const CPUS: &str = "cpus";
const MEMORY_BYTES: &str = "memory_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
const UPTIME: &str = "uptime";
const ACTIVE_TIME: &str = "active_time";
const NODE_STATUS: &str = "node_status";

/// Initial row capacity reserved by every column builder of the table.
const INIT_CAPACITY: usize = 42;
63
64/// The `CLUSTER_INFO` table provides information about the current topology information of the cluster.
65///
66/// - `peer_id`: the peer server id.
67/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
68/// - `peer_addr`: the peer gRPC address.
69/// - `cpus`: the number of CPUs of the peer.
70/// - `memory_bytes`: the memory bytes of the peer.
71/// - `version`: the build package version of the peer.
72/// - `git_commit`: the build git commit hash of the peer.
73/// - `start_time`: the starting time of the peer.
74/// - `uptime`: the uptime of the peer.
75/// - `active_time`: the time since the last activity of the peer.
76/// - `node_status`: the status info of the peer.
77///
78#[derive(Debug)]
79pub(super) struct InformationSchemaClusterInfo {
80    schema: SchemaRef,
81    catalog_manager: Weak<dyn CatalogManager>,
82}
83
84impl InformationSchemaClusterInfo {
85    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
86        Self {
87            schema: Self::schema(),
88            catalog_manager,
89        }
90    }
91
92    pub(crate) fn schema() -> SchemaRef {
93        Arc::new(Schema::new(vec![
94            ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
95            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
96            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
97            ColumnSchema::new(CPUS, ConcreteDataType::uint32_datatype(), false),
98            ColumnSchema::new(MEMORY_BYTES, ConcreteDataType::uint64_datatype(), false),
99            ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
100            ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
101            ColumnSchema::new(
102                START_TIME,
103                ConcreteDataType::timestamp_millisecond_datatype(),
104                true,
105            ),
106            ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
107            ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
108            ColumnSchema::new(NODE_STATUS, ConcreteDataType::string_datatype(), true),
109        ]))
110    }
111
112    fn builder(&self) -> InformationSchemaClusterInfoBuilder {
113        InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
114    }
115}
116
117impl InformationTable for InformationSchemaClusterInfo {
118    fn table_id(&self) -> TableId {
119        INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
120    }
121
122    fn table_name(&self) -> &'static str {
123        CLUSTER_INFO
124    }
125
126    fn schema(&self) -> SchemaRef {
127        self.schema.clone()
128    }
129
130    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
131        let schema = self.schema.arrow_schema().clone();
132        let mut builder = self.builder();
133        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
134            schema,
135            futures::stream::once(async move {
136                builder
137                    .make_cluster_info(Some(request))
138                    .await
139                    .map(|x| x.into_df_record_batch())
140                    .map_err(Into::into)
141            }),
142        ));
143        Ok(Box::pin(
144            RecordBatchStreamAdapter::try_new(stream)
145                .map_err(BoxedError::new)
146                .context(InternalSnafu)?,
147        ))
148    }
149}
150
151struct InformationSchemaClusterInfoBuilder {
152    schema: SchemaRef,
153    catalog_manager: Weak<dyn CatalogManager>,
154
155    peer_ids: Int64VectorBuilder,
156    peer_types: StringVectorBuilder,
157    peer_addrs: StringVectorBuilder,
158    cpus: UInt32VectorBuilder,
159    memory_bytes: UInt64VectorBuilder,
160    versions: StringVectorBuilder,
161    git_commits: StringVectorBuilder,
162    start_times: TimestampMillisecondVectorBuilder,
163    uptimes: StringVectorBuilder,
164    active_times: StringVectorBuilder,
165    node_status: StringVectorBuilder,
166}
167
168impl InformationSchemaClusterInfoBuilder {
169    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
170        Self {
171            schema,
172            catalog_manager,
173            peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
174            peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
175            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
176            cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
177            memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
178            versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
179            git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
180            start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
181            uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
182            active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
183            node_status: StringVectorBuilder::with_capacity(INIT_CAPACITY),
184        }
185    }
186
187    /// Construct the `information_schema.cluster_info` virtual table
188    async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
189        let predicates = Predicates::from_scan_request(&request);
190        let information_extension = utils::information_extension(&self.catalog_manager)?;
191        let node_infos = information_extension.nodes().await?;
192        for node_info in node_infos {
193            self.add_node_info(&predicates, node_info);
194        }
195        self.finish()
196    }
197
198    fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
199        let peer_type = node_info.status.role_name();
200        let peer_id = peer_id(peer_type, node_info.peer.id);
201
202        let row = [
203            (PEER_ID, &Value::from(peer_id)),
204            (PEER_TYPE, &Value::from(peer_type)),
205            (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
206            (VERSION, &Value::from(node_info.version.as_str())),
207            (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
208        ];
209
210        if !predicates.eval(&row) {
211            return;
212        }
213
214        self.peer_ids.push(Some(peer_id));
215        self.peer_types.push(Some(peer_type));
216        self.peer_addrs.push(Some(&node_info.peer.addr));
217        self.versions.push(Some(&node_info.version));
218        self.git_commits.push(Some(&node_info.git_commit));
219        if node_info.start_time_ms > 0 {
220            self.start_times
221                .push(Some(TimestampMillisecond(Timestamp::new_millisecond(
222                    node_info.start_time_ms as i64,
223                ))));
224            self.uptimes.push(Some(
225                Self::format_duration_since(node_info.start_time_ms).as_str(),
226            ));
227        } else {
228            self.start_times.push(None);
229            self.uptimes.push(None);
230        }
231        self.cpus.push(Some(node_info.cpus));
232        self.memory_bytes.push(Some(node_info.memory_bytes));
233
234        if node_info.last_activity_ts > 0 {
235            self.active_times.push(Some(
236                Self::format_duration_since(node_info.last_activity_ts as u64).as_str(),
237            ));
238        } else {
239            self.active_times.push(None);
240        }
241        self.node_status
242            .push(format_node_status(&node_info).as_deref());
243    }
244
245    fn format_duration_since(ts: u64) -> String {
246        let now = common_time::util::current_time_millis() as u64;
247        let duration_since = now - ts;
248        humantime::format_duration(Duration::from_millis(duration_since)).to_string()
249    }
250
251    fn finish(&mut self) -> Result<RecordBatch> {
252        let columns: Vec<VectorRef> = vec![
253            Arc::new(self.peer_ids.finish()),
254            Arc::new(self.peer_types.finish()),
255            Arc::new(self.peer_addrs.finish()),
256            Arc::new(self.cpus.finish()),
257            Arc::new(self.memory_bytes.finish()),
258            Arc::new(self.versions.finish()),
259            Arc::new(self.git_commits.finish()),
260            Arc::new(self.start_times.finish()),
261            Arc::new(self.uptimes.finish()),
262            Arc::new(self.active_times.finish()),
263            Arc::new(self.node_status.finish()),
264        ];
265        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
266    }
267}
268
269impl DfPartitionStream for InformationSchemaClusterInfo {
270    fn schema(&self) -> &ArrowSchemaRef {
271        self.schema.arrow_schema()
272    }
273
274    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
275        let schema = self.schema.arrow_schema().clone();
276        let mut builder = self.builder();
277        Box::pin(DfRecordBatchStreamAdapter::new(
278            schema,
279            futures::stream::once(async move {
280                builder
281                    .make_cluster_info(None)
282                    .await
283                    .map(|x| x.into_df_record_batch())
284                    .map_err(Into::into)
285            }),
286        ))
287    }
288}
289
290fn peer_id(peer_type: &str, peer_id: u64) -> i64 {
291    if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
292        -1
293    } else {
294        peer_id as i64
295    }
296}
297
298#[derive(Serialize)]
299struct DisplayMetasrvStatus {
300    is_leader: bool,
301}
302
303#[derive(Serialize)]
304struct DisplayDatanodeStatus {
305    workloads: Vec<DatanodeWorkloadType>,
306    leader_regions: usize,
307    follower_regions: usize,
308}
309
310impl From<&DatanodeStatus> for DisplayDatanodeStatus {
311    fn from(status: &DatanodeStatus) -> Self {
312        Self {
313            workloads: status
314                .workloads
315                .types
316                .iter()
317                .flat_map(|w| DatanodeWorkloadType::from_i32(*w))
318                .collect(),
319            leader_regions: status.leader_regions,
320            follower_regions: status.follower_regions,
321        }
322    }
323}
324
325fn format_node_status(node_info: &NodeInfo) -> Option<String> {
326    match &node_info.status {
327        NodeStatus::Datanode(datanode_status) => {
328            serde_json::to_string(&DisplayDatanodeStatus::from(datanode_status)).ok()
329        }
330        NodeStatus::Frontend(_) => None,
331        NodeStatus::Flownode(_) => None,
332        NodeStatus::Metasrv(metasrv_status) => {
333            if metasrv_status.is_leader {
334                serde_json::to_string(&DisplayMetasrvStatus { is_leader: true }).ok()
335            } else {
336                None
337            }
338        }
339        NodeStatus::Standalone => None,
340    }
341}