frontend/instance/
dashboard.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::value::ValueData;
19use api::v1::{
20    ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
21    RowInsertRequests, Rows, SemanticType,
22};
23use async_trait::async_trait;
24use common_catalog::consts::{DEFAULT_PRIVATE_SCHEMA_NAME, default_engine};
25use common_error::ext::BoxedError;
26use common_query::OutputData;
27use common_recordbatch::util as record_util;
28use common_telemetry::info;
29use common_time::FOREVER;
30use datafusion::datasource::DefaultTableSource;
31use datafusion::logical_expr::col;
32use datafusion::sql::TableReference;
33use datafusion_expr::{DmlStatement, LogicalPlan, lit};
34use datatypes::arrow::array::{Array, AsArray};
35use servers::error::{
36    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu,
37    TableNotFoundSnafu,
38};
39use servers::query_handler::DashboardDefinition;
40use session::context::{QueryContextBuilder, QueryContextRef};
41use snafu::{OptionExt, ResultExt};
42use table::TableRef;
43use table::metadata::TableInfo;
44use table::requests::TTL_KEY;
45use table::table::adapter::DfTableProviderAdapter;
46
47use crate::instance::Instance;
48
/// Name of the table that stores dashboards.
pub const DASHBOARD_TABLE_NAME: &str = "dashboard";
/// Column holding the dashboard name; declared as the table's only primary key (tag) column.
pub const DASHBOARD_TABLE_NAME_COLUMN_NAME: &str = "name";
/// Column holding the dashboard definition; declared as a string field column.
pub const DASHBOARD_TABLE_DEFINITION_COLUMN_NAME: &str = "definition";
/// Timestamp column declared as the table's time index.
pub const DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
53
54impl Instance {
55    /// Build a schema for dashboard table.
56    /// Returns the (time index, primary keys, column) definitions.
57    fn build_dashboard_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
58        (
59            DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
60            vec![DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string()],
61            vec![
62                ColumnDef {
63                    name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
64                    data_type: ColumnDataType::String as i32,
65                    is_nullable: false,
66                    default_constraint: vec![],
67                    semantic_type: SemanticType::Tag as i32,
68                    comment: String::new(),
69                    datatype_extension: None,
70                    options: None,
71                },
72                ColumnDef {
73                    name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
74                    data_type: ColumnDataType::String as i32,
75                    is_nullable: false,
76                    default_constraint: vec![],
77                    semantic_type: SemanticType::Field as i32,
78                    comment: String::new(),
79                    datatype_extension: None,
80                    options: None,
81                },
82                ColumnDef {
83                    name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
84                    data_type: ColumnDataType::TimestampNanosecond as i32,
85                    is_nullable: false,
86                    default_constraint: vec![],
87                    semantic_type: SemanticType::Timestamp as i32,
88                    comment: String::new(),
89                    datatype_extension: None,
90                    options: None,
91                },
92            ],
93        )
94    }
95
96    /// Build a column schemas for inserting a row into the dashboard table.
97    fn build_dashboard_insert_column_schemas() -> Vec<PbColumnSchema> {
98        vec![
99            PbColumnSchema {
100                column_name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
101                datatype: ColumnDataType::String.into(),
102                semantic_type: SemanticType::Tag.into(),
103                ..Default::default()
104            },
105            PbColumnSchema {
106                column_name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
107                datatype: ColumnDataType::String.into(),
108                semantic_type: SemanticType::Field.into(),
109                ..Default::default()
110            },
111            PbColumnSchema {
112                column_name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
113                datatype: ColumnDataType::TimestampNanosecond.into(),
114                semantic_type: SemanticType::Timestamp.into(),
115                ..Default::default()
116            },
117        ]
118    }
119
120    fn dashboard_query_ctx(table_info: &TableInfo) -> QueryContextRef {
121        QueryContextBuilder::default()
122            .current_catalog(table_info.catalog_name.clone())
123            .current_schema(table_info.schema_name.clone())
124            .build()
125            .into()
126    }
127
128    async fn create_dashboard_table_if_not_exists(
129        &self,
130        ctx: QueryContextRef,
131    ) -> servers::error::Result<TableRef> {
132        let catalog = ctx.current_catalog();
133
134        if let Some(table) = self
135            .catalog_manager
136            .table(
137                catalog,
138                DEFAULT_PRIVATE_SCHEMA_NAME,
139                DASHBOARD_TABLE_NAME,
140                Some(&ctx),
141            )
142            .await
143            .context(CatalogSnafu)?
144        {
145            return Ok(table);
146        }
147
148        let (time_index, primary_keys, column_defs) = Self::build_dashboard_schema();
149
150        let mut table_options = HashMap::new();
151        table_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
152
153        let mut create_table_expr = api::v1::CreateTableExpr {
154            catalog_name: catalog.to_string(),
155            schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
156            table_name: DASHBOARD_TABLE_NAME.to_string(),
157            desc: "GreptimeDB dashboard table".to_string(),
158            column_defs,
159            time_index,
160            primary_keys,
161            create_if_not_exists: true,
162            table_options,
163            table_id: None,
164            engine: default_engine().to_string(),
165        };
166
167        self.statement_executor
168            .create_table_inner(&mut create_table_expr, None, ctx.clone())
169            .await
170            .map_err(BoxedError::new)
171            .context(ExecuteQuerySnafu)?;
172
173        let table = self
174            .catalog_manager
175            .table(
176                catalog,
177                DEFAULT_PRIVATE_SCHEMA_NAME,
178                DASHBOARD_TABLE_NAME,
179                Some(&ctx),
180            )
181            .await
182            .context(CatalogSnafu)?
183            .context(TableNotFoundSnafu {
184                catalog: catalog.to_string(),
185                schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
186                table: DASHBOARD_TABLE_NAME.to_string(),
187            })?;
188
189        Ok(table)
190    }
191
192    /// Insert a dashboard into the dashboard table.
193    async fn insert_dashboard(
194        &self,
195        name: &str,
196        definition: &str,
197        query_ctx: QueryContextRef,
198    ) -> servers::error::Result<()> {
199        let table = self
200            .create_dashboard_table_if_not_exists(query_ctx.clone())
201            .await?;
202        let table_info = table.table_info();
203
204        let insert = RowInsertRequest {
205            table_name: DASHBOARD_TABLE_NAME.to_string(),
206            rows: Some(Rows {
207                schema: Self::build_dashboard_insert_column_schemas(),
208                rows: vec![Row {
209                    values: vec![
210                        ValueData::StringValue(name.to_string()).into(),
211                        ValueData::StringValue(definition.to_string()).into(),
212                        ValueData::TimestampNanosecondValue(0).into(),
213                    ],
214                }],
215            }),
216        };
217
218        let requests = RowInsertRequests {
219            inserts: vec![insert],
220        };
221
222        let output = self
223            .inserter
224            .handle_row_inserts(
225                requests,
226                Self::dashboard_query_ctx(&table_info),
227                &self.statement_executor,
228                false,
229                false,
230            )
231            .await
232            .map_err(BoxedError::new)
233            .context(ExecuteQuerySnafu)?;
234
235        info!(
236            "Insert dashboard success, name: {}, table: {}, output: {:?}",
237            name,
238            table_info.full_table_name(),
239            output
240        );
241
242        Ok(())
243    }
244
245    /// List all dashboards.
246    async fn list_dashboards(
247        &self,
248        query_ctx: QueryContextRef,
249    ) -> servers::error::Result<Vec<DashboardDefinition>> {
250        let table = if let Some(table) = self
251            .catalog_manager
252            .table(
253                query_ctx.current_catalog(),
254                DEFAULT_PRIVATE_SCHEMA_NAME,
255                DASHBOARD_TABLE_NAME,
256                Some(&query_ctx),
257            )
258            .await
259            .context(CatalogSnafu)?
260        {
261            table
262        } else {
263            return Ok(vec![]);
264        };
265
266        let table_info = table.table_info();
267
268        let dataframe = self
269            .query_engine
270            .read_table(table.clone())
271            .map_err(BoxedError::new)
272            .context(ExecuteQuerySnafu)?;
273
274        let dataframe = dataframe
275            .select_columns(&[
276                DASHBOARD_TABLE_NAME_COLUMN_NAME,
277                DASHBOARD_TABLE_DEFINITION_COLUMN_NAME,
278            ])
279            .context(DataFusionSnafu)?;
280
281        let plan = dataframe.into_parts().1;
282
283        let output = self
284            .query_engine
285            .execute(plan, Self::dashboard_query_ctx(&table_info))
286            .await
287            .map_err(BoxedError::new)
288            .context(ExecuteQuerySnafu)?;
289
290        let stream = match output.data {
291            OutputData::Stream(stream) => stream,
292            OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
293            _ => unreachable!(),
294        };
295
296        let records = record_util::collect(stream)
297            .await
298            .context(CollectRecordbatchSnafu)?;
299
300        let mut dashboards = Vec::new();
301
302        for r in &records {
303            let name_column = r.column(0);
304            let definition_column = r.column(1);
305
306            let name = name_column
307                .as_string_opt::<i32>()
308                .context(NotSupportedSnafu {
309                    feat: "Invalid data type for greptime_private.dashboard.name",
310                })?;
311
312            let definition =
313                definition_column
314                    .as_string_opt::<i32>()
315                    .context(NotSupportedSnafu {
316                        feat: "Invalid data type for greptime_private.dashboard.definition",
317                    })?;
318
319            for i in 0..name.len() {
320                dashboards.push(DashboardDefinition {
321                    name: name.value(i).to_string(),
322                    definition: definition.value(i).to_string(),
323                });
324            }
325        }
326
327        Ok(dashboards)
328    }
329
330    /// Delete a dashboard by name.
331    async fn delete_dashboard(
332        &self,
333        name: &str,
334        query_ctx: QueryContextRef,
335    ) -> servers::error::Result<()> {
336        let table = self
337            .create_dashboard_table_if_not_exists(query_ctx.clone())
338            .await?;
339        let table_info = table.table_info();
340
341        let dataframe = self
342            .query_engine
343            .read_table(table.clone())
344            .map_err(BoxedError::new)
345            .context(ExecuteQuerySnafu)?;
346
347        let name_condition = col(DASHBOARD_TABLE_NAME_COLUMN_NAME).eq(lit(name));
348
349        let dataframe = dataframe.filter(name_condition).context(DataFusionSnafu)?;
350
351        let table_name = TableReference::full(
352            table_info.catalog_name.clone(),
353            table_info.schema_name.clone(),
354            table_info.name.clone(),
355        );
356
357        let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
358        let table_source = Arc::new(DefaultTableSource::new(table_provider));
359
360        let stmt = DmlStatement::new(
361            table_name,
362            table_source,
363            datafusion_expr::WriteOp::Delete,
364            Arc::new(dataframe.into_parts().1),
365        );
366
367        let plan = LogicalPlan::Dml(stmt);
368
369        let output = self
370            .query_engine
371            .execute(plan, Self::dashboard_query_ctx(&table_info))
372            .await
373            .map_err(BoxedError::new)
374            .context(ExecuteQuerySnafu)?;
375
376        info!(
377            "Delete dashboard success, name: {}, table: {}, output: {:?}",
378            name,
379            table_info.full_table_name(),
380            output
381        );
382
383        Ok(())
384    }
385}
386
387#[async_trait]
388impl servers::query_handler::DashboardHandler for Instance {
389    async fn save(
390        &self,
391        name: &str,
392        definition: &str,
393        ctx: QueryContextRef,
394    ) -> servers::error::Result<()> {
395        self.insert_dashboard(name, definition, ctx).await
396    }
397
398    async fn list(&self, ctx: QueryContextRef) -> servers::error::Result<Vec<DashboardDefinition>> {
399        self.list_dashboards(ctx).await
400    }
401
402    async fn delete(&self, name: &str, ctx: QueryContextRef) -> servers::error::Result<()> {
403        self.delete_dashboard(name, ctx).await
404    }
405}