catalog/system_schema/information_schema/runtime_metrics.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use arrow_schema::SchemaRef as ArrowSchemaRef;
18use common_catalog::consts::INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID;
19use common_error::ext::BoxedError;
20use common_recordbatch::adapter::RecordBatchStreamAdapter;
21use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
22use common_time::util::current_time_millis;
23use datafusion::execution::TaskContext;
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
26use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
27use datatypes::prelude::{ConcreteDataType, MutableVector};
28use datatypes::scalars::ScalarVectorBuilder;
29use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
30use datatypes::vectors::{
31    ConstantVector, Float64VectorBuilder, StringVectorBuilder, TimestampMillisecondVector,
32    VectorRef,
33};
34use itertools::Itertools;
35use snafu::ResultExt;
36use store_api::storage::{ScanRequest, TableId};
37
38use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
39use crate::system_schema::information_schema::{InformationTable, RUNTIME_METRICS};
40
/// The `information_schema.runtime_metrics` virtual table, exposing the
/// in-process runtime metrics to SQL queries.
#[derive(Debug)]
pub(super) struct InformationSchemaMetrics {
    // Fixed six-column schema shared by every scan of this table.
    schema: SchemaRef,
}
45
// Column names of the `runtime_metrics` virtual table.
const METRIC_NAME: &str = "metric_name";
const METRIC_VALUE: &str = "value";
const METRIC_LABELS: &str = "labels";
// Peer address/type of the node the metric came from; addr is NULL in standalone mode.
const PEER_ADDR: &str = "peer_addr";
const PEER_TYPE: &str = "peer_type";
// Collection timestamp (millisecond precision), identical for all rows of one scan.
const TIMESTAMP: &str = "timestamp";
52
53/// The `information_schema.runtime_metrics` virtual table.
54/// It provides the GreptimeDB runtime metrics for the users by SQL.
55impl InformationSchemaMetrics {
56    pub(super) fn new() -> Self {
57        Self {
58            schema: Self::schema(),
59        }
60    }
61
62    fn schema() -> SchemaRef {
63        Arc::new(Schema::new(vec![
64            ColumnSchema::new(METRIC_NAME, ConcreteDataType::string_datatype(), false),
65            ColumnSchema::new(METRIC_VALUE, ConcreteDataType::float64_datatype(), false),
66            ColumnSchema::new(METRIC_LABELS, ConcreteDataType::string_datatype(), true),
67            ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
68            ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
69            ColumnSchema::new(
70                TIMESTAMP,
71                ConcreteDataType::timestamp_millisecond_datatype(),
72                false,
73            ),
74        ]))
75    }
76
77    fn builder(&self) -> InformationSchemaMetricsBuilder {
78        InformationSchemaMetricsBuilder::new(self.schema.clone())
79    }
80}
81
82impl InformationTable for InformationSchemaMetrics {
83    fn table_id(&self) -> TableId {
84        INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID
85    }
86
87    fn table_name(&self) -> &'static str {
88        RUNTIME_METRICS
89    }
90
91    fn schema(&self) -> SchemaRef {
92        self.schema.clone()
93    }
94
95    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
96        let schema = self.schema.arrow_schema().clone();
97        let mut builder = self.builder();
98        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
99            schema,
100            futures::stream::once(async move {
101                builder
102                    .make_metrics(Some(request))
103                    .await
104                    .map(|x| x.into_df_record_batch())
105                    .map_err(Into::into)
106            }),
107        ));
108
109        Ok(Box::pin(
110            RecordBatchStreamAdapter::try_new(stream)
111                .map_err(BoxedError::new)
112                .context(InternalSnafu)?,
113        ))
114    }
115}
116
// Accumulates metric rows column-by-column before turning them into a RecordBatch.
struct InformationSchemaMetricsBuilder {
    schema: SchemaRef,

