1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::value::ValueData;
19use api::v1::{
20 ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
21 RowInsertRequests, Rows, SemanticType,
22};
23use async_trait::async_trait;
24use common_catalog::consts::{DEFAULT_PRIVATE_SCHEMA_NAME, default_engine};
25use common_error::ext::BoxedError;
26use common_query::OutputData;
27use common_recordbatch::util as record_util;
28use common_telemetry::info;
29use common_time::FOREVER;
30use datafusion::datasource::DefaultTableSource;
31use datafusion::logical_expr::col;
32use datafusion::sql::TableReference;
33use datafusion_expr::{DmlStatement, LogicalPlan, lit};
34use datatypes::arrow::array::{Array, AsArray};
35use servers::error::{
36 CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu,
37 TableNotFoundSnafu,
38};
39use servers::query_handler::DashboardDefinition;
40use session::context::{QueryContextBuilder, QueryContextRef};
41use snafu::{OptionExt, ResultExt};
42use table::TableRef;
43use table::metadata::TableInfo;
44use table::requests::TTL_KEY;
45use table::table::adapter::DfTableProviderAdapter;
46
47use crate::instance::Instance;
48
49pub const DASHBOARD_TABLE_NAME: &str = "dashboard";
50pub const DASHBOARD_TABLE_NAME_COLUMN_NAME: &str = "name";
51pub const DASHBOARD_TABLE_DEFINITION_COLUMN_NAME: &str = "definition";
52pub const DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
53
54impl Instance {
55 fn build_dashboard_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
58 (
59 DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
60 vec![DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string()],
61 vec![
62 ColumnDef {
63 name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
64 data_type: ColumnDataType::String as i32,
65 is_nullable: false,
66 default_constraint: vec![],
67 semantic_type: SemanticType::Tag as i32,
68 comment: String::new(),
69 datatype_extension: None,
70 options: None,
71 },
72 ColumnDef {
73 name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
74 data_type: ColumnDataType::String as i32,
75 is_nullable: false,
76 default_constraint: vec![],
77 semantic_type: SemanticType::Field as i32,
78 comment: String::new(),
79 datatype_extension: None,
80 options: None,
81 },
82 ColumnDef {
83 name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
84 data_type: ColumnDataType::TimestampNanosecond as i32,
85 is_nullable: false,
86 default_constraint: vec![],
87 semantic_type: SemanticType::Timestamp as i32,
88 comment: String::new(),
89 datatype_extension: None,
90 options: None,
91 },
92 ],
93 )
94 }
95
96 fn build_dashboard_insert_column_schemas() -> Vec<PbColumnSchema> {
98 vec![
99 PbColumnSchema {
100 column_name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
101 datatype: ColumnDataType::String.into(),
102 semantic_type: SemanticType::Tag.into(),
103 ..Default::default()
104 },
105 PbColumnSchema {
106 column_name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
107 datatype: ColumnDataType::String.into(),
108 semantic_type: SemanticType::Field.into(),
109 ..Default::default()
110 },
111 PbColumnSchema {
112 column_name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
113 datatype: ColumnDataType::TimestampNanosecond.into(),
114 semantic_type: SemanticType::Timestamp.into(),
115 ..Default::default()
116 },
117 ]
118 }
119
120 fn dashboard_query_ctx(table_info: &TableInfo) -> QueryContextRef {
121 QueryContextBuilder::default()
122 .current_catalog(table_info.catalog_name.clone())
123 .current_schema(table_info.schema_name.clone())
124 .build()
125 .into()
126 }
127
128 async fn create_dashboard_table_if_not_exists(
129 &self,
130 ctx: QueryContextRef,
131 ) -> servers::error::Result<TableRef> {
132 let catalog = ctx.current_catalog();
133
134 if let Some(table) = self
135 .catalog_manager
136 .table(
137 catalog,
138 DEFAULT_PRIVATE_SCHEMA_NAME,
139 DASHBOARD_TABLE_NAME,
140 Some(&ctx),
141 )
142 .await
143 .context(CatalogSnafu)?
144 {
145 return Ok(table);
146 }
147
148 let (time_index, primary_keys, column_defs) = Self::build_dashboard_schema();
149
150 let mut table_options = HashMap::new();
151 table_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
152
153 let mut create_table_expr = api::v1::CreateTableExpr {
154 catalog_name: catalog.to_string(),
155 schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
156 table_name: DASHBOARD_TABLE_NAME.to_string(),
157 desc: "GreptimeDB dashboard table".to_string(),
158 column_defs,
159 time_index,
160 primary_keys,
161 create_if_not_exists: true,
162 table_options,
163 table_id: None,
164 engine: default_engine().to_string(),
165 };
166
167 self.statement_executor
168 .create_table_inner(&mut create_table_expr, None, ctx.clone())
169 .await
170 .map_err(BoxedError::new)
171 .context(ExecuteQuerySnafu)?;
172
173 let table = self
174 .catalog_manager
175 .table(
176 catalog,
177 DEFAULT_PRIVATE_SCHEMA_NAME,
178 DASHBOARD_TABLE_NAME,
179 Some(&ctx),
180 )
181 .await
182 .context(CatalogSnafu)?
183 .context(TableNotFoundSnafu {
184 catalog: catalog.to_string(),
185 schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
186 table: DASHBOARD_TABLE_NAME.to_string(),
187 })?;
188
189 Ok(table)
190 }
191
192 async fn insert_dashboard(
194 &self,
195 name: &str,
196 definition: &str,
197 query_ctx: QueryContextRef,
198 ) -> servers::error::Result<()> {
199 let table = self
200 .create_dashboard_table_if_not_exists(query_ctx.clone())
201 .await?;
202 let table_info = table.table_info();
203
204 let insert = RowInsertRequest {
205 table_name: DASHBOARD_TABLE_NAME.to_string(),
206 rows: Some(Rows {
207 schema: Self::build_dashboard_insert_column_schemas(),
208 rows: vec![Row {
209 values: vec![
210 ValueData::StringValue(name.to_string()).into(),
211 ValueData::StringValue(definition.to_string()).into(),
212 ValueData::TimestampNanosecondValue(0).into(),
213 ],
214 }],
215 }),
216 };
217
218 let requests = RowInsertRequests {
219 inserts: vec![insert],
220 };
221
222 let output = self
223 .inserter
224 .handle_row_inserts(
225 requests,
226 Self::dashboard_query_ctx(&table_info),
227 &self.statement_executor,
228 false,
229 false,
230 )
231 .await
232 .map_err(BoxedError::new)
233 .context(ExecuteQuerySnafu)?;
234
235 info!(
236 "Insert dashboard success, name: {}, table: {}, output: {:?}",
237 name,
238 table_info.full_table_name(),
239 output
240 );
241
242 Ok(())
243 }
244
245 async fn list_dashboards(
247 &self,
248 query_ctx: QueryContextRef,
249 ) -> servers::error::Result<Vec<DashboardDefinition>> {
250 let table = if let Some(table) = self
251 .catalog_manager
252 .table(
253 query_ctx.current_catalog(),
254 DEFAULT_PRIVATE_SCHEMA_NAME,
255 DASHBOARD_TABLE_NAME,
256 Some(&query_ctx),
257 )
258 .await
259 .context(CatalogSnafu)?
260 {
261 table
262 } else {
263 return Ok(vec![]);
264 };
265
266 let table_info = table.table_info();
267
268 let dataframe = self
269 .query_engine
270 .read_table(table.clone())
271 .map_err(BoxedError::new)
272 .context(ExecuteQuerySnafu)?;
273
274 let dataframe = dataframe
275 .select_columns(&[
276 DASHBOARD_TABLE_NAME_COLUMN_NAME,
277 DASHBOARD_TABLE_DEFINITION_COLUMN_NAME,
278 ])
279 .context(DataFusionSnafu)?;
280
281 let plan = dataframe.into_parts().1;
282
283 let output = self
284 .query_engine
285 .execute(plan, Self::dashboard_query_ctx(&table_info))
286 .await
287 .map_err(BoxedError::new)
288 .context(ExecuteQuerySnafu)?;
289
290 let stream = match output.data {
291 OutputData::Stream(stream) => stream,
292 OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
293 _ => unreachable!(),
294 };
295
296 let records = record_util::collect(stream)
297 .await
298 .context(CollectRecordbatchSnafu)?;
299
300 let mut dashboards = Vec::new();
301
302 for r in &records {
303 let name_column = r.column(0);
304 let definition_column = r.column(1);
305
306 let name = name_column
307 .as_string_opt::<i32>()
308 .context(NotSupportedSnafu {
309 feat: "Invalid data type for greptime_private.dashboard.name",
310 })?;
311
312 let definition =
313 definition_column
314 .as_string_opt::<i32>()
315 .context(NotSupportedSnafu {
316 feat: "Invalid data type for greptime_private.dashboard.definition",
317 })?;
318
319 for i in 0..name.len() {
320 dashboards.push(DashboardDefinition {
321 name: name.value(i).to_string(),
322 definition: definition.value(i).to_string(),
323 });
324 }
325 }
326
327 Ok(dashboards)
328 }
329
330 async fn delete_dashboard(
332 &self,
333 name: &str,
334 query_ctx: QueryContextRef,
335 ) -> servers::error::Result<()> {
336 let table = self
337 .create_dashboard_table_if_not_exists(query_ctx.clone())
338 .await?;
339 let table_info = table.table_info();
340
341 let dataframe = self
342 .query_engine
343 .read_table(table.clone())
344 .map_err(BoxedError::new)
345 .context(ExecuteQuerySnafu)?;
346
347 let name_condition = col(DASHBOARD_TABLE_NAME_COLUMN_NAME).eq(lit(name));
348
349 let dataframe = dataframe.filter(name_condition).context(DataFusionSnafu)?;
350
351 let table_name = TableReference::full(
352 table_info.catalog_name.clone(),
353 table_info.schema_name.clone(),
354 table_info.name.clone(),
355 );
356
357 let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
358 let table_source = Arc::new(DefaultTableSource::new(table_provider));
359
360 let stmt = DmlStatement::new(
361 table_name,
362 table_source,
363 datafusion_expr::WriteOp::Delete,
364 Arc::new(dataframe.into_parts().1),
365 );
366
367 let plan = LogicalPlan::Dml(stmt);
368
369 let output = self
370 .query_engine
371 .execute(plan, Self::dashboard_query_ctx(&table_info))
372 .await
373 .map_err(BoxedError::new)
374 .context(ExecuteQuerySnafu)?;
375
376 info!(
377 "Delete dashboard success, name: {}, table: {}, output: {:?}",
378 name,
379 table_info.full_table_name(),
380 output
381 );
382
383 Ok(())
384 }
385}
386
/// Implements the server-side dashboard handler by delegating each operation
/// to the corresponding private dashboard method on `Instance`.
#[async_trait]
impl servers::query_handler::DashboardHandler for Instance {
    /// Saves `definition` under `name`; delegates to `insert_dashboard`.
    async fn save(
        &self,
        name: &str,
        definition: &str,
        ctx: QueryContextRef,
    ) -> servers::error::Result<()> {
        self.insert_dashboard(name, definition, ctx).await
    }

    /// Lists the stored dashboards; delegates to `list_dashboards`.
    async fn list(&self, ctx: QueryContextRef) -> servers::error::Result<Vec<DashboardDefinition>> {
        self.list_dashboards(ctx).await
    }

    /// Deletes the dashboard called `name`; delegates to `delete_dashboard`.
    async fn delete(&self, name: &str, ctx: QueryContextRef) -> servers::error::Result<()> {
        self.delete_dashboard(name, ctx).await
    }
}