catalog/system_schema/information_schema/
cluster_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Weak};
16use std::time::Duration;
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
20use common_error::ext::BoxedError;
21use common_meta::cluster::NodeInfo;
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use common_time::timestamp::Timestamp;
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
29use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::timestamp::TimestampMillisecond;
32use datatypes::value::Value;
33use datatypes::vectors::{
34    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
35};
36use snafu::ResultExt;
37use store_api::storage::{ScanRequest, TableId};
38
39use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
40use crate::system_schema::information_schema::{InformationTable, Predicates, CLUSTER_INFO};
41use crate::system_schema::utils;
42use crate::CatalogManager;
43
// Column names of `information_schema.cluster_info`, listed in schema order.

/// Column holding the peer id.
const PEER_ID: &str = "peer_id";
/// Column holding the peer role name.
const PEER_TYPE: &str = "peer_type";
/// Column holding the peer address.
const PEER_ADDR: &str = "peer_addr";
/// Column holding the build version of the peer.
const VERSION: &str = "version";
/// Column holding the build git commit of the peer.
const GIT_COMMIT: &str = "git_commit";
/// Column holding the peer start time.
const START_TIME: &str = "start_time";
/// Column holding the human-readable uptime of the peer.
const UPTIME: &str = "uptime";
/// Column holding the human-readable time since the peer's last activity.
const ACTIVE_TIME: &str = "active_time";

