1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::v1::value::ValueData;
19use api::v1::{
20 ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
21 RowInsertRequests, Rows, SemanticType,
22};
23use async_trait::async_trait;
24use common_catalog::consts::{DEFAULT_PRIVATE_SCHEMA_NAME, default_engine};
25use common_error::ext::BoxedError;
26use common_query::OutputData;
27use common_recordbatch::util as record_util;
28use common_telemetry::info;
29use common_time::FOREVER;
30use datafusion::datasource::DefaultTableSource;
31use datafusion::logical_expr::col;
32use datafusion::sql::TableReference;
33use datafusion_expr::{DmlStatement, LogicalPlan, lit};
34use datatypes::arrow::array::{Array, AsArray};
35use servers::error::{
36 CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu,
37 TableNotFoundSnafu,
38};
39use servers::query_handler::DashboardDefinition;
40use session::context::{QueryContextBuilder, QueryContextRef};
41use snafu::{OptionExt, ResultExt};
42use table::TableRef;
43use table::metadata::TableInfo;
44use table::requests::TTL_KEY;
45use table::table::adapter::DfTableProviderAdapter;
46
47use crate::instance::Instance;
48
49pub const DASHBOARD_TABLE_NAME: &str = "dashboard";
50pub const DASHBOARD_TABLE_NAME_COLUMN_NAME: &str = "name";
51pub const DASHBOARD_TABLE_DEFINITION_COLUMN_NAME: &str = "definition";
52pub const DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
53
54impl Instance {
55 fn build_dashboard_schema() -> (String, Vec<String>, Vec<ColumnDef>) {
58 (
59 DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
60 vec![DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string()],
61 vec![
62 ColumnDef {
63 name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
64 data_type: ColumnDataType::String as i32,
65 is_nullable: false,
66 default_constraint: vec![],
67 semantic_type: SemanticType::Tag as i32,
68 comment: String::new(),
69 datatype_extension: None,
70 options: None,
71 },
72 ColumnDef {
73 name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
74 data_type: ColumnDataType::String as i32,
75 is_nullable: false,
76 default_constraint: vec![],
77 semantic_type: SemanticType::Field as i32,
78 comment: String::new(),
79 datatype_extension: None,
80 options: None,
81 },
82 ColumnDef {
83 name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
84 data_type: ColumnDataType::TimestampNanosecond as i32,
85 is_nullable: false,
86 default_constraint: vec![],
87 semantic_type: SemanticType::Timestamp as i32,
88 comment: String::new(),
89 datatype_extension: None,
90 options: None,
91 },
92 ],
93 )
94 }
95
96 fn build_dashboard_insert_column_schemas() -> Vec<PbColumnSchema> {
98 vec![
99 PbColumnSchema {
100 column_name: DASHBOARD_TABLE_NAME_COLUMN_NAME.to_string(),
101 datatype: ColumnDataType::String.into(),
102 semantic_type: SemanticType::Tag.into(),
103 ..Default::default()
104 },
105 PbColumnSchema {
106 column_name: DASHBOARD_TABLE_DEFINITION_COLUMN_NAME.to_string(),
107 datatype: ColumnDataType::String.into(),
108 semantic_type: SemanticType::Field.into(),
109 ..Default::default()
110 },
111 PbColumnSchema {
112 column_name: DASHBOARD_TABLE_CREATED_AT_COLUMN_NAME.to_string(),
113 datatype: ColumnDataType::TimestampNanosecond.into(),
114 semantic_type: SemanticType::Timestamp.into(),
115 ..Default::default()
116 },
117 ]
118 }
119
120 fn dashboard_query_ctx(table_info: &TableInfo) -> QueryContextRef {
121 QueryContextBuilder::default()
122 .current_catalog(table_info.catalog_name.clone())
123 .current_schema(table_info.schema_name.clone())
124 .build()
125 .into()
126 }
127
128 async fn create_dashboard_table_if_not_exists(
129 &self,
130 ctx: QueryContextRef,
131 ) -> servers::error::Result<TableRef> {
132 let catalog = ctx.current_catalog();
133
134 if let Some(table) = self
135 .catalog_manager
136 .table(
137 catalog,
138 DEFAULT_PRIVATE_SCHEMA_NAME,
139 DASHBOARD_TABLE_NAME,
140 Some(&ctx),
141 )
142 .await?
143 {
144 return Ok(table);
145 }
146
147 let (time_index, primary_keys, column_defs) = Self::build_dashboard_schema();
148
149 let mut table_options = HashMap::new();
150 table_options.insert(TTL_KEY.to_string(), FOREVER.to_string());
151
152 let mut create_table_expr = api::v1::CreateTableExpr {
153 catalog_name: catalog.to_string(),
154 schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
155 table_name: DASHBOARD_TABLE_NAME.to_string(),
156 desc: "GreptimeDB dashboard table".to_string(),
157 column_defs,
158 time_index,
159 primary_keys,
160 create_if_not_exists: true,
161 table_options,
162 table_id: None,
163 engine: default_engine().to_string(),
164 };
165
166 self.statement_executor
167 .create_table_inner(&mut create_table_expr, None, ctx.clone())
168 .await
169 .map_err(BoxedError::new)
170 .context(ExecuteQuerySnafu)?;
171
172 let table = self
173 .catalog_manager
174 .table(
175 catalog,
176 DEFAULT_PRIVATE_SCHEMA_NAME,
177 DASHBOARD_TABLE_NAME,
178 Some(&ctx),
179 )
180 .await?
181 .context(TableNotFoundSnafu {
182 catalog: catalog.to_string(),
183 schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
184 table: DASHBOARD_TABLE_NAME.to_string(),
185 })?;
186
187 Ok(table)
188 }
189
190 async fn insert_dashboard(
192 &self,
193 name: &str,
194 definition: &str,
195 query_ctx: QueryContextRef,
196 ) -> servers::error::Result<()> {
197 let table = self
198 .create_dashboard_table_if_not_exists(query_ctx.clone())
199 .await?;
200 let table_info = table.table_info();
201
202 let insert = RowInsertRequest {
203 table_name: DASHBOARD_TABLE_NAME.to_string(),
204 rows: Some(Rows {
205 schema: Self::build_dashboard_insert_column_schemas(),
206 rows: vec![Row {
207 values: vec![
208 ValueData::StringValue(name.to_string()).into(),
209 ValueData::StringValue(definition.to_string()).into(),
210 ValueData::TimestampNanosecondValue(0).into(),
211 ],
212 }],
213 }),
214 };
215
216 let requests = RowInsertRequests {
217 inserts: vec![insert],
218 };
219
220 let output = self
221 .inserter
222 .handle_row_inserts(
223 requests,
224 Self::dashboard_query_ctx(&table_info),
225 &self.statement_executor,
226 false,
227 false,
228 )
229 .await
230 .map_err(BoxedError::new)
231 .context(ExecuteQuerySnafu)?;
232
233 info!(
234 "Insert dashboard success, name: {}, table: {}, output: {:?}",
235 name,
236 table_info.full_table_name(),
237 output
238 );
239
240 Ok(())
241 }
242
243 async fn list_dashboards(
245 &self,
246 query_ctx: QueryContextRef,
247 ) -> servers::error::Result<Vec<DashboardDefinition>> {
248 let table = if let Some(table) = self
249 .catalog_manager
250 .table(
251 query_ctx.current_catalog(),
252 DEFAULT_PRIVATE_SCHEMA_NAME,
253 DASHBOARD_TABLE_NAME,
254 Some(&query_ctx),
255 )
256 .await?
257 {
258 table
259 } else {
260 return Ok(vec![]);
261 };
262
263 let table_info = table.table_info();
264
265 let dataframe = self
266 .query_engine
267 .read_table(table.clone())
268 .map_err(BoxedError::new)
269 .context(ExecuteQuerySnafu)?;
270
271 let dataframe = dataframe
272 .select_columns(&[
273 DASHBOARD_TABLE_NAME_COLUMN_NAME,
274 DASHBOARD_TABLE_DEFINITION_COLUMN_NAME,
275 ])
276 .context(DataFusionSnafu)?;
277
278 let plan = dataframe.into_parts().1;
279
280 let output = self
281 .query_engine
282 .execute(plan, Self::dashboard_query_ctx(&table_info))
283 .await
284 .map_err(BoxedError::new)
285 .context(ExecuteQuerySnafu)?;
286
287 let stream = match output.data {
288 OutputData::Stream(stream) => stream,
289 OutputData::RecordBatches(record_batches) => record_batches.as_stream(),
290 _ => unreachable!(),
291 };
292
293 let records = record_util::collect(stream)
294 .await
295 .context(CollectRecordbatchSnafu)?;
296
297 let mut dashboards = Vec::new();
298
299 for r in &records {
300 let name_column = r.column(0);
301 let definition_column = r.column(1);
302
303 let name = name_column
304 .as_string_opt::<i32>()
305 .context(NotSupportedSnafu {
306 feat: "Invalid data type for greptime_private.dashboard.name",
307 })?;
308
309 let definition =
310 definition_column
311 .as_string_opt::<i32>()
312 .context(NotSupportedSnafu {
313 feat: "Invalid data type for greptime_private.dashboard.definition",
314 })?;
315
316 for i in 0..name.len() {
317 dashboards.push(DashboardDefinition {
318 name: name.value(i).to_string(),
319 definition: definition.value(i).to_string(),
320 });
321 }
322 }
323
324 Ok(dashboards)
325 }
326
327 async fn delete_dashboard(
329 &self,
330 name: &str,
331 query_ctx: QueryContextRef,
332 ) -> servers::error::Result<()> {
333 let table = self
334 .create_dashboard_table_if_not_exists(query_ctx.clone())
335 .await?;
336 let table_info = table.table_info();
337
338 let dataframe = self
339 .query_engine
340 .read_table(table.clone())
341 .map_err(BoxedError::new)
342 .context(ExecuteQuerySnafu)?;
343
344 let name_condition = col(DASHBOARD_TABLE_NAME_COLUMN_NAME).eq(lit(name));
345
346 let dataframe = dataframe.filter(name_condition).context(DataFusionSnafu)?;
347
348 let table_name = TableReference::full(
349 table_info.catalog_name.clone(),
350 table_info.schema_name.clone(),
351 table_info.name.clone(),
352 );
353
354 let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
355 let table_source = Arc::new(DefaultTableSource::new(table_provider));
356
357 let stmt = DmlStatement::new(
358 table_name,
359 table_source,
360 datafusion_expr::WriteOp::Delete,
361 Arc::new(dataframe.into_parts().1),
362 );
363
364 let plan = LogicalPlan::Dml(stmt);
365
366 let output = self
367 .query_engine
368 .execute(plan, Self::dashboard_query_ctx(&table_info))
369 .await
370 .map_err(BoxedError::new)
371 .context(ExecuteQuerySnafu)?;
372
373 info!(
374 "Delete dashboard success, name: {}, table: {}, output: {:?}",
375 name,
376 table_info.full_table_name(),
377 output
378 );
379
380 Ok(())
381 }
382}
383
384#[async_trait]
385impl servers::query_handler::DashboardHandler for Instance {
386 async fn save(
387 &self,
388 name: &str,
389 definition: &str,
390 ctx: QueryContextRef,
391 ) -> servers::error::Result<()> {
392 self.insert_dashboard(name, definition, ctx).await
393 }
394
395 async fn list(&self, ctx: QueryContextRef) -> servers::error::Result<Vec<DashboardDefinition>> {
396 self.list_dashboards(ctx).await
397 }
398
399 async fn delete(&self, name: &str, ctx: QueryContextRef) -> servers::error::Result<()> {
400 self.delete_dashboard(name, ctx).await
401 }
402}