    // One builder per output column; all pushed in lock-step by `add_metric`.
    metric_names: StringVectorBuilder,
    metric_values: Float64VectorBuilder,
    metric_labels: StringVectorBuilder,
    peer_addrs: StringVectorBuilder,
    peer_types: StringVectorBuilder,
}
126
127impl InformationSchemaMetricsBuilder {
128    fn new(schema: SchemaRef) -> Self {
129        Self {
130            schema,
131            metric_names: StringVectorBuilder::with_capacity(42),
132            metric_values: Float64VectorBuilder::with_capacity(42),
133            metric_labels: StringVectorBuilder::with_capacity(42),
134            peer_addrs: StringVectorBuilder::with_capacity(42),
135            peer_types: StringVectorBuilder::with_capacity(42),
136        }
137    }
138
139    fn add_metric(
140        &mut self,
141        metric_name: &str,
142        labels: String,
143        metric_value: f64,
144        peer: Option<&str>,
145        peer_type: &str,
146    ) {
147        self.metric_names.push(Some(metric_name));
148        self.metric_values.push(Some(metric_value));
149        self.metric_labels.push(Some(&labels));
150        self.peer_addrs.push(peer);
151        self.peer_types.push(Some(peer_type));
152    }
153
154    async fn make_metrics(&mut self, _request: Option<ScanRequest>) -> Result<RecordBatch> {
155        let metric_families = prometheus::gather();
156
157        let write_request =
158            common_telemetry::metric::convert_metric_to_write_request(metric_families, None, 0);
159
160        for ts in write_request.timeseries {
161            //Safety: always has `__name__` label
162            let metric_name = ts
163                .labels
164                .iter()
165                .find_map(|label| {
166                    if label.name == "__name__" {
167                        Some(label.value.clone())
168                    } else {
169                        None
170                    }
171                })
172                .unwrap();
173
174            self.add_metric(
175                &metric_name,
176                ts.labels
177                    .into_iter()
178                    .filter_map(|label| {
179                        if label.name == "__name__" {
180                            None
181                        } else {
182                            Some(format!("{}={}", label.name, label.value))
183                        }
184                    })
185                    .join(", "),
186                // Safety: always has a sample
187                ts.samples[0].value,
188                // The peer column is always `None` for standalone
189                None,
190                "STANDALONE",
191            );
192        }
193
194        // FIXME(dennis): fetching other peers metrics
195        self.finish()
196    }
197
198    fn finish(&mut self) -> Result<RecordBatch> {
199        let rows_num = self.metric_names.len();
200
201        let timestamps = Arc::new(ConstantVector::new(
202            Arc::new(TimestampMillisecondVector::from_slice([
203                current_time_millis(),
204            ])),
205            rows_num,
206        ));
207
208        let columns: Vec<VectorRef> = vec![
209            Arc::new(self.metric_names.finish()),
210            Arc::new(self.metric_values.finish()),
211            Arc::new(self.metric_labels.finish()),
212            Arc::new(self.peer_addrs.finish()),
213            Arc::new(self.peer_types.finish()),
214            timestamps,
215        ];
216
217        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
218    }
219}
220
221impl DfPartitionStream for InformationSchemaMetrics {
222    fn schema(&self) -> &ArrowSchemaRef {
223        self.schema.arrow_schema()
224    }
225
226    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
227        let schema = self.schema.arrow_schema().clone();
228        let mut builder = self.builder();
229        Box::pin(DfRecordBatchStreamAdapter::new(
230            schema,
231            futures::stream::once(async move {
232                builder
233                    .make_metrics(None)
234                    .await
235                    .map(|x| x.into_df_record_batch())
236                    .map_err(Into::into)
237            }),
238        ))
239    }
240}
241
#[cfg(test)]
mod tests {
    use common_recordbatch::RecordBatches;

    use super::*;

    #[tokio::test]
    async fn test_make_metrics() {
        let metrics = InformationSchemaMetrics::new();

        let stream = metrics.to_stream(ScanRequest::default()).unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let printed = batches.pretty_print().unwrap();

        // Every column of the virtual table must show up in the printed output.
        for column in [
            METRIC_NAME,
            METRIC_VALUE,
            METRIC_LABELS,
            PEER_ADDR,
            PEER_TYPE,
            TIMESTAMP,
        ] {
            assert!(printed.contains(column));
        }
    }
}