catalog/system_schema/information_schema/cluster_info.rs

use std::sync::{Arc, Weak};
use std::time::Duration;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::timestamp::Timestamp;
use common_workload::DatanodeWorkloadType;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::timestamp::TimestampMillisecond;
use datatypes::value::Value;
use datatypes::vectors::{
    Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
};
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::CatalogManager;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{CLUSTER_INFO, InformationTable, Predicates};
use crate::system_schema::utils;

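// Role names (as reported by `NodeStatus::role_name()`) for peers without a
// meaningful numeric id; see `peer_id()` below.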
const PEER_TYPE_FRONTEND: &str = "FRONTEND";
const PEER_TYPE_METASRV: &str = "METASRV";

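// Column names of the `CLUSTER_INFO` table.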
const PEER_ID: &str = "peer_id";
const PEER_TYPE: &str = "peer_type";
const PEER_ADDR: &str = "peer_addr";
const PEER_HOSTNAME: &str = "peer_hostname";
const TOTAL_CPU_MILLICORES: &str = "total_cpu_millicores";
const TOTAL_MEMORY_BYTES: &str = "total_memory_bytes";
const CPU_USAGE_MILLICORES: &str = "cpu_usage_millicores";
const MEMORY_USAGE_BYTES: &str = "memory_usage_bytes";
const VERSION: &str = "version";
const GIT_COMMIT: &str = "git_commit";
const START_TIME: &str = "start_time";
const UPTIME: &str = "uptime";
const ACTIVE_TIME: &str = "active_time";
const NODE_STATUS: &str = "node_status";

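// Initial capacity of each per-column vector builder below.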
const INIT_CAPACITY: usize = 42;

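/// The `information_schema.CLUSTER_INFO` table, which exposes the topology of
/// the cluster: one row per node with its role, address, resource usage,
/// build information, and status.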
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaClusterInfo {
    pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema: Self::schema(),
            catalog_manager,
        }
    }

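    /// Builds the schema shared by the table definition and the row builder.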
    pub(crate) fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(PEER_HOSTNAME, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(
                TOTAL_CPU_MILLICORES,
                ConcreteDataType::int64_datatype(),
                false,
            ),
            ColumnSchema::new(
                TOTAL_MEMORY_BYTES,
                ConcreteDataType::int64_datatype(),
                false,
            ),
            ColumnSchema::new(
                CPU_USAGE_MILLICORES,
                ConcreteDataType::int64_datatype(),
                false,
            ),
            ColumnSchema::new(
                MEMORY_USAGE_BYTES,
                ConcreteDataType::int64_datatype(),
                false,
            ),
            ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                START_TIME,
                ConcreteDataType::timestamp_millisecond_datatype(),
                true,
            ),
            ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(NODE_STATUS, ConcreteDataType::string_datatype(), true),
        ]))
    }

    fn builder(&self) -> InformationSchemaClusterInfoBuilder {
        InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
    }
}

impl InformationTable for InformationSchemaClusterInfo {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        CLUSTER_INFO
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

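    /// Builds the scan stream. Node information is fetched lazily: the async
    /// block runs only once the returned stream is first polled.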
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_cluster_info(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

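/// Builds the `CLUSTER_INFO` table row by row, holding one vector builder per
/// column.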
struct InformationSchemaClusterInfoBuilder {
    schema: SchemaRef,
    catalog_manager: Weak<dyn CatalogManager>,

    peer_ids: Int64VectorBuilder,
    peer_types: StringVectorBuilder,
    peer_addrs: StringVectorBuilder,
    peer_hostnames: StringVectorBuilder,
    total_cpu_millicores: Int64VectorBuilder,
    total_memory_bytes: Int64VectorBuilder,
    cpu_usage_millicores: Int64VectorBuilder,
    memory_usage_bytes: Int64VectorBuilder,
    versions: StringVectorBuilder,
    git_commits: StringVectorBuilder,
    start_times: TimestampMillisecondVectorBuilder,
    uptimes: StringVectorBuilder,
    active_times: StringVectorBuilder,
    node_status: StringVectorBuilder,
}

impl InformationSchemaClusterInfoBuilder {
    fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        Self {
            schema,
            catalog_manager,
            peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            peer_hostnames: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            total_cpu_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            total_memory_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            cpu_usage_millicores: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            memory_usage_bytes: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
            versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
            uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            node_status: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

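    /// Fetches node information through the catalog's information extension
    /// and converts the matching nodes into a single `RecordBatch`.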
    async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let predicates = Predicates::from_scan_request(&request);
        let information_extension = utils::information_extension(&self.catalog_manager)?;
        let node_infos = information_extension.nodes().await?;
        for node_info in node_infos {
            self.add_node_info(&predicates, node_info);
        }
        self.finish()
    }

    fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
        let peer_type = node_info.status.role_name();
        let peer_id = peer_id(peer_type, node_info.peer.id);

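        // Evaluate pushed-down predicates against the filterable columns
        // first, so non-matching nodes are skipped before any vectors grow.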
        let row = [
            (PEER_ID, &Value::from(peer_id)),
            (PEER_TYPE, &Value::from(peer_type)),
            (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
            (PEER_HOSTNAME, &Value::from(node_info.hostname.as_str())),
            (VERSION, &Value::from(node_info.version.as_str())),
            (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.peer_ids.push(Some(peer_id));
        self.peer_types.push(Some(peer_type));
        self.peer_addrs.push(Some(&node_info.peer.addr));
        self.peer_hostnames.push(Some(&node_info.hostname));
        self.versions.push(Some(&node_info.version));
        self.git_commits.push(Some(&node_info.git_commit));
        if node_info.start_time_ms > 0 {
            self.start_times
                .push(Some(TimestampMillisecond(Timestamp::new_millisecond(
                    node_info.start_time_ms as i64,
                ))));
            self.uptimes.push(Some(
                Self::format_duration_since(node_info.start_time_ms).as_str(),
            ));
        } else {
            self.start_times.push(None);
            self.uptimes.push(None);
        }
        self.total_cpu_millicores
            .push(Some(node_info.total_cpu_millicores));
        self.total_memory_bytes
            .push(Some(node_info.total_memory_bytes));
        self.cpu_usage_millicores
            .push(Some(node_info.cpu_usage_millicores));
        self.memory_usage_bytes
            .push(Some(node_info.memory_usage_bytes));

        if node_info.last_activity_ts > 0 {
            self.active_times.push(Some(
                Self::format_duration_since(node_info.last_activity_ts as u64).as_str(),
            ));
        } else {
            self.active_times.push(None);
        }
        self.node_status
            .push(format_node_status(&node_info).as_deref());
    }

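    /// Formats the elapsed wall-clock time since `ts` (milliseconds since the
    /// Unix epoch) as a human-readable duration, e.g. "1h 30m".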
    fn format_duration_since(ts: u64) -> String {
        let now = common_time::util::current_time_millis() as u64;
        // `saturating_sub` guards against clock skew placing `ts` in the
        // future, which would otherwise underflow.
        let duration_since = now.saturating_sub(ts);
        humantime::format_duration(Duration::from_millis(duration_since)).to_string()
    }

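    /// Consumes the column builders and assembles the final `RecordBatch`;
    /// the column order must match `InformationSchemaClusterInfo::schema()`.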
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.peer_ids.finish()),
            Arc::new(self.peer_types.finish()),
            Arc::new(self.peer_addrs.finish()),
            Arc::new(self.peer_hostnames.finish()),
            Arc::new(self.total_cpu_millicores.finish()),
            Arc::new(self.total_memory_bytes.finish()),
            Arc::new(self.cpu_usage_millicores.finish()),
            Arc::new(self.memory_usage_bytes.finish()),
            Arc::new(self.versions.finish()),
            Arc::new(self.git_commits.finish()),
            Arc::new(self.start_times.finish()),
            Arc::new(self.uptimes.finish()),
            Arc::new(self.active_times.finish()),
            Arc::new(self.node_status.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

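// Lets DataFusion execute the table directly as a single-partition stream.
// Unlike `to_stream`, no scan request is available here, so no predicates are
// pushed down (`make_cluster_info(None)`).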
impl DfPartitionStream for InformationSchemaClusterInfo {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_cluster_info(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

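/// Normalizes the peer id: frontend and metasrv peers are reported with the
/// sentinel id `-1`, while other roles keep their numeric id.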
fn peer_id(peer_type: &str, peer_id: u64) -> i64 {
    if peer_type == PEER_TYPE_FRONTEND || peer_type == PEER_TYPE_METASRV {
        -1
    } else {
        peer_id as i64
    }
}

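/// JSON-serializable view of a metasrv's status for the `node_status` column.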
#[derive(Serialize)]
struct DisplayMetasrvStatus {
    is_leader: bool,
}

#[derive(Serialize)]
struct DisplayDatanodeStatus {
    workloads: Vec<DatanodeWorkloadType>,
    leader_regions: usize,
    follower_regions: usize,
}

impl From<&DatanodeStatus> for DisplayDatanodeStatus {
    fn from(status: &DatanodeStatus) -> Self {
        Self {
            workloads: status
                .workloads
                .types
                .iter()
                .flat_map(|w| DatanodeWorkloadType::from_i32(*w))
                .collect(),
            leader_regions: status.leader_regions,
            follower_regions: status.follower_regions,
        }
    }
}

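/// Renders the role-specific status as a JSON string. Only datanodes and
/// metasrv leaders report a status; all other roles yield `None`.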
fn format_node_status(node_info: &NodeInfo) -> Option<String> {
    match &node_info.status {
        NodeStatus::Datanode(datanode_status) => {
            serde_json::to_string(&DisplayDatanodeStatus::from(datanode_status)).ok()
        }
        NodeStatus::Frontend(_) => None,
        NodeStatus::Flownode(_) => None,
        NodeStatus::Metasrv(metasrv_status) => {
            if metasrv_status.is_leader {
                serde_json::to_string(&DisplayMetasrvStatus { is_leader: true }).ok()
            } else {
                None
            }
        }
        NodeStatus::Standalone => None,
    }
}