catalog/system_schema/information_schema/cluster_info.rs

use std::sync::{Arc, Weak};
use std::time::Duration;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use common_workload::DatanodeWorkloadType;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
    UInt32VectorBuilder, UInt64VectorBuilder,
};
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{CLUSTER_INFO, InformationTable, Predicates};
use crate::system_schema::utils;

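// Peer types that have no cluster-assigned peer id (see `peer_id` below).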
const PEER_TYPE_FRONTEND: &str = "FRONTEND";
const PEER_TYPE_METASRV: &str = "METASRV";

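// Column names of the `cluster_info` table.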
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const CPUS: &str = "cpus";
const MEMORY_BYTES: &str = "memory_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
const UPTIME: &str = "uptime";
const ACTIVE_TIME: &str = "active_time";
const NODE_STATUS: &str = "node_status";

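// Initial capacity for the per-column vector builders.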
const INIT_CAPACITY: usize = 42;

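/// The `information_schema.cluster_info` table: one row per cluster node
/// (datanode, frontend, metasrv, flownode, or standalone) with its id, type,
/// address, resources, build info, uptime, and status.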
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaClusterInfo {
    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_manager,
        }
    }

    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(CPUS, ConcreteDataType::uint32_datatype(), false),
            ColumnSchema::new(MEMORY_BYTES, ConcreteDataType::uint64_datatype(), false),
            ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                START_TIME,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(NODE_STATUS, ConcreteDataType::string_datatype(), true),
        ]))
    }

    fn builder(&self) -> InformationSchemaClusterInfoBuilder {
        InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
    }
}

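// Registers the table with the `information_schema` framework: table id, name,
// schema, and a lazily-evaluated record-batch stream.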
impl InformationTable for InformationSchemaClusterInfo {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        CLUSTER_INFO
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_cluster_info(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

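/// Accumulates one value per column builder for each node and assembles the
/// final `cluster_info` record batch.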
struct InformationSchemaClusterInfoBuilder {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,

    peer_ids: Int64VectorBuilder,
    peer_types: StringVectorBuilder,
    peer_addrs: StringVectorBuilder,
    cpus: UInt32VectorBuilder,
    memory_bytes: UInt64VectorBuilder,
    versions: StringVectorBuilder,
    git_commits: StringVectorBuilder,
    start_times: TimestampMillisecondVectorBuilder,
    uptimes: StringVectorBuilder,
    active_times: StringVectorBuilder,
    node_status: StringVectorBuilder,
}

impl InformationSchemaClusterInfoBuilder {
    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema,
            catalog_manager,
            peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            cpus: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            memory_bytes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
            versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            node_status: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

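    /// Fetches node information from the information extension and turns it
    /// into a record batch, applying any scan predicates.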
    async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let predicates = Predicates::from_scan_request(&request);
        let information_extension = utils::information_extension(&self.catalog_manager)?;
        let node_infos = information_extension.nodes().await?;
        for node_info in node_infos {
            self.add_node_info(&predicates, node_info);
        }
        self.finish()
    }

    fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
        let peer_type = node_info.status.role_name();
        let peer_id = peer_id(peer_type, node_info.peer.id);

        let row = [
            (PEER_ID, &Value::from(peer_id)),
            (PEER_TYPE, &Value::from(peer_type)),
            (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
            (VERSION, &Value::from(node_info.version.as_str())),
            (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.peer_ids.push(Some(peer_id));
        self.peer_types.push(Some(peer_type));
        self.peer_addrs.push(Some(&node_info.peer.addr));
        self.versions.push(Some(&node_info.version));
        self.git_commits.push(Some(&node_info.git_commit));
        if node_info.start_time_ms > 0 {
            self.start_times
                .push(Some(TimestampMillisecond(Timestamp::new_millisecond(
                    node_info.start_time_ms as i64,
                ))));
            self.uptimes.push(Some(
                Self::format_duration_since(node_info.start_time_ms).as_str(),
            ));
        } else {
            self.start_times.push(None);
            self.uptimes.push(None);
        }
        self.cpus.push(Some(node_info.cpus));
        self.memory_bytes.push(Some(node_info.memory_bytes));

        if node_info.last_activity_ts > 0 {
            self.active_times.push(Some(
                Self::format_duration_since(node_info.last_activity_ts as u64).as_str(),
            ));
        } else {
            self.active_times.push(None);
        }
        self.node_status
            .push(format_node_status(&node_info).as_deref());
    }

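    /// Renders the elapsed time since `ts` (Unix epoch milliseconds) as a
    /// human-readable duration string.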
    fn format_duration_since(ts: u64) -> String {
        let now = common_time::util::current_time_millis() as u64;
        // Guard against clock skew: a `ts` in the future would make an
        // unchecked subtraction panic in debug builds (and wrap in release).
        let duration_since = now.saturating_sub(ts);
        humantime::format_duration(Duration::from_millis(duration_since)).to_string()
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.peer_ids.finish()),
            Arc::new(self.peer_types.finish()),
            Arc::new(self.peer_addrs.finish()),
            Arc::new(self.cpus.finish()),
            Arc::new(self.memory_bytes.finish()),
            Arc::new(self.versions.finish()),
            Arc::new(self.git_commits.finish()),
            Arc::new(self.start_times.finish()),
            Arc::new(self.uptimes.finish()),
            Arc::new(self.active_times.finish()),
            Arc::new(self.node_status.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

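// Lets DataFusion execute the table directly as a partition stream; with no
// `ScanRequest` there is no predicate pushdown, so all nodes are returned.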
impl DfPartitionStream for InformationSchemaClusterInfo {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_cluster_info(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

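/// Frontend and metasrv nodes report a placeholder peer id of `-1`; other
/// roles report their actual peer id.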
fn peer_id(peer_type: &str, peer_id: u64) -> i64 {
    if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
        -1
    } else {
        peer_id as i64
    }
}

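// JSON-serializable projections of node status for the `node_status` column.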
#[derive(Serialize)]
struct DisplayMetasrvStatus {
    is_leader: bool,
}

#[derive(Serialize)]
struct DisplayDatanodeStatus {
    workloads: Vec<DatanodeWorkloadType>,
    leader_regions: usize,
    follower_regions: usize,
}

impl From<&DatanodeStatus> for DisplayDatanodeStatus {
    fn from(status: &DatanodeStatus) -> Self {
        Self {
            workloads: status
                .workloads
                .types
                .iter()
                .flat_map(|w| DatanodeWorkloadType::from_i32(*w))
                .collect(),
            leader_regions: status.leader_regions,
            follower_regions: status.follower_regions,
        }
    }
}

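/// Renders the `node_status` column: JSON for datanodes and for metasrv
/// leaders; `None` for every other role.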
fn format_node_status(node_info: &NodeInfo) -> Option<String> {
    match &node_info.status {
        NodeStatus::Datanode(datanode_status) => {
            serde_json::to_string(&DisplayDatanodeStatus::from(datanode_status)).ok()
        }
        NodeStatus::Frontend(_) => None,
        NodeStatus::Flownode(_) => None,
        NodeStatus::Metasrv(metasrv_status) => {
            if metasrv_status.is_leader {
                serde_json::to_string(&DisplayMetasrvStatus { is_leader: true }).ok()
            } else {
                None
            }
        }
        NodeStatus::Standalone => None,
    }
}