1use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::helper::from_pb_time_ranges;
20use api::v1::ddl_request::{Expr as DdlExpr, Expr};
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{
24 DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
25 RowInsertRequests,
26};
27use async_stream::try_stream;
28use async_trait::async_trait;
29use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
30use common_base::AffectedRows;
31use common_error::ext::BoxedError;
32use common_grpc::flight::do_put::DoPutResponse;
33use common_query::Output;
34use common_query::logical_plan::add_insert_to_logical_plan;
35use common_telemetry::tracing::{self};
36use datafusion::datasource::DefaultTableSource;
37use futures::Stream;
38use futures::stream::StreamExt;
39use query::parser::PromQuery;
40use servers::error as server_error;
41use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
42use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
43use servers::query_handler::grpc::GrpcQueryHandler;
44use session::context::QueryContextRef;
45use snafu::{OptionExt, ResultExt, ensure};
46use table::TableRef;
47use table::table::adapter::DfTableProviderAdapter;
48use table::table_name::TableName;
49
50use crate::error::{
51 CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
52 NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
53 SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
54};
55use crate::instance::{Instance, attach_timer};
56use crate::metrics::{
57 GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
58};
59
60#[async_trait]
61impl GrpcQueryHandler for Instance {
62 async fn do_query(
63 &self,
64 request: Request,
65 ctx: QueryContextRef,
66 ) -> server_error::Result<Output> {
67 let result: Result<Output> = async {
68 let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
69 let interceptor = interceptor_ref.as_ref();
70 interceptor.pre_execute(&request, ctx.clone())?;
71
72 self.plugins
73 .get::<PermissionCheckerRef>()
74 .as_ref()
75 .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
76 .context(PermissionSnafu)?;
77
78 let output = match request {
79 Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
80 Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
81 Some(physical_table) => {
82 self.handle_metric_row_inserts(
83 requests,
84 ctx.clone(),
85 physical_table.to_string(),
86 )
87 .await?
88 }
89 None => {
90 self.handle_row_inserts(requests, ctx.clone(), false, false)
91 .await?
92 }
93 },
94 Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
95 Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
96 Request::Query(query_request) => {
97 let query = query_request.query.context(IncompleteGrpcRequestSnafu {
98 err_msg: "Missing field 'QueryRequest.query'",
99 })?;
100 match query {
101 Query::Sql(sql) => {
102 let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
103 let mut result = self.do_query_inner(&sql, ctx.clone()).await;
104 ensure!(
105 result.len() == 1,
106 NotSupportedSnafu {
107 feat: "execute multiple statements in SQL query string through GRPC interface"
108 }
109 );
110 let output = result.remove(0)?;
111 attach_timer(output, timer)
112 }
113 Query::LogicalPlan(plan) => {
114 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
116
117 let plan_decoder = self
119 .query_engine()
120 .engine_context(ctx.clone())
121 .new_plan_decoder()
122 .context(PlanStatementSnafu)?;
123
124 let dummy_catalog_list =
125 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
126 self.catalog_manager().clone(),
127 ));
128
129 let logical_plan = plan_decoder
130 .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
131 .await
132 .context(SubstraitDecodeLogicalPlanSnafu)?;
133 let output =
134 self.do_exec_plan_inner(None, logical_plan, ctx.clone()).await?;
135
136 attach_timer(output, timer)
137 }
138 Query::InsertIntoPlan(insert) => {
139 self.handle_insert_plan(insert, ctx.clone()).await?
140 }
141 Query::PromRangeQuery(promql) => {
142 let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
143 let prom_query = PromQuery {
144 query: promql.query,
145 start: promql.start,
146 end: promql.end,
147 step: promql.step,
148 lookback: promql.lookback,
149 alias: None,
150 };
151 let mut result =
152 self.do_promql_query_inner(&prom_query, ctx.clone()).await;
153 ensure!(
154 result.len() == 1,
155 NotSupportedSnafu {
156 feat: "execute multiple statements in PromQL query string through GRPC interface"
157 }
158 );
159 let output = result.remove(0)?;
160 attach_timer(output, timer)
161 }
162 }
163 }
164 Request::Ddl(request) => {
165 let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
166 err_msg: "'expr' is absent in DDL request",
167 })?;
168
169 fill_catalog_and_schema_from_context(&mut expr, &ctx);
170
171 match expr {
172 DdlExpr::CreateTable(mut expr) => {
173 let _ = self
174 .statement_executor
175 .create_table_inner(&mut expr, None, ctx.clone())
176 .await?;
177 Output::new_with_affected_rows(0)
178 }
179 DdlExpr::AlterDatabase(expr) => {
180 let _ = self
181 .statement_executor
182 .alter_database_inner(expr, ctx.clone())
183 .await?;
184 Output::new_with_affected_rows(0)
185 }
186 DdlExpr::AlterTable(expr) => {
187 self.statement_executor
188 .alter_table_inner(expr, ctx.clone())
189 .await?
190 }
191 DdlExpr::CreateDatabase(expr) => {
192 self.statement_executor
193 .create_database(
194 &expr.schema_name,
195 expr.create_if_not_exists,
196 expr.options,
197 ctx.clone(),
198 )
199 .await?
200 }
201 DdlExpr::DropTable(expr) => {
202 let table_name =
203 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
204 self.statement_executor
205 .drop_table(table_name, expr.drop_if_exists, ctx.clone())
206 .await?
207 }
208 DdlExpr::TruncateTable(expr) => {
209 let table_name =
210 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
211 let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
212 .map_err(BoxedError::new)
213 .context(ExternalSnafu)?;
214 self.statement_executor
215 .truncate_table(table_name, time_ranges, ctx.clone())
216 .await?
217 }
218 DdlExpr::CreateFlow(expr) => {
219 self.statement_executor
220 .create_flow_inner(expr, ctx.clone())
221 .await?
222 }
223 DdlExpr::DropFlow(DropFlowExpr {
224 catalog_name,
225 flow_name,
226 drop_if_exists,
227 ..
228 }) => {
229 self.statement_executor
230 .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
231 .await?
232 }
233 DdlExpr::CreateView(expr) => {
234 let _ = self
235 .statement_executor
236 .create_view_by_expr(expr, ctx.clone())
237 .await?;
238
239 Output::new_with_affected_rows(0)
240 }
241 DdlExpr::DropView(_) => {
242 todo!("implemented in the following PR")
243 }
244 DdlExpr::CommentOn(expr) => {
245 self.statement_executor
246 .comment_by_expr(expr, ctx.clone())
247 .await?
248 }
249 }
250 }
251 };
252
253 let output = interceptor.post_execute(output, ctx)?;
254 Ok(output)
255 }
256 .await;
257
258 result
259 .map_err(BoxedError::new)
260 .context(server_error::ExecuteGrpcQuerySnafu)
261 }
262
263 async fn put_record_batch(
264 &self,
265 request: servers::grpc::flight::PutRecordBatchRequest,
266 table_ref: &mut Option<TableRef>,
267 ctx: QueryContextRef,
268 ) -> server_error::Result<AffectedRows> {
269 let result: Result<AffectedRows> = async {
270 let table = if let Some(table) = table_ref {
271 table.clone()
272 } else {
273 let table = self
274 .catalog_manager()
275 .table(
276 &request.table_name.catalog_name,
277 &request.table_name.schema_name,
278 &request.table_name.table_name,
279 None,
280 )
281 .await
282 .context(CatalogSnafu)?
283 .with_context(|| TableNotFoundSnafu {
284 table_name: request.table_name.to_string(),
285 })?;
286 *table_ref = Some(table.clone());
287 table
288 };
289
290 let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
291 let interceptor = interceptor_ref.as_ref();
292 interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;
293
294 self.plugins
295 .get::<PermissionCheckerRef>()
296 .as_ref()
297 .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
298 .context(PermissionSnafu)?;
299
300 self.inserter
303 .handle_bulk_insert(
304 table,
305 request.flight_data,
306 request.record_batch,
307 request.schema_bytes,
308 )
309 .await
310 .context(TableOperationSnafu)
311 }
312 .await;
313
314 result
315 .map_err(BoxedError::new)
316 .context(server_error::ExecuteGrpcRequestSnafu)
317 }
318
319 fn handle_put_record_batch_stream(
320 &self,
321 stream: servers::grpc::flight::PutRecordBatchRequestStream,
322 ctx: QueryContextRef,
323 ) -> Pin<Box<dyn Stream<Item = server_error::Result<DoPutResponse>> + Send>> {
324 Box::pin(
325 self.handle_put_record_batch_stream_inner(stream, ctx)
326 .map(|result| {
327 result
328 .map_err(BoxedError::new)
329 .context(server_error::ExecuteGrpcRequestSnafu)
330 }),
331 )
332 }
333}
334
335fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
336 let catalog = ctx.current_catalog();
337 let schema = ctx.current_schema();
338
339 macro_rules! check_and_fill {
340 ($expr:ident) => {
341 if $expr.catalog_name.is_empty() {
342 $expr.catalog_name = catalog.to_string();
343 }
344 if $expr.schema_name.is_empty() {
345 $expr.schema_name = schema.to_string();
346 }
347 };
348 }
349
350 match ddl_expr {
351 Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { }
352 Expr::CreateTable(expr) => {
353 check_and_fill!(expr);
354 }
355 Expr::AlterTable(expr) => {
356 check_and_fill!(expr);
357 }
358 Expr::DropTable(expr) => {
359 check_and_fill!(expr);
360 }
361 Expr::TruncateTable(expr) => {
362 check_and_fill!(expr);
363 }
364 Expr::CreateFlow(expr) => {
365 if expr.catalog_name.is_empty() {
366 expr.catalog_name = catalog.to_string();
367 }
368 }
369 Expr::DropFlow(expr) => {
370 if expr.catalog_name.is_empty() {
371 expr.catalog_name = catalog.to_string();
372 }
373 }
374 Expr::CreateView(expr) => {
375 check_and_fill!(expr);
376 }
377 Expr::DropView(expr) => {
378 check_and_fill!(expr);
379 }
380 Expr::CommentOn(expr) => {
381 check_and_fill!(expr);
382 }
383 }
384}
385
386impl Instance {
387 fn handle_put_record_batch_stream_inner(
388 &self,
389 mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
390 ctx: QueryContextRef,
391 ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
392 let catalog_manager = self.catalog_manager().clone();
394 let plugins = self.plugins.clone();
395 let inserter = self.inserter.clone();
396 let ctx = ctx.clone();
397 let mut table_ref: Option<TableRef> = None;
398 let mut table_checked = false;
399
400 Box::pin(try_stream! {
401 while let Some(request_result) = stream.next().await {
403 let request = request_result.map_err(|e| {
404 let error_msg = format!("Stream error: {:?}", e);
405 IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
406 })?;
407
408 if !table_checked {
410 let table_name = &request.table_name;
411
412 plugins
413 .get::<PermissionCheckerRef>()
414 .as_ref()
415 .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
416 .context(PermissionSnafu)?;
417
418 table_ref = Some(
420 catalog_manager
421 .table(
422 &table_name.catalog_name,
423 &table_name.schema_name,
424 &table_name.table_name,
425 None,
426 )
427 .await
428 .context(CatalogSnafu)?
429 .with_context(|| TableNotFoundSnafu {
430 table_name: table_name.to_string(),
431 })?,
432 );
433
434 let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
436 let interceptor = interceptor_ref.as_ref();
437 interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;
438
439 table_checked = true;
440 }
441
442 let request_id = request.request_id;
443 let start = Instant::now();
444 let rows = inserter
445 .handle_bulk_insert(
446 table_ref.clone().unwrap(),
447 request.flight_data,
448 request.record_batch,
449 request.schema_bytes,
450 )
451 .await
452 .context(TableOperationSnafu)?;
453 let elapsed_secs = start.elapsed().as_secs_f64();
454 yield DoPutResponse::new(request_id, rows, elapsed_secs);
455 }
456 })
457 }
458
459 async fn handle_insert_plan(
460 &self,
461 insert: InsertIntoPlan,
462 ctx: QueryContextRef,
463 ) -> Result<Output> {
464 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
465 let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
466 err_msg: "'table_name' is absent in InsertIntoPlan",
467 })?;
468
469 let plan_decoder = self
471 .query_engine()
472 .engine_context(ctx.clone())
473 .new_plan_decoder()
474 .context(PlanStatementSnafu)?;
475
476 let dummy_catalog_list =
477 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
478 self.catalog_manager().clone(),
479 ));
480
481 let logical_plan = plan_decoder
483 .decode(
484 bytes::Bytes::from(insert.logical_plan),
485 dummy_catalog_list,
486 false,
487 )
488 .await
489 .context(SubstraitDecodeLogicalPlanSnafu)?;
490
491 let table = self
492 .catalog_manager()
493 .table(
494 &table_name.catalog_name,
495 &table_name.schema_name,
496 &table_name.table_name,
497 None,
498 )
499 .await
500 .context(CatalogSnafu)?
501 .with_context(|| TableNotFoundSnafu {
502 table_name: [
503 table_name.catalog_name.clone(),
504 table_name.schema_name.clone(),
505 table_name.table_name.clone(),
506 ]
507 .join("."),
508 })?;
509 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
510 let table_source = Arc::new(DefaultTableSource::new(table_provider));
511
512 let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
513 .context(SubstraitDecodeLogicalPlanSnafu)?;
514
515 let engine_ctx = self.query_engine().engine_context(ctx.clone());
516 let state = engine_ctx.state();
517 let analyzed_plan = state
519 .analyzer()
520 .execute_and_check(insert_into, state.config_options(), |_, _| {})
521 .context(DataFusionSnafu)?;
522
523 let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;
525
526 let output = self
527 .do_exec_plan_inner(None, optimized_plan, ctx.clone())
528 .await?;
529
530 Ok(attach_timer(output, timer))
531 }
532 #[tracing::instrument(skip_all)]
533 pub async fn handle_inserts(
534 &self,
535 requests: InsertRequests,
536 ctx: QueryContextRef,
537 ) -> Result<Output> {
538 self.inserter
539 .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
540 .await
541 .context(TableOperationSnafu)
542 }
543
544 #[tracing::instrument(skip_all)]
545 pub async fn handle_row_inserts(
546 &self,
547 requests: RowInsertRequests,
548 ctx: QueryContextRef,
549 accommodate_existing_schema: bool,
550 is_single_value: bool,
551 ) -> Result<Output> {
552 self.inserter
553 .handle_row_inserts(
554 requests,
555 ctx,
556 self.statement_executor.as_ref(),
557 accommodate_existing_schema,
558 is_single_value,
559 )
560 .await
561 .context(TableOperationSnafu)
562 }
563
564 #[tracing::instrument(skip_all)]
565 pub async fn handle_influx_row_inserts(
566 &self,
567 requests: RowInsertRequests,
568 ctx: QueryContextRef,
569 ) -> Result<Output> {
570 self.inserter
571 .handle_last_non_null_inserts(
572 requests,
573 ctx,
574 self.statement_executor.as_ref(),
575 true,
576 false,
578 )
579 .await
580 .context(TableOperationSnafu)
581 }
582
583 #[tracing::instrument(skip_all)]
584 pub async fn handle_metric_row_inserts(
585 &self,
586 requests: RowInsertRequests,
587 ctx: QueryContextRef,
588 physical_table: String,
589 ) -> Result<Output> {
590 self.inserter
591 .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
592 .await
593 .context(TableOperationSnafu)
594 }
595
596 #[tracing::instrument(skip_all)]
597 pub async fn handle_deletes(
598 &self,
599 requests: DeleteRequests,
600 ctx: QueryContextRef,
601 ) -> Result<Output> {
602 self.deleter
603 .handle_column_deletes(requests, ctx)
604 .await
605 .context(TableOperationSnafu)
606 }
607
608 #[tracing::instrument(skip_all)]
609 pub async fn handle_row_deletes(
610 &self,
611 requests: RowDeleteRequests,
612 ctx: QueryContextRef,
613 ) -> Result<Output> {
614 self.deleter
615 .handle_row_deletes(requests, ctx)
616 .await
617 .context(TableOperationSnafu)
618 }
619}