1use std::sync::Arc;
16
17use api::helper::from_pb_time_ranges;
18use api::v1::ddl_request::{Expr as DdlExpr, Expr};
19use api::v1::greptime_request::Request;
20use api::v1::query_request::Query;
21use api::v1::{
22 DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
23 RowInsertRequests,
24};
25use async_trait::async_trait;
26use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
27use common_base::AffectedRows;
28use common_error::ext::BoxedError;
29use common_grpc::flight::FlightDecoder;
30use common_grpc::FlightData;
31use common_query::logical_plan::add_insert_to_logical_plan;
32use common_query::Output;
33use common_telemetry::tracing::{self};
34use datafusion::datasource::DefaultTableSource;
35use query::parser::PromQuery;
36use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
37use servers::query_handler::grpc::GrpcQueryHandler;
38use servers::query_handler::sql::SqlQueryHandler;
39use session::context::QueryContextRef;
40use snafu::{ensure, OptionExt, ResultExt};
41use table::table::adapter::DfTableProviderAdapter;
42use table::table_name::TableName;
43use table::TableRef;
44
45use crate::error::{
46 CatalogSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
47 PermissionSnafu, PlanStatementSnafu, Result, SubstraitDecodeLogicalPlanSnafu,
48 TableNotFoundSnafu, TableOperationSnafu,
49};
50use crate::instance::{attach_timer, Instance};
51use crate::metrics::{
52 GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
53};
54
55#[async_trait]
56impl GrpcQueryHandler for Instance {
57 type Error = Error;
58
59 async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
60 let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
61 let interceptor = interceptor_ref.as_ref();
62 interceptor.pre_execute(&request, ctx.clone())?;
63
64 self.plugins
65 .get::<PermissionCheckerRef>()
66 .as_ref()
67 .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
68 .context(PermissionSnafu)?;
69
70 let _guard = if let Some(limiter) = &self.limiter {
71 Some(limiter.limit_request(&request).await?)
72 } else {
73 None
74 };
75
76 let output = match request {
77 Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
78 Request::RowInserts(requests) => {
79 self.handle_row_inserts(requests, ctx.clone(), false, false)
80 .await?
81 }
82 Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
83 Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
84 Request::Query(query_request) => {
85 let query = query_request.query.context(IncompleteGrpcRequestSnafu {
86 err_msg: "Missing field 'QueryRequest.query'",
87 })?;
88 match query {
89 Query::Sql(sql) => {
90 let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
91 let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
92 ensure!(
93 result.len() == 1,
94 NotSupportedSnafu {
95 feat: "execute multiple statements in SQL query string through GRPC interface"
96 }
97 );
98 let output = result.remove(0)?;
99 attach_timer(output, timer)
100 }
101 Query::LogicalPlan(plan) => {
102 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
104
105 let plan_decoder = self
107 .query_engine()
108 .engine_context(ctx.clone())
109 .new_plan_decoder()
110 .context(PlanStatementSnafu)?;
111
112 let dummy_catalog_list =
113 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
114 self.catalog_manager().clone(),
115 ));
116
117 let logical_plan = plan_decoder
118 .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
119 .await
120 .context(SubstraitDecodeLogicalPlanSnafu)?;
121 let output =
122 SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
123 .await?;
124
125 attach_timer(output, timer)
126 }
127 Query::InsertIntoPlan(insert) => {
128 self.handle_insert_plan(insert, ctx.clone()).await?
129 }
130 Query::PromRangeQuery(promql) => {
131 let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
132 let prom_query = PromQuery {
133 query: promql.query,
134 start: promql.start,
135 end: promql.end,
136 step: promql.step,
137 lookback: promql.lookback,
138 };
139 let mut result =
140 SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
141 ensure!(
142 result.len() == 1,
143 NotSupportedSnafu {
144 feat: "execute multiple statements in PromQL query string through GRPC interface"
145 }
146 );
147 let output = result.remove(0)?;
148 attach_timer(output, timer)
149 }
150 }
151 }
152 Request::Ddl(request) => {
153 let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
154 err_msg: "'expr' is absent in DDL request",
155 })?;
156
157 fill_catalog_and_schema_from_context(&mut expr, &ctx);
158
159 match expr {
160 DdlExpr::CreateTable(mut expr) => {
161 let _ = self
162 .statement_executor
163 .create_table_inner(&mut expr, None, ctx.clone())
164 .await?;
165 Output::new_with_affected_rows(0)
166 }
167 DdlExpr::AlterDatabase(expr) => {
168 let _ = self
169 .statement_executor
170 .alter_database_inner(expr, ctx.clone())
171 .await?;
172 Output::new_with_affected_rows(0)
173 }
174 DdlExpr::AlterTable(expr) => {
175 self.statement_executor
176 .alter_table_inner(expr, ctx.clone())
177 .await?
178 }
179 DdlExpr::CreateDatabase(expr) => {
180 self.statement_executor
181 .create_database(
182 &expr.schema_name,
183 expr.create_if_not_exists,
184 expr.options,
185 ctx.clone(),
186 )
187 .await?
188 }
189 DdlExpr::DropTable(expr) => {
190 let table_name =
191 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
192 self.statement_executor
193 .drop_table(table_name, expr.drop_if_exists, ctx.clone())
194 .await?
195 }
196 DdlExpr::TruncateTable(expr) => {
197 let table_name =
198 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
199 let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
200 .map_err(BoxedError::new)
201 .context(ExternalSnafu)?;
202 self.statement_executor
203 .truncate_table(table_name, time_ranges, ctx.clone())
204 .await?
205 }
206 DdlExpr::CreateFlow(expr) => {
207 self.statement_executor
208 .create_flow_inner(expr, ctx.clone())
209 .await?
210 }
211 DdlExpr::DropFlow(DropFlowExpr {
212 catalog_name,
213 flow_name,
214 drop_if_exists,
215 ..
216 }) => {
217 self.statement_executor
218 .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
219 .await?
220 }
221 DdlExpr::CreateView(expr) => {
222 let _ = self
223 .statement_executor
224 .create_view_by_expr(expr, ctx.clone())
225 .await?;
226
227 Output::new_with_affected_rows(0)
228 }
229 DdlExpr::DropView(_) => {
230 todo!("implemented in the following PR")
231 }
232 }
233 }
234 };
235
236 let output = interceptor.post_execute(output, ctx)?;
237 Ok(output)
238 }
239
240 async fn put_record_batch(
241 &self,
242 table_name: &TableName,
243 table_ref: &mut Option<TableRef>,
244 decoder: &mut FlightDecoder,
245 data: FlightData,
246 ctx: QueryContextRef,
247 ) -> Result<AffectedRows> {
248 let table = if let Some(table) = table_ref {
249 table.clone()
250 } else {
251 let table = self
252 .catalog_manager()
253 .table(
254 &table_name.catalog_name,
255 &table_name.schema_name,
256 &table_name.table_name,
257 None,
258 )
259 .await
260 .context(CatalogSnafu)?
261 .with_context(|| TableNotFoundSnafu {
262 table_name: table_name.to_string(),
263 })?;
264 *table_ref = Some(table.clone());
265 table
266 };
267
268 let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
269 let interceptor = interceptor_ref.as_ref();
270 interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;
271
272 self.plugins
273 .get::<PermissionCheckerRef>()
274 .as_ref()
275 .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
276 .context(PermissionSnafu)?;
277
278 self.inserter
281 .handle_bulk_insert(table, decoder, data)
282 .await
283 .context(TableOperationSnafu)
284 }
285}
286
287fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
288 let catalog = ctx.current_catalog();
289 let schema = ctx.current_schema();
290
291 macro_rules! check_and_fill {
292 ($expr:ident) => {
293 if $expr.catalog_name.is_empty() {
294 $expr.catalog_name = catalog.to_string();
295 }
296 if $expr.schema_name.is_empty() {
297 $expr.schema_name = schema.to_string();
298 }
299 };
300 }
301
302 match ddl_expr {
303 Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { }
304 Expr::CreateTable(expr) => {
305 check_and_fill!(expr);
306 }
307 Expr::AlterTable(expr) => {
308 check_and_fill!(expr);
309 }
310 Expr::DropTable(expr) => {
311 check_and_fill!(expr);
312 }
313 Expr::TruncateTable(expr) => {
314 check_and_fill!(expr);
315 }
316 Expr::CreateFlow(expr) => {
317 if expr.catalog_name.is_empty() {
318 expr.catalog_name = catalog.to_string();
319 }
320 }
321 Expr::DropFlow(expr) => {
322 if expr.catalog_name.is_empty() {
323 expr.catalog_name = catalog.to_string();
324 }
325 }
326 Expr::CreateView(expr) => {
327 check_and_fill!(expr);
328 }
329 Expr::DropView(expr) => {
330 check_and_fill!(expr);
331 }
332 }
333}
334
335impl Instance {
336 async fn handle_insert_plan(
337 &self,
338 insert: InsertIntoPlan,
339 ctx: QueryContextRef,
340 ) -> Result<Output> {
341 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
342 let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
343 err_msg: "'table_name' is absent in InsertIntoPlan",
344 })?;
345
346 let plan_decoder = self
348 .query_engine()
349 .engine_context(ctx.clone())
350 .new_plan_decoder()
351 .context(PlanStatementSnafu)?;
352
353 let dummy_catalog_list =
354 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
355 self.catalog_manager().clone(),
356 ));
357
358 let logical_plan = plan_decoder
360 .decode(
361 bytes::Bytes::from(insert.logical_plan),
362 dummy_catalog_list,
363 false,
364 )
365 .await
366 .context(SubstraitDecodeLogicalPlanSnafu)?;
367
368 let table = self
369 .catalog_manager()
370 .table(
371 &table_name.catalog_name,
372 &table_name.schema_name,
373 &table_name.table_name,
374 None,
375 )
376 .await
377 .context(CatalogSnafu)?
378 .with_context(|| TableNotFoundSnafu {
379 table_name: [
380 table_name.catalog_name.clone(),
381 table_name.schema_name.clone(),
382 table_name.table_name.clone(),
383 ]
384 .join("."),
385 })?;
386 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
387 let table_source = Arc::new(DefaultTableSource::new(table_provider));
388
389 let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
390 .context(SubstraitDecodeLogicalPlanSnafu)?;
391
392 let engine_ctx = self.query_engine().engine_context(ctx.clone());
393 let state = engine_ctx.state();
394 let analyzed_plan = state
396 .analyzer()
397 .execute_and_check(insert_into, state.config_options(), |_, _| {})
398 .context(common_query::error::GeneralDataFusionSnafu)
399 .context(SubstraitDecodeLogicalPlanSnafu)?;
400
401 let optimized_plan = state
403 .optimize(&analyzed_plan)
404 .context(common_query::error::GeneralDataFusionSnafu)
405 .context(SubstraitDecodeLogicalPlanSnafu)?;
406
407 let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;
408
409 Ok(attach_timer(output, timer))
410 }
411 #[tracing::instrument(skip_all)]
412 pub async fn handle_inserts(
413 &self,
414 requests: InsertRequests,
415 ctx: QueryContextRef,
416 ) -> Result<Output> {
417 self.inserter
418 .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
419 .await
420 .context(TableOperationSnafu)
421 }
422
423 #[tracing::instrument(skip_all)]
424 pub async fn handle_row_inserts(
425 &self,
426 requests: RowInsertRequests,
427 ctx: QueryContextRef,
428 accommodate_existing_schema: bool,
429 is_single_value: bool,
430 ) -> Result<Output> {
431 self.inserter
432 .handle_row_inserts(
433 requests,
434 ctx,
435 self.statement_executor.as_ref(),
436 accommodate_existing_schema,
437 is_single_value,
438 )
439 .await
440 .context(TableOperationSnafu)
441 }
442
443 #[tracing::instrument(skip_all)]
444 pub async fn handle_influx_row_inserts(
445 &self,
446 requests: RowInsertRequests,
447 ctx: QueryContextRef,
448 ) -> Result<Output> {
449 self.inserter
450 .handle_last_non_null_inserts(
451 requests,
452 ctx,
453 self.statement_executor.as_ref(),
454 true,
455 false,
457 )
458 .await
459 .context(TableOperationSnafu)
460 }
461
462 #[tracing::instrument(skip_all)]
463 pub async fn handle_metric_row_inserts(
464 &self,
465 requests: RowInsertRequests,
466 ctx: QueryContextRef,
467 physical_table: String,
468 ) -> Result<Output> {
469 self.inserter
470 .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
471 .await
472 .context(TableOperationSnafu)
473 }
474
475 #[tracing::instrument(skip_all)]
476 pub async fn handle_deletes(
477 &self,
478 requests: DeleteRequests,
479 ctx: QueryContextRef,
480 ) -> Result<Output> {
481 self.deleter
482 .handle_column_deletes(requests, ctx)
483 .await
484 .context(TableOperationSnafu)
485 }
486
487 #[tracing::instrument(skip_all)]
488 pub async fn handle_row_deletes(
489 &self,
490 requests: RowDeleteRequests,
491 ctx: QueryContextRef,
492 ) -> Result<Output> {
493 self.deleter
494 .handle_row_deletes(requests, ctx)
495 .await
496 .context(TableOperationSnafu)
497 }
498}