Skip to main content

frontend/instance/
dashboard.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::value::ValueData;
19use api::v1::{
20    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
21    RowInsertRequests, Rows, SemanticType,
22};
23use async_trait::async_trait;
24use common_catalog::consts::{DEFAULT_PRIVATE_SCHEMA_NAME, default_engine};
25use common_error::ext::BoxedError;
26use common_query::OutputData;
27use common_recordbatch::util as record_util;
28use common_telemetry::info;
29use common_time::FOREVER;
30use datafusion::datasource::DefaultTableSource;
31use datafusion::logical_expr::col;
32use datafusion::sql::TableReference;
33use datafusion_expr::{DmlStatement, LogicalPlan, lit};
34use datatypes::arrow::array::{Array, AsArray};
35use servers::error::{
36    CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu,
37    TableNotFoundSnafu,
38};
39use servers::query_handler::DashboardDefinition;
40use session::context::{QueryContextBuilder, QueryContextRef};
41use snafu::{OptionExt, ResultExt};
42use table::TableRef;
43use table::metadata::TableInfo;
44use table::requests::TTL_KEY;
45use table::table::adapter::DfTableProviderAdapter;
46
47use crate::instance::Instance;
48
49pub const DASHBOARD_TABLE_NAME: &str = "dashboard";
50pub const DASHBOARD_TABLE_NAME_COLUMN_NAME: &str = "name";
51pub const DASHBOARD_TABLE_DEFINITION_COLUMN_NAME: &str = "definition";
52pub const DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
53
54impl Instance {
55    /// Build a schema for dashboard table.
56    /// Returns the (time index, primary keys, column) definitions.
57    fn build_dashboard_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
58        (
59            DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
60            vec![DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string()],
61            vec![
62                ColumnDef {
63                    name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
64                    data_type: ColumnDataType::String as i32,
65                    is_nullable: false,
66                    default_constraint: vec![],
67                    semantic_type: SemanticType::Tag as i32,
68                    comment: String::new(),
69                    datatype_extension: None,
70                    options: None,
71                },
72                ColumnDef {
73                    name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
74                    data_type: ColumnDataType::String as i32,
75                    is_nullable: false,
76                    default_constraint: vec![],
77                    semantic_type: SemanticType::Field as i32,
78                    comment: String::new(),
79                    datatype_extension: None,
80                    options: None,
81                },
82                ColumnDef {
83                    name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
84                    data_type: ColumnDataType::TimestampNanosecond as i32,
85                    is_nullable: false,
86                    default_constraint: vec![],
87                    semantic_type: SemanticType::Timestamp as i32,
88                    comment: String::new(),
89                    datatype_extension: None,
90                    options: None,
91                },
92            ],
93        )
94    }
95
96    /// Build a column schemas for inserting a row into the dashboard table.
97    fn build_dashboard_insert_column_schemas() -> Vec<PbColumnSchema> {
98        vec![
99            PbColumnSchema {
100                column_name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
101                datatype: ColumnDataType::String.into(),
102                semantic_type: SemanticType::Tag.into(),
103                ..Default::default()
104            },
105            PbColumnSchema {
106                column_name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
107                datatype: ColumnDataType::String.into(),
108                semantic_type: SemanticType::Field.into(),
109                ..Default::default()
110            },
111            PbColumnSchema {
112                column_name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
113                datatype: ColumnDataType::TimestampNanosecond.into(),
114                semantic_type: SemanticType::Timestamp.into(),
115                ..Default::default()
116            },
117        ]
118    }
119
120    fn dashboard_query_ctx(table_info: &TableInfo) -> QueryContextRef {
121        QueryContextBuilder::default()
122            .current_catalog(table_info.catalog_name.clone())
123            .current_schema(table_info.schema_name.clone())
124            .build()
125            .into()
126    }
127
128    async fn create_dashboard_table_if_not_exists(
129        &self,
130        ctx: QueryContextRef,
131    ) -> servers::error::Result<TableRef> {
132        let catalog = ctx.current_catalog();
133
134        if let Some(table) = self
135            .catalog_manager
136            .table(
137                catalog,
138                DEFAULT_PRIVATE_SCHEMA_NAME,
139                DASHBOARD_TABLE_NAME,
140                Some(&ctx),
141            )
142            .await?
143        {
144            return Ok(table);
145        }
146
147        let (time_index, primary_keys, column_defs) = Self::build_dashboard_schema();
148
149        let mut table_options = HashMap::new();
150        table_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
151
152        let mut create_table_expr = api::v1::CreateTableExpr {
153            catalog_name: catalog.to_string(),
154            schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
155            table_name: DASHBOARD_TABLE_NAME.to_string(),
156            desc: "GreptimeDB dashboard table".to_string(),
157            column_defs,
158            time_index,
159            primary_keys,
160            create_if_not_exists: true,
161            table_options,
162            table_id: None,
163            engine: default_engine().to_string(),
164        };
165
166        self.statement_executor
167            .create_table_inner(&mut create_table_expr, None, ctx.clone())
168            .await
169            .map_err(BoxedError::new)
170            .context(ExecuteQuerySnafu)?;
171
172        let table = self
173            .catalog_manager
174            .table(
175                catalog,
176                DEFAULT_PRIVATE_SCHEMA_NAME,
177                DASHBOARD_TABLE_NAME,
178                Some(&ctx),
179            )
180            .await?
181            .context(TableNotFoundSnafu {
182                catalog: catalog.to_string(),
183                schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
184                table: DASHBOARD_TABLE_NAME.to_string(),
185            })?;
186
187        Ok(table)
188    }
189
190    /// Insert a dashboard into the dashboard table.
191    async fn insert_dashboard(
192        &self,
193        name: &str,
194        definition: &str,
195        query_ctx: QueryContextRef,
196    ) -> servers::error::Result<()> {
197        let table = self
198            .create_dashboard_table_if_not_exists(query_ctx.clone())
199            .await?;
200        let table_info = table.table_info();
201
202        let insert = RowInsertRequest {
203            table_name: DASHBOARD_TABLE_NAME.to_string(),
204            rows: Some(Rows {
205                schema: Self::build_dashboard_insert_column_schemas(),
206                rows: vec![Row {
207                    values: vec![
208                        ValueData::StringValue(name.to_string()).into(),
209                        ValueData::StringValue(definition.to_string()).into(),
210                        ValueData::TimestampNanosecondValue(0).into(),
211                    ],
212                }],
213            }),
214        };
215
216        let requests = RowInsertRequests {
217            inserts: vec![insert],
218        };
219
220        let output = self
221            .inserter
222            .handle_row_inserts(
223                requests,
224                Self::dashboard_query_ctx(&table_info),
225                &self.statement_executor,
226                false,
227                false,
228            )
229            .await
230            .map_err(BoxedError::new)
231            .context(ExecuteQuerySnafu)?;
232
233        info!(
234            "Insert dashboard success, name: {}, table: {}, output: {:?}",
235            name,
236            table_info.full_table_name(),
237            output
238        );
239
240        Ok(())
241    }
242
243    /// List all dashboards.
244    async fn list_dashboards(
245        &self,
246        query_ctx: QueryContextRef,
247    ) -> servers::error::Result<Vec<DashboardDefinition>> {
248        let table = if let Some(table) = self
249            .catalog_manager
250            .table(
251                query_ctx.current_catalog(),
252                DEFAULT_PRIVATE_SCHEMA_NAME,
253                DASHBOARD_TABLE_NAME,
254                Some(&query_ctx),
255            )
256            .await?
257        {
258            table
259        } else {
260            return Ok(vec![]);
261        };
262
263        let table_info = table.table_info();
264
265        let dataframe = self
266            .query_engine
267            .read_table(table.clone())
268            .map_err(BoxedError::new)
269            .context(ExecuteQuerySnafu)?;
270
271        let dataframe = dataframe
272            .select_columns(&[
273                DASHBOARD_TABLE_NAME_COLUMN_NAME,
274                DASHBOARD_TABLE_DEFINITION_COLUMN_NAME,
275            ])
276            .context(DataFusionSnafu)?;
277
278        let plan = dataframe.into_parts().1;
279
280        let output = self
281            .query_engine
282            .execute(plan, Self::dashboard_query_ctx(&table_info))
283            .await
284            .map_err(BoxedError::new)
285            .context(ExecuteQuerySnafu)?;
286
287        let stream = match output.data {
288            OutputData::Stream(stream) => stream,
289            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
290            _ => unreachable!(),
291        };
292
293        let records = record_util::collect(stream)
294            .await
295            .context(CollectRecordbatchSnafu)?;
296
297        let mut dashboards = Vec::new();
298
299        for r in &records {
300            let name_column = r.column(0);
301            let definition_column = r.column(1);
302
303            let name = name_column
304                .as_string_opt::<i32>()
305                .context(NotSupportedSnafu {
306                    feat: "Invalid data type for greptime_private.dashboard.name",
307                })?;
308
309            let definition =
310                definition_column
311                    .as_string_opt::<i32>()
312                    .context(NotSupportedSnafu {
313                        feat: "Invalid data type for greptime_private.dashboard.definition",
314                    })?;
315
316            for i in 0..name.len() {
317                dashboards.push(DashboardDefinition {
318                    name: name.value(i).to_string(),
319                    definition: definition.value(i).to_string(),
320                });
321            }
322        }
323
324        Ok(dashboards)
325    }
326
327    /// Delete a dashboard by name.
328    async fn delete_dashboard(
329        &self,
330        name: &str,
331        query_ctx: QueryContextRef,
332    ) -> servers::error::Result<()> {
333        let table = self
334            .create_dashboard_table_if_not_exists(query_ctx.clone())
335            .await?;
336        let table_info = table.table_info();
337
338        let dataframe = self
339            .query_engine
340            .read_table(table.clone())
341            .map_err(BoxedError::new)
342            .context(ExecuteQuerySnafu)?;
343
344        let name_condition = col(DASHBOARD_TABLE_NAME_COLUMN_NAME).eq(lit(name));
345
346        let dataframe = dataframe.filter(name_condition).context(DataFusionSnafu)?;
347
348        let table_name = TableReference::full(
349            table_info.catalog_name.clone(),
350            table_info.schema_name.clone(),
351            table_info.name.clone(),
352        );
353
354        let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
355        let table_source = Arc::new(DefaultTableSource::new(table_provider));
356
357        let stmt = DmlStatement::new(
358            table_name,
359            table_source,
360            datafusion_expr::WriteOp::Delete,
361            Arc::new(dataframe.into_parts().1),
362        );
363
364        let plan = LogicalPlan::Dml(stmt);
365
366        let output = self
367            .query_engine
368            .execute(plan, Self::dashboard_query_ctx(&table_info))
369            .await
370            .map_err(BoxedError::new)
371            .context(ExecuteQuerySnafu)?;
372
373        info!(
374            "Delete dashboard success, name: {}, table: {}, output: {:?}",
375            name,
376            table_info.full_table_name(),
377            output
378        );
379
380        Ok(())
381    }
382}
383
384#[async_trait]
385impl servers::query_handler::DashboardHandler for Instance {
386    async fn save(
387        &self,
388        name: &str,
389        definition: &str,
390        ctx: QueryContextRef,
391    ) -> servers::error::Result<()> {
392        self.insert_dashboard(name, definition, ctx).await
393    }
394
395    async fn list(&self, ctx: QueryContextRef) -> servers::error::Result<Vec<DashboardDefinition>> {
396        self.list_dashboards(ctx).await
397    }
398
399    async fn delete(&self, name: &str, ctx: QueryContextRef) -> servers::error::Result<()> {
400        self.delete_dashboard(name, ctx).await
401    }
402}