// catalog/system_schema/information_schema/runtime_metrics.rs

use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{
    ConstantVector, Float64VectorBuilder, StringVectorBuilder, TimestampMillisecondVector,
    VectorRef,
};
use itertools::Itertools;
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::system_schema::information_schema::{InformationTable, RUNTIME_METRICS};

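/// The `information_schema.runtime_metrics` table, which exposes the current
/// process's Prometheus metrics as rows (queryable as
/// `SELECT * FROM information_schema.runtime_metrics`).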
#[derive(Debug)]
pub(super) struct InformationSchemaMetrics {
    schema: SchemaRef,
}

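// Column names of the `runtime_metrics` table.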
const METRIC_NAME: &str = "metric_name";
const METRIC_VALUE: &str = "value";
const METRIC_LABELS: &str = "labels";
const PEER_ADDR: &str = "peer_addr";
const PEER_TYPE: &str = "peer_type";
const TIMESTAMP: &str = "timestamp";

impl InformationSchemaMetrics {
    pub(super) fn new() -> Self {
        Self {
            schema: Self::schema(),
        }
    }

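    /// Builds the table schema: metric name, value, labels, peer address,
    /// peer type, and collection timestamp.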
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            ColumnSchema::new(METRIC_NAME, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(METRIC_VALUE, ConcreteDataType::float64_datatype(), false),
            ColumnSchema::new(METRIC_LABELS, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
            ColumnSchema::new(
                TIMESTAMP,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
        ]))
    }

    fn builder(&self) -> InformationSchemaMetricsBuilder {
        InformationSchemaMetricsBuilder::new(self.schema.clone())
    }
}

impl InformationTable for InformationSchemaMetrics {
    fn table_id(&self) -> TableId {
        INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        RUNTIME_METRICS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_metrics(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));

        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

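/// Builds the rows of the `runtime_metrics` table, with one vector builder
/// per column.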
struct InformationSchemaMetricsBuilder {
    schema: SchemaRef,

    metric_names: StringVectorBuilder,
    metric_values: Float64VectorBuilder,
    metric_labels: StringVectorBuilder,
    peer_addrs: StringVectorBuilder,
    peer_types: StringVectorBuilder,
}

impl InformationSchemaMetricsBuilder {
    fn new(schema: SchemaRef) -> Self {
        Self {
            schema,
            metric_names: StringVectorBuilder::with_capacity(42),
            metric_values: Float64VectorBuilder::with_capacity(42),
            metric_labels: StringVectorBuilder::with_capacity(42),
            peer_addrs: StringVectorBuilder::with_capacity(42),
            peer_types: StringVectorBuilder::with_capacity(42),
        }
    }

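    /// Appends one metric row to the column builders.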
    fn add_metric(
        &mut self,
        metric_name: &str,
        labels: String,
        metric_value: f64,
        peer: Option<&str>,
        peer_type: &str,
    ) {
        self.metric_names.push(Some(metric_name));
        self.metric_values.push(Some(metric_value));
        self.metric_labels.push(Some(&labels));
        self.peer_addrs.push(peer);
        self.peer_types.push(Some(peer_type));
    }

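    /// Gathers the process-wide Prometheus metrics and converts them into a
    /// single [`RecordBatch`]. The scan request is currently ignored.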
    async fn make_metrics(&mut self, _request: Option<ScanRequest>) -> Result<RecordBatch> {
        let metric_families = prometheus::gather();

        let write_request =
            common_telemetry::metric::convert_metric_to_write_request(metric_families, None, 0);

        for ts in write_request.timeseries {
            // Safety: the conversion puts a `__name__` label on every series.
            let metric_name = ts
                .labels
                .iter()
                .find_map(|label| {
                    if label.name == "__name__" {
                        Some(label.value.clone())
                    } else {
                        None
                    }
                })
                .unwrap();

            self.add_metric(
                &metric_name,
                // Render the remaining labels as a `key=value, ...` string.
                ts.labels
                    .into_iter()
                    .filter_map(|label| {
                        if label.name == "__name__" {
                            None
                        } else {
                            Some(format!("{}={}", label.name, label.value))
                        }
                    })
                    .join(", "),
                // Safety: each converted series carries at least one sample.
                ts.samples[0].value,
                // There is no peer address in standalone mode.
                None,
                "STANDALONE",
            );
        }

        self.finish()
    }

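    /// Freezes the column builders into vectors, stamps every row with the
    /// current wall-clock time, and assembles the final [`RecordBatch`].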
    fn finish(&mut self) -> Result<RecordBatch> {
        let rows_num = self.metric_names.len();

        // All rows share a single collection timestamp, so a constant vector
        // is enough.
        let timestamps = Arc::new(ConstantVector::new(
            Arc::new(TimestampMillisecondVector::from_slice([
                current_time_millis(),
            ])),
            rows_num,
        ));

        let columns: Vec<VectorRef> = vec![
            Arc::new(self.metric_names.finish()),
            Arc::new(self.metric_values.finish()),
            Arc::new(self.metric_labels.finish()),
            Arc::new(self.peer_addrs.finish()),
            Arc::new(self.peer_types.finish()),
            timestamps,
        ];

        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}

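// The table also implements DataFusion's `PartitionStream`, so the query
// engine can scan it directly.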
impl DfPartitionStream for InformationSchemaMetrics {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_metrics(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

#[cfg(test)]
mod tests {
    use common_recordbatch::RecordBatches;

    use super::*;

    #[tokio::test]
    async fn test_make_metrics() {
        let metrics = InformationSchemaMetrics::new();

        let stream = metrics.to_stream(ScanRequest::default()).unwrap();

        let batches = RecordBatches::try_collect(stream).await.unwrap();

        let result_literal = batches.pretty_print().unwrap();

        assert!(result_literal.contains(METRIC_NAME));
        assert!(result_literal.contains(METRIC_VALUE));
        assert!(result_literal.contains(METRIC_LABELS));
        assert!(result_literal.contains(PEER_ADDR));
        assert!(result_literal.contains(PEER_TYPE));
        assert!(result_literal.contains(TIMESTAMP));
    }
}