catalog/system_schema/information_schema/
cluster_info.rs1use std::sync::{Arc, Weak};
16use std::time::Duration;
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID;
20use common_error::ext::BoxedError;
21use common_meta::cluster::NodeInfo;
22use common_recordbatch::adapter::RecordBatchStreamAdapter;
23use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
24use common_time::timestamp::Timestamp;
25use datafusion::execution::TaskContext;
26use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
27use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
28use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
29use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
30use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
31use datatypes::timestamp::TimestampMillisecond;
32use datatypes::value::Value;
33use datatypes::vectors::{
34 Int64VectorBuilder, StringVectorBuilder, TimestampMillisecondVectorBuilder,
35};
36use snafu::ResultExt;
37use store_api::storage::{ScanRequest, TableId};
38
39use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
40use crate::system_schema::information_schema::{InformationTable, Predicates, CLUSTER_INFO};
41use crate::system_schema::utils;
42use crate::CatalogManager;
43
44const PEER_ID: &str = "peer_id";
45const PEER_TYPE: &str = "peer_type";
46const PEER_ADDR: &str = "peer_addr";
47const VERSION: &str = "version";
48const GIT_COMMIT: &str = "git_commit";
49const START_TIME: &str = "start_time";
50const UPTIME: &str = "uptime";
51const ACTIVE_TIME: &str = "active_time";
52
53const INIT_CAPACITY: usize = 42;
54
55#[derive(Debug)]
67pub(super) struct InformationSchemaClusterInfo {
68 schema: SchemaRef,
69 catalog_manager: Weak<dyn CatalogManager>,
70}
71
72impl InformationSchemaClusterInfo {
73 pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
74 Self {
75 schema: Self::schema(),
76 catalog_manager,
77 }
78 }
79
80 pub(crate) fn schema() -> SchemaRef {
81 Arc::new(Schema::new(vec![
82 ColumnSchema::new(PEER_ID, ConcreteDataType::int64_datatype(), false),
83 ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
84 ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
85 ColumnSchema::new(VERSION, ConcreteDataType::string_datatype(), false),
86 ColumnSchema::new(GIT_COMMIT, ConcreteDataType::string_datatype(), false),
87 ColumnSchema::new(
88 START_TIME,
89 ConcreteDataType::timestamp_millisecond_datatype(),
90 true,
91 ),
92 ColumnSchema::new(UPTIME, ConcreteDataType::string_datatype(), true),
93 ColumnSchema::new(ACTIVE_TIME, ConcreteDataType::string_datatype(), true),
94 ]))
95 }
96
97 fn builder(&self) -> InformationSchemaClusterInfoBuilder {
98 InformationSchemaClusterInfoBuilder::new(self.schema.clone(), self.catalog_manager.clone())
99 }
100}
101
102impl InformationTable for InformationSchemaClusterInfo {
103 fn table_id(&self) -> TableId {
104 INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID
105 }
106
107 fn table_name(&self) -> &'static str {
108 CLUSTER_INFO
109 }
110
111 fn schema(&self) -> SchemaRef {
112 self.schema.clone()
113 }
114
115 fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
116 let schema = self.schema.arrow_schema().clone();
117 let mut builder = self.builder();
118 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
119 schema,
120 futures::stream::once(async move {
121 builder
122 .make_cluster_info(Some(request))
123 .await
124 .map(|x| x.into_df_record_batch())
125 .map_err(Into::into)
126 }),
127 ));
128 Ok(Box::pin(
129 RecordBatchStreamAdapter::try_new(stream)
130 .map_err(BoxedError::new)
131 .context(InternalSnafu)?,
132 ))
133 }
134}
135
136struct InformationSchemaClusterInfoBuilder {
137 schema: SchemaRef,
138 catalog_manager: Weak<dyn CatalogManager>,
139
140 peer_ids: Int64VectorBuilder,
141 peer_types: StringVectorBuilder,
142 peer_addrs: StringVectorBuilder,
143 versions: StringVectorBuilder,
144 git_commits: StringVectorBuilder,
145 start_times: TimestampMillisecondVectorBuilder,
146 uptimes: StringVectorBuilder,
147 active_times: StringVectorBuilder,
148}
149
150impl InformationSchemaClusterInfoBuilder {
151 fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
152 Self {
153 schema,
154 catalog_manager,
155 peer_ids: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
156 peer_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
157 peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
158 versions: StringVectorBuilder::with_capacity(INIT_CAPACITY),
159 git_commits: StringVectorBuilder::with_capacity(INIT_CAPACITY),
160 start_times: TimestampMillisecondVectorBuilder::with_capacity(INIT_CAPACITY),
161 uptimes: StringVectorBuilder::with_capacity(INIT_CAPACITY),
162 active_times: StringVectorBuilder::with_capacity(INIT_CAPACITY),
163 }
164 }
165
166 async fn make_cluster_info(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
168 let predicates = Predicates::from_scan_request(&request);
169 let information_extension = utils::information_extension(&self.catalog_manager)?;
170 let node_infos = information_extension.nodes().await?;
171 for node_info in node_infos {
172 self.add_node_info(&predicates, node_info);
173 }
174 self.finish()
175 }
176
177 fn add_node_info(&mut self, predicates: &Predicates, node_info: NodeInfo) {
178 let peer_type = node_info.status.role_name();
179
180 let row = [
181 (PEER_ID, &Value::from(node_info.peer.id)),
182 (PEER_TYPE, &Value::from(peer_type)),
183 (PEER_ADDR, &Value::from(node_info.peer.addr.as_str())),
184 (VERSION, &Value::from(node_info.version.as_str())),
185 (GIT_COMMIT, &Value::from(node_info.git_commit.as_str())),
186 ];
187
188 if !predicates.eval(&row) {
189 return;
190 }
191
192 if peer_type == "FRONTEND" || peer_type == "METASRV" {
193 self.peer_ids.push(Some(-1));
195 } else {
196 self.peer_ids.push(Some(node_info.peer.id as i64));
197 }
198
199 self.peer_types.push(Some(peer_type));
200 self.peer_addrs.push(Some(&node_info.peer.addr));
201 self.versions.push(Some(&node_info.version));
202 self.git_commits.push(Some(&node_info.git_commit));
203 if node_info.start_time_ms > 0 {
204 self.start_times
205 .push(Some(TimestampMillisecond(Timestamp::new_millisecond(
206 node_info.start_time_ms as i64,
207 ))));
208 self.uptimes.push(Some(
209 Self::format_duration_since(node_info.start_time_ms).as_str(),
210 ));
211 } else {
212 self.start_times.push(None);
213 self.uptimes.push(None);
214 }
215
216 if node_info.last_activity_ts > 0 {
217 self.active_times.push(Some(
218 Self::format_duration_since(node_info.last_activity_ts as u64).as_str(),
219 ));
220 } else {
221 self.active_times.push(None);
222 }
223 }
224
225 fn format_duration_since(ts: u64) -> String {
226 let now = common_time::util::current_time_millis() as u64;
227 let duration_since = now - ts;
228 humantime::format_duration(Duration::from_millis(duration_since)).to_string()
229 }
230
231 fn finish(&mut self) -> Result<RecordBatch> {
232 let columns: Vec<VectorRef> = vec![
233 Arc::new(self.peer_ids.finish()),
234 Arc::new(self.peer_types.finish()),
235 Arc::new(self.peer_addrs.finish()),
236 Arc::new(self.versions.finish()),
237 Arc::new(self.git_commits.finish()),
238 Arc::new(self.start_times.finish()),
239 Arc::new(self.uptimes.finish()),
240 Arc::new(self.active_times.finish()),
241 ];
242 RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
243 }
244}
245
246impl DfPartitionStream for InformationSchemaClusterInfo {
247 fn schema(&self) -> &ArrowSchemaRef {
248 self.schema.arrow_schema()
249 }
250
251 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
252 let schema = self.schema.arrow_schema().clone();
253 let mut builder = self.builder();
254 Box::pin(DfRecordBatchStreamAdapter::new(
255 schema,
256 futures::stream::once(async move {
257 builder
258 .make_cluster_info(None)
259 .await
260 .map(|x| x.into_df_record_batch())
261 .map_err(Into::into)
262 }),
263 ))
264 }
265}