use std::sync::Arc;

use api::helper::from_pb_time_ranges;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
    RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_error::ext::BoxedError;
use common_grpc::FlightData;
use common_grpc::flight::FlightDecoder;
use common_query::Output;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_telemetry::tracing::{self};
use datafusion::datasource::DefaultTableSource;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
use table::table::adapter::DfTableProviderAdapter;
use table::table_name::TableName;

use crate::error::{
    CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
    NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{Instance, attach_timer};
use crate::metrics::{
    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
};

#[async_trait]
impl GrpcQueryHandler for Instance {
    type Error = Error;

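    /// Handles a single gRPC [`Request`]: runs the pre-execute interceptor and the
    /// permission check, acquires the optional request limiter, dispatches on the
    /// request variant, and finally runs the post-execute interceptor on the output.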
    async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_execute(&request, ctx.clone())?;

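        // Reject the request early if the current user lacks permission for it.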
        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
            .context(PermissionSnafu)?;

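        // When a request limiter is configured, hold its guard for the duration of this call.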
        let _guard = if let Some(limiter) = &self.limiter {
            Some(limiter.limit_request(&request).await?)
        } else {
            None
        };

        let output = match request {
            Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
            Request::RowInserts(requests) => {
                self.handle_row_inserts(requests, ctx.clone(), false, false)
                    .await?
            }
            Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
            Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
            Request::Query(query_request) => {
                let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                    err_msg: "Missing field 'QueryRequest.query'",
                })?;
                match query {
                    Query::Sql(sql) => {
                        let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                        let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in SQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                    Query::LogicalPlan(plan) => {
                        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

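                        // Decode the Substrait-encoded logical plan, resolving table
                        // references through a dummy catalog list backed by the catalog manager.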
                        let plan_decoder = self
                            .query_engine()
                            .engine_context(ctx.clone())
                            .new_plan_decoder()
                            .context(PlanStatementSnafu)?;

                        let dummy_catalog_list =
                            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                self.catalog_manager().clone(),
                            ));

                        let logical_plan = plan_decoder
                            .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                            .await
                            .context(SubstraitDecodeLogicalPlanSnafu)?;
                        let output =
                            SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
                                .await?;

                        attach_timer(output, timer)
                    }
                    Query::InsertIntoPlan(insert) => {
                        self.handle_insert_plan(insert, ctx.clone()).await?
                    }
                    Query::PromRangeQuery(promql) => {
                        let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                        let prom_query = PromQuery {
                            query: promql.query,
                            start: promql.start,
                            end: promql.end,
                            step: promql.step,
                            lookback: promql.lookback,
                            alias: None,
                        };
                        let mut result =
                            SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in PromQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                }
            }
            Request::Ddl(request) => {
                let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                    err_msg: "'expr' is absent in DDL request",
                })?;

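                // Default any empty catalog/schema names from the session context before dispatching.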
                fill_catalog_and_schema_from_context(&mut expr, &ctx);

                match expr {
                    DdlExpr::CreateTable(mut expr) => {
                        let _ = self
                            .statement_executor
                            .create_table_inner(&mut expr, None, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterDatabase(expr) => {
                        let _ = self
                            .statement_executor
                            .alter_database_inner(expr, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterTable(expr) => {
                        self.statement_executor
                            .alter_table_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateDatabase(expr) => {
                        self.statement_executor
                            .create_database(
                                &expr.schema_name,
                                expr.create_if_not_exists,
                                expr.options,
                                ctx.clone(),
                            )
                            .await?
                    }
                    DdlExpr::DropTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        self.statement_executor
                            .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::TruncateTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
                            .map_err(BoxedError::new)
                            .context(ExternalSnafu)?;
                        self.statement_executor
                            .truncate_table(table_name, time_ranges, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateFlow(expr) => {
                        self.statement_executor
                            .create_flow_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::DropFlow(DropFlowExpr {
                        catalog_name,
                        flow_name,
                        drop_if_exists,
                        ..
                    }) => {
                        self.statement_executor
                            .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateView(expr) => {
                        let _ = self
                            .statement_executor
                            .create_view_by_expr(expr, ctx.clone())
                            .await?;

                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::DropView(_) => {
                        todo!("implemented in the following PR")
                    }
                }
            }
        };

        let output = interceptor.post_execute(output, ctx)?;
        Ok(output)
    }

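    /// Handles one Arrow Flight [`FlightData`] message as a bulk insert into `table_name`
    /// and returns the affected row count. The resolved table is cached in `table_ref` so
    /// that subsequent batches for the same table can skip the catalog lookup.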
    async fn put_record_batch(
        &self,
        table_name: &TableName,
        table_ref: &mut Option<TableRef>,
        decoder: &mut FlightDecoder,
        data: FlightData,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
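        // Reuse the cached table if one was provided; otherwise resolve it from the
        // catalog and cache it for subsequent batches.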
        let table = if let Some(table) = table_ref {
            table.clone()
        } else {
            let table = self
                .catalog_manager()
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    None,
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                })?;
            *table_ref = Some(table.clone());
            table
        };

        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
            .context(PermissionSnafu)?;

        self.inserter
            .handle_bulk_insert(table, decoder, data)
            .await
            .context(TableOperationSnafu)
    }
}

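/// Fills the catalog and schema names of a DDL expression from the query context
/// when the expression leaves them empty.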
fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
    let catalog = ctx.current_catalog();
    let schema = ctx.current_schema();

    macro_rules! check_and_fill {
        ($expr:ident) => {
            if $expr.catalog_name.is_empty() {
                $expr.catalog_name = catalog.to_string();
            }
            if $expr.schema_name.is_empty() {
                $expr.schema_name = schema.to_string();
            }
        };
    }

    match ddl_expr {
        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => {}
        Expr::CreateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::AlterTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::TruncateTable(expr) => {
            check_and_fill!(expr);
        }
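        // For flow expressions, only the catalog name is defaulted.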
        Expr::CreateFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::DropFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::CreateView(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropView(expr) => {
            check_and_fill!(expr);
        }
    }
}

impl Instance {
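    /// Executes an [`InsertIntoPlan`]: decodes the Substrait-encoded source plan,
    /// resolves the target table, wraps the plan in an `INSERT INTO` on that table,
    /// then analyzes, optimizes, and executes the result.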
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

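        // Wrap the decoded plan as the input of an INSERT INTO targeting the resolved table.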
        let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

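        // Analyze and optimize the plan with the query engine state before execution.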
        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(DataFusionSnafu)?;

        let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;

        let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;

        Ok(attach_timer(output, timer))
    }

    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn handle_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.inserter
            .handle_row_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                accommodate_existing_schema,
                is_single_value,
            )
            .await
            .context(TableOperationSnafu)
    }

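    /// Handles row inserts coming from the InfluxDB line protocol path, delegating to
    /// the inserter's last-non-null insert handling.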
    #[tracing::instrument(skip_all)]
    pub async fn handle_influx_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_last_non_null_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                true,
                false,
            )
            .await
            .context(TableOperationSnafu)
    }

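    /// Handles row inserts for the metric engine, writing the rows through the given
    /// physical table.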
    #[tracing::instrument(skip_all)]
    pub async fn handle_metric_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        physical_table: String,
    ) -> Result<Output> {
        self.inserter
            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
            .await
            .context(TableOperationSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn handle_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_column_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }

    #[tracing::instrument(skip_all)]
    pub async fn handle_row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }
}
495}