/// Number of rows each column builder pre-allocates room for.
const INIT_CAPACITY: usize = 42;
54
55/// The `CLUSTER_INFO` table provides information about the current topology information of the cluster.
56///
57/// - `peer_id`: the peer server id.
58/// - `peer_type`: the peer type, such as `datanode`, `frontend`, `metasrv` etc.
59/// - `peer_addr`: the peer gRPC address.
60/// - `version`: the build package version of the peer.
61/// - `git_commit`: the build git commit hash of the peer.
62/// - `start_time`: the starting time of the peer.
63/// - `uptime`: the uptime of the peer.
64/// - `active_time`: the time since the last activity of the peer.
65///
66#[derive(Debug)]
67pub(super) struct InformationSchemaClusterInfo {
68    schema: SchemaRef,
69    catalog_manager: Weak<dyn CatalogManager>,
70}
71
72impl InformationSchemaClusterInfo {
73    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
74        Self {
75            schema: Self::schema(),
76            catalog_manager,
77        }
78    }
79
80    pub(crate) fn schema() -> SchemaRef {
81        Arc::new(Schema::new(vec![
82            ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
83            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
84            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
85            ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
86            ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
87            ColumnSchema::new(
88                START_TIME,
89                ConcreteDataType::timestamp_millisecond_datatype(),
90                true,
91            ),
92            ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
93            ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
94        ]))
95    }
96
97    fn builder(&self) -> InformationSchemaClusterInfoBuilder {
98        InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
99    }
100}
101
102impl InformationTable for InformationSchemaClusterInfo {
103    fn table_id(&self) -> TableId {
104        INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
105    }
106
107    fn table_name(&self) -> &'static str {
108        CLUSTER_INFO
109    }
110
111    fn schema(&self) -> SchemaRef {
112        self.schema.clone()
113    }
114
115    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
116        let schema = self.schema.arrow_schema().clone();
117        let mut builder = self.builder();
118        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
119            schema,
120            futures::stream::once(async move {
121                builder
122                    .make_cluster_info(Some(request))
123                    .await
124                    .map(|x| x.into_df_record_batch())
125                    .map_err(Into::into)
126            }),
127        ));
128        Ok(Box::pin(
129            RecordBatchStreamAdapter::try_new(stream)
130                .map_err(BoxedError::new)
131                .context(InternalSnafu)?,
132        ))
133    }
134}
135
136struct InformationSchemaClusterInfoBuilder {
137    schema: SchemaRef,
138    catalog_manager: Weak<dyn CatalogManager>,
139
140    peer_ids: Int64VectorBuilder,
141    peer_types: StringVectorBuilder,
142    peer_addrs: StringVectorBuilder,
143    versions: StringVectorBuilder,
144    git_commits: StringVectorBuilder,
145    start_times: TimestampMillisecondVectorBuilder,
146    uptimes: StringVectorBuilder,
147    active_times: StringVectorBuilder,
148}
149
150impl InformationSchemaClusterInfoBuilder {
151    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
152        Self {
153            schema,
154            catalog_manager,
155            peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
156            peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
157            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
158            versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
159            git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
160            start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
161            uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
162            active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
163        }
164    }
165
166    /// Construct the `information_schema.cluster_info` virtual table
167    async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
168        let predicates = Predicates::from_scan_request(&request);
169        let information_extension = utils::information_extension(&self.catalog_manager)?;
170        let node_infos = information_extension.nodes().await?;
171        for node_info in node_infos {
172            self.add_node_info(&predicates, node_info);
173        }
174        self.finish()
175    }
176
177    fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
178        let peer_type = node_info.status.role_name();
179
180        let row = [
181            (PEER_ID, &Value::from(node_info.peer.id)),
182            (PEER_TYPE, &Value::from(peer_type)),
183            (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
184            (VERSION, &Value::from(node_info.version.as_str())),
185            (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
186        ];
187
188        if !predicates.eval(&row) {
189            return;
190        }
191
192        if peer_type == "FRONTEND" || peer_type == "METASRV" {
193            // Always set peer_id to be -1 for frontends and metasrvs
194            self.peer_ids.push(Some(-1));
195        } else {
196            self.peer_ids.push(Some(node_info.peer.id as i64));
197        }
198
199        self.peer_types.push(Some(peer_type));
200        self.peer_addrs.push(Some(&node_info.peer.addr));
201        self.versions.push(Some(&node_info.version));
202        self.git_commits.push(Some(&node_info.git_commit));
203        if node_info.start_time_ms > 0 {
204            self.start_times
205                .push(Some(TimestampMillisecond(Timestamp::new_millisecond(
206                    node_info.start_time_ms as i64,
207                ))));
208            self.uptimes.push(Some(
209                Self::format_duration_since(node_info.start_time_ms).as_str(),
210            ));
211        } else {
212            self.start_times.push(None);
213            self.uptimes.push(None);
214        }
215
216        if node_info.last_activity_ts > 0 {
217            self.active_times.push(Some(
218                Self::format_duration_since(node_info.last_activity_ts as u64).as_str(),
219            ));
220        } else {
221            self.active_times.push(None);
222        }
223    }
224
225    fn format_duration_since(ts: u64) -> String {
226        let now = common_time::util::current_time_millis() as u64;
227        let duration_since = now - ts;
228        humantime::format_duration(Duration::from_millis(duration_since)).to_string()
229    }
230
231    fn finish(&mut self) -> Result<RecordBatch> {
232        let columns: Vec<VectorRef> = vec![
233            Arc::new(self.peer_ids.finish()),
234            Arc::new(self.peer_types.finish()),
235            Arc::new(self.peer_addrs.finish()),
236            Arc::new(self.versions.finish()),
237            Arc::new(self.git_commits.finish()),
238            Arc::new(self.start_times.finish()),
239            Arc::new(self.uptimes.finish()),
240            Arc::new(self.active_times.finish()),
241        ];
242        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
243    }
244}
245
246impl DfPartitionStream for InformationSchemaClusterInfo {
247    fn schema(&self) -> &ArrowSchemaRef {
248        self.schema.arrow_schema()
249    }
250
251    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
252        let schema = self.schema.arrow_schema().clone();
253        let mut builder = self.builder();
254        Box::pin(DfRecordBatchStreamAdapter::new(
255            schema,
256            futures::stream::once(async move {
257                builder
258                    .make_cluster_info(None)
259                    .await
260                    .map(|x| x.into_df_record_batch())
261                    .map_err(Into::into)
262            }),
263        ))
264    }
265}