1use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::helper::from_pb_time_ranges;
20use api::v1::ddl_request::{Expr as DdlExpr, Expr};
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{
24 DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
25 RowInsertRequests,
26};
27use async_stream::try_stream;
28use async_trait::async_trait;
29use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
30use common_base::AffectedRows;
31use common_error::ext::BoxedError;
32use common_grpc::flight::do_put::DoPutResponse;
33use common_query::Output;
34use common_query::logical_plan::add_insert_to_logical_plan;
35use common_telemetry::tracing::{self};
36use datafusion::datasource::DefaultTableSource;
37use futures::Stream;
38use futures::stream::StreamExt;
39use query::parser::PromQuery;
40use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
41use servers::query_handler::grpc::GrpcQueryHandler;
42use servers::query_handler::sql::SqlQueryHandler;
43use session::context::QueryContextRef;
44use snafu::{OptionExt, ResultExt, ensure};
45use table::TableRef;
46use table::table::adapter::DfTableProviderAdapter;
47use table::table_name::TableName;
48
49use crate::error::{
50 CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
51 NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
52 SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
53};
54use crate::instance::{Instance, attach_timer};
55use crate::metrics::{
56 GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
57};
58
59#[async_trait]
60impl GrpcQueryHandler for Instance {
61 type Error = Error;
62
63 async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
64 let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
65 let interceptor = interceptor_ref.as_ref();
66 interceptor.pre_execute(&request, ctx.clone())?;
67
68 self.plugins
69 .get::<PermissionCheckerRef>()
70 .as_ref()
71 .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
72 .context(PermissionSnafu)?;
73
74 let output = match request {
75 Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
76 Request::RowInserts(requests) => {
77 self.handle_row_inserts(requests, ctx.clone(), false, false)
78 .await?
79 }
80 Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
81 Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
82 Request::Query(query_request) => {
83 let query = query_request.query.context(IncompleteGrpcRequestSnafu {
84 err_msg: "Missing field 'QueryRequest.query'",
85 })?;
86 match query {
87 Query::Sql(sql) => {
88 let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
89 let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
90 ensure!(
91 result.len() == 1,
92 NotSupportedSnafu {
93 feat: "execute multiple statements in SQL query string through GRPC interface"
94 }
95 );
96 let output = result.remove(0)?;
97 attach_timer(output, timer)
98 }
99 Query::LogicalPlan(plan) => {
100 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
102
103 let plan_decoder = self
105 .query_engine()
106 .engine_context(ctx.clone())
107 .new_plan_decoder()
108 .context(PlanStatementSnafu)?;
109
110 let dummy_catalog_list =
111 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
112 self.catalog_manager().clone(),
113 ));
114
115 let logical_plan = plan_decoder
116 .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
117 .await
118 .context(SubstraitDecodeLogicalPlanSnafu)?;
119 let output =
120 SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
121 .await?;
122
123 attach_timer(output, timer)
124 }
125 Query::InsertIntoPlan(insert) => {
126 self.handle_insert_plan(insert, ctx.clone()).await?
127 }
128 Query::PromRangeQuery(promql) => {
129 let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
130 let prom_query = PromQuery {
131 query: promql.query,
132 start: promql.start,
133 end: promql.end,
134 step: promql.step,
135 lookback: promql.lookback,
136 alias: None,
137 };
138 let mut result =
139 SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
140 ensure!(
141 result.len() == 1,
142 NotSupportedSnafu {
143 feat: "execute multiple statements in PromQL query string through GRPC interface"
144 }
145 );
146 let output = result.remove(0)?;
147 attach_timer(output, timer)
148 }
149 }
150 }
151 Request::Ddl(request) => {
152 let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
153 err_msg: "'expr' is absent in DDL request",
154 })?;
155
156 fill_catalog_and_schema_from_context(&mut expr, &ctx);
157
158 match expr {
159 DdlExpr::CreateTable(mut expr) => {
160 let _ = self
161 .statement_executor
162 .create_table_inner(&mut expr, None, ctx.clone())
163 .await?;
164 Output::new_with_affected_rows(0)
165 }
166 DdlExpr::AlterDatabase(expr) => {
167 let _ = self
168 .statement_executor
169 .alter_database_inner(expr, ctx.clone())
170 .await?;
171 Output::new_with_affected_rows(0)
172 }
173 DdlExpr::AlterTable(expr) => {
174 self.statement_executor
175 .alter_table_inner(expr, ctx.clone())
176 .await?
177 }
178 DdlExpr::CreateDatabase(expr) => {
179 self.statement_executor
180 .create_database(
181 &expr.schema_name,
182 expr.create_if_not_exists,
183 expr.options,
184 ctx.clone(),
185 )
186 .await?
187 }
188 DdlExpr::DropTable(expr) => {
189 let table_name =
190 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
191 self.statement_executor
192 .drop_table(table_name, expr.drop_if_exists, ctx.clone())
193 .await?
194 }
195 DdlExpr::TruncateTable(expr) => {
196 let table_name =
197 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
198 let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
199 .map_err(BoxedError::new)
200 .context(ExternalSnafu)?;
201 self.statement_executor
202 .truncate_table(table_name, time_ranges, ctx.clone())
203 .await?
204 }
205 DdlExpr::CreateFlow(expr) => {
206 self.statement_executor
207 .create_flow_inner(expr, ctx.clone())
208 .await?
209 }
210 DdlExpr::DropFlow(DropFlowExpr {
211 catalog_name,
212 flow_name,
213 drop_if_exists,
214 ..
215 }) => {
216 self.statement_executor
217 .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
218 .await?
219 }
220 DdlExpr::CreateView(expr) => {
221 let _ = self
222 .statement_executor
223 .create_view_by_expr(expr, ctx.clone())
224 .await?;
225
226 Output::new_with_affected_rows(0)
227 }
228 DdlExpr::DropView(_) => {
229 todo!("implemented in the following PR")
230 }
231 DdlExpr::CommentOn(expr) => {
232 self.statement_executor
233 .comment_by_expr(expr, ctx.clone())
234 .await?
235 }
236 }
237 }
238 };
239
240 let output = interceptor.post_execute(output, ctx)?;
241 Ok(output)
242 }
243
244 async fn put_record_batch(
245 &self,
246 request: servers::grpc::flight::PutRecordBatchRequest,
247 table_ref: &mut Option<TableRef>,
248 ctx: QueryContextRef,
249 ) -> Result<AffectedRows> {
250 let table = if let Some(table) = table_ref {
251 table.clone()
252 } else {
253 let table = self
254 .catalog_manager()
255 .table(
256 &request.table_name.catalog_name,
257 &request.table_name.schema_name,
258 &request.table_name.table_name,
259 None,
260 )
261 .await
262 .context(CatalogSnafu)?
263 .with_context(|| TableNotFoundSnafu {
264 table_name: request.table_name.to_string(),
265 })?;
266 *table_ref = Some(table.clone());
267 table
268 };
269
270 let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
271 let interceptor = interceptor_ref.as_ref();
272 interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;
273
274 self.plugins
275 .get::<PermissionCheckerRef>()
276 .as_ref()
277 .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
278 .context(PermissionSnafu)?;
279
280 self.inserter
283 .handle_bulk_insert(
284 table,
285 request.flight_data,
286 request.record_batch,
287 request.schema_bytes,
288 )
289 .await
290 .context(TableOperationSnafu)
291 }
292
293 fn handle_put_record_batch_stream(
294 &self,
295 mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
296 ctx: QueryContextRef,
297 ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
298 let catalog_manager = self.catalog_manager().clone();
300 let plugins = self.plugins.clone();
301 let inserter = self.inserter.clone();
302 let ctx = ctx.clone();
303 let mut table_ref: Option<TableRef> = None;
304 let mut table_checked = false;
305
306 Box::pin(try_stream! {
307 while let Some(request_result) = stream.next().await {
309 let request = request_result.map_err(|e| {
310 let error_msg = format!("Stream error: {:?}", e);
311 IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
312 })?;
313
314 if !table_checked {
316 let table_name = &request.table_name;
317
318 plugins
319 .get::<PermissionCheckerRef>()
320 .as_ref()
321 .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
322 .context(PermissionSnafu)?;
323
324 table_ref = Some(
326 catalog_manager
327 .table(
328 &table_name.catalog_name,
329 &table_name.schema_name,
330 &table_name.table_name,
331 None,
332 )
333 .await
334 .context(CatalogSnafu)?
335 .with_context(|| TableNotFoundSnafu {
336 table_name: table_name.to_string(),
337 })?,
338 );
339
340 let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
342 let interceptor = interceptor_ref.as_ref();
343 interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;
344
345 table_checked = true;
346 }
347
348 let request_id = request.request_id;
349 let start = Instant::now();
350 let rows = inserter
351 .handle_bulk_insert(
352 table_ref.clone().unwrap(),
353 request.flight_data,
354 request.record_batch,
355 request.schema_bytes,
356 )
357 .await
358 .context(TableOperationSnafu)?;
359 let elapsed_secs = start.elapsed().as_secs_f64();
360 yield DoPutResponse::new(request_id, rows, elapsed_secs);
361 }
362 })
363 }
364}
365
366fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
367 let catalog = ctx.current_catalog();
368 let schema = ctx.current_schema();
369
370 macro_rules! check_and_fill {
371 ($expr:ident) => {
372 if $expr.catalog_name.is_empty() {
373 $expr.catalog_name = catalog.to_string();
374 }
375 if $expr.schema_name.is_empty() {
376 $expr.schema_name = schema.to_string();
377 }
378 };
379 }
380
381 match ddl_expr {
382 Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { }
383 Expr::CreateTable(expr) => {
384 check_and_fill!(expr);
385 }
386 Expr::AlterTable(expr) => {
387 check_and_fill!(expr);
388 }
389 Expr::DropTable(expr) => {
390 check_and_fill!(expr);
391 }
392 Expr::TruncateTable(expr) => {
393 check_and_fill!(expr);
394 }
395 Expr::CreateFlow(expr) => {
396 if expr.catalog_name.is_empty() {
397 expr.catalog_name = catalog.to_string();
398 }
399 }
400 Expr::DropFlow(expr) => {
401 if expr.catalog_name.is_empty() {
402 expr.catalog_name = catalog.to_string();
403 }
404 }
405 Expr::CreateView(expr) => {
406 check_and_fill!(expr);
407 }
408 Expr::DropView(expr) => {
409 check_and_fill!(expr);
410 }
411 Expr::CommentOn(expr) => {
412 check_and_fill!(expr);
413 }
414 }
415}
416
417impl Instance {
418 async fn handle_insert_plan(
419 &self,
420 insert: InsertIntoPlan,
421 ctx: QueryContextRef,
422 ) -> Result<Output> {
423 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
424 let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
425 err_msg: "'table_name' is absent in InsertIntoPlan",
426 })?;
427
428 let plan_decoder = self
430 .query_engine()
431 .engine_context(ctx.clone())
432 .new_plan_decoder()
433 .context(PlanStatementSnafu)?;
434
435 let dummy_catalog_list =
436 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
437 self.catalog_manager().clone(),
438 ));
439
440 let logical_plan = plan_decoder
442 .decode(
443 bytes::Bytes::from(insert.logical_plan),
444 dummy_catalog_list,
445 false,
446 )
447 .await
448 .context(SubstraitDecodeLogicalPlanSnafu)?;
449
450 let table = self
451 .catalog_manager()
452 .table(
453 &table_name.catalog_name,
454 &table_name.schema_name,
455 &table_name.table_name,
456 None,
457 )
458 .await
459 .context(CatalogSnafu)?
460 .with_context(|| TableNotFoundSnafu {
461 table_name: [
462 table_name.catalog_name.clone(),
463 table_name.schema_name.clone(),
464 table_name.table_name.clone(),
465 ]
466 .join("."),
467 })?;
468 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
469 let table_source = Arc::new(DefaultTableSource::new(table_provider));
470
471 let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
472 .context(SubstraitDecodeLogicalPlanSnafu)?;
473
474 let engine_ctx = self.query_engine().engine_context(ctx.clone());
475 let state = engine_ctx.state();
476 let analyzed_plan = state
478 .analyzer()
479 .execute_and_check(insert_into, state.config_options(), |_, _| {})
480 .context(DataFusionSnafu)?;
481
482 let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;
484
485 let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;
486
487 Ok(attach_timer(output, timer))
488 }
489 #[tracing::instrument(skip_all)]
490 pub async fn handle_inserts(
491 &self,
492 requests: InsertRequests,
493 ctx: QueryContextRef,
494 ) -> Result<Output> {
495 self.inserter
496 .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
497 .await
498 .context(TableOperationSnafu)
499 }
500
501 #[tracing::instrument(skip_all)]
502 pub async fn handle_row_inserts(
503 &self,
504 requests: RowInsertRequests,
505 ctx: QueryContextRef,
506 accommodate_existing_schema: bool,
507 is_single_value: bool,
508 ) -> Result<Output> {
509 self.inserter
510 .handle_row_inserts(
511 requests,
512 ctx,
513 self.statement_executor.as_ref(),
514 accommodate_existing_schema,
515 is_single_value,
516 )
517 .await
518 .context(TableOperationSnafu)
519 }
520
521 #[tracing::instrument(skip_all)]
522 pub async fn handle_influx_row_inserts(
523 &self,
524 requests: RowInsertRequests,
525 ctx: QueryContextRef,
526 ) -> Result<Output> {
527 self.inserter
528 .handle_last_non_null_inserts(
529 requests,
530 ctx,
531 self.statement_executor.as_ref(),
532 true,
533 false,
535 )
536 .await
537 .context(TableOperationSnafu)
538 }
539
540 #[tracing::instrument(skip_all)]
541 pub async fn handle_metric_row_inserts(
542 &self,
543 requests: RowInsertRequests,
544 ctx: QueryContextRef,
545 physical_table: String,
546 ) -> Result<Output> {
547 self.inserter
548 .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
549 .await
550 .context(TableOperationSnafu)
551 }
552
553 #[tracing::instrument(skip_all)]
554 pub async fn handle_deletes(
555 &self,
556 requests: DeleteRequests,
557 ctx: QueryContextRef,
558 ) -> Result<Output> {
559 self.deleter
560 .handle_column_deletes(requests, ctx)
561 .await
562 .context(TableOperationSnafu)
563 }
564
565 #[tracing::instrument(skip_all)]
566 pub async fn handle_row_deletes(
567 &self,
568 requests: RowDeleteRequests,
569 ctx: QueryContextRef,
570 ) -> Result<Output> {
571 self.deleter
572 .handle_row_deletes(requests, ctx)
573 .await
574 .context(TableOperationSnafu)
575 }
576}