1use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::helper::from_pb_time_ranges;
20use api::v1::ddl_request::{Expr as DdlExpr, Expr};
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{
24 DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
25 RowInsertRequests,
26};
27use async_stream::try_stream;
28use async_trait::async_trait;
29use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
30use common_error::ext::BoxedError;
31use common_grpc::flight::do_put::DoPutResponse;
32use common_query::Output;
33use common_query::logical_plan::add_insert_to_logical_plan;
34use common_telemetry::tracing::{self};
35use datafusion::datasource::DefaultTableSource;
36use futures::Stream;
37use futures::stream::StreamExt;
38use query::parser::PromQuery;
39use servers::error as server_error;
40use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
41use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
42use servers::query_handler::grpc::GrpcQueryHandler;
43use session::context::QueryContextRef;
44use snafu::{OptionExt, ResultExt, ensure};
45use table::TableRef;
46use table::table::adapter::DfTableProviderAdapter;
47use table::table_name::TableName;
48
49use crate::error::{
50 CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
51 NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
52 SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
53};
54use crate::instance::{Instance, attach_timer};
55use crate::metrics::{
56 GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
57};
58
#[async_trait]
impl GrpcQueryHandler for Instance {
    /// Serves one gRPC [`Request`].
    ///
    /// Pipeline: interceptor `pre_execute` -> permission check -> dispatch on the
    /// request variant (inserts / deletes / queries / DDL) -> interceptor
    /// `post_execute` on the produced [`Output`]. Any internal error is boxed and
    /// surfaced as a server-side `ExecuteGrpcQuery` error.
    async fn do_query(
        &self,
        request: Request,
        ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let result: Result<Output> = async {
            // Plugin-provided interceptor; kept alive in `interceptor_ref` so the
            // borrowed `interceptor` stays valid for the post-hook below.
            let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
            let interceptor = interceptor_ref.as_ref();
            interceptor.pre_execute(&request, ctx.clone())?;

            // Reject the request up front if the current user lacks permission for it.
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
                .context(PermissionSnafu)?;

            let output = match request {
                Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
                // Row inserts are routed through the metric-engine path when the
                // session carries a physical-table hint (e.g. set by the Prometheus
                // remote-write handler); otherwise they take the plain row path.
                Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
                    Some(physical_table) => {
                        self.handle_metric_row_inserts(
                            requests,
                            ctx.clone(),
                            physical_table.to_string(),
                        )
                        .await?
                    }
                    None => {
                        self.handle_row_inserts(requests, ctx.clone(), false, false)
                            .await?
                    }
                },
                Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
                Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
                Request::Query(query_request) => {
                    let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                        err_msg: "Missing field 'QueryRequest.query'",
                    })?;
                    match query {
                        Query::Sql(sql) => {
                            let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                            let mut result = self.do_query_inner(&sql, ctx.clone()).await;
                            // The gRPC interface only accepts a single SQL statement
                            // per request.
                            ensure!(
                                result.len() == 1,
                                NotSupportedSnafu {
                                    feat: "execute multiple statements in SQL query string through GRPC interface"
                                }
                            );
                            let output = result.remove(0)?;
                            attach_timer(output, timer)
                        }
                        Query::LogicalPlan(plan) => {
                            let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

                            let plan_decoder = self
                                .query_engine()
                                .engine_context(ctx.clone())
                                .new_plan_decoder()
                                .context(PlanStatementSnafu)?;

                            // Table resolution during plan decoding goes through a
                            // dummy catalog list backed by the real catalog manager.
                            let dummy_catalog_list =
                                Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                    self.catalog_manager().clone(),
                                ));

                            let logical_plan = plan_decoder
                                .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                                .await
                                .context(SubstraitDecodeLogicalPlanSnafu)?;
                            let output =
                                self.do_exec_plan_inner(None, logical_plan, ctx.clone()).await?;

                            attach_timer(output, timer)
                        }
                        Query::InsertIntoPlan(insert) => {
                            self.handle_insert_plan(insert, ctx.clone()).await?
                        }
                        Query::PromRangeQuery(promql) => {
                            let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                            let prom_query = PromQuery {
                                query: promql.query,
                                start: promql.start,
                                end: promql.end,
                                step: promql.step,
                                lookback: promql.lookback,
                                alias: None,
                            };
                            let mut result =
                                self.do_promql_query_inner(&prom_query, ctx.clone()).await;
                            // Same single-statement restriction as the SQL path above.
                            ensure!(
                                result.len() == 1,
                                NotSupportedSnafu {
                                    feat: "execute multiple statements in PromQL query string through GRPC interface"
                                }
                            );
                            let output = result.remove(0)?;
                            attach_timer(output, timer)
                        }
                    }
                }
                Request::Ddl(request) => {
                    let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                        err_msg: "'expr' is absent in DDL request",
                    })?;

                    // Default any empty catalog/schema names from the session context
                    // before dispatching to the statement executor.
                    fill_catalog_and_schema_from_context(&mut expr, &ctx);

                    match expr {
                        DdlExpr::CreateTable(mut expr) => {
                            // The created table handle is not needed here; DDLs report
                            // zero affected rows.
                            let _ = self
                                .statement_executor
                                .create_table_inner(&mut expr, None, ctx.clone())
                                .await?;
                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::AlterDatabase(expr) => {
                            let _ = self
                                .statement_executor
                                .alter_database_inner(expr, ctx.clone())
                                .await?;
                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::AlterTable(expr) => {
                            self.statement_executor
                                .alter_table_inner(expr, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateDatabase(expr) => {
                            self.statement_executor
                                .create_database(
                                    &expr.schema_name,
                                    expr.create_if_not_exists,
                                    expr.options,
                                    ctx.clone(),
                                )
                                .await?
                        }
                        DdlExpr::DropTable(expr) => {
                            let table_name =
                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                            self.statement_executor
                                .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                                .await?
                        }
                        DdlExpr::TruncateTable(expr) => {
                            let table_name =
                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                            // Optional protobuf time ranges restrict the truncation;
                            // absent means truncate everything.
                            let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
                                .map_err(BoxedError::new)
                                .context(ExternalSnafu)?;
                            self.statement_executor
                                .truncate_table(table_name, time_ranges, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateFlow(expr) => {
                            self.statement_executor
                                .create_flow_inner(expr, ctx.clone())
                                .await?
                        }
                        DdlExpr::DropFlow(DropFlowExpr {
                            catalog_name,
                            flow_name,
                            drop_if_exists,
                            ..
                        }) => {
                            self.statement_executor
                                .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateView(expr) => {
                            let _ = self
                                .statement_executor
                                .create_view_by_expr(expr, ctx.clone())
                                .await?;

                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::DropView(_) => {
                            // Intentionally unimplemented; panics if reached.
                            todo!("implemented in the following PR")
                        }
                        DdlExpr::CommentOn(expr) => {
                            self.statement_executor
                                .comment_by_expr(expr, ctx.clone())
                                .await?
                        }
                    }
                }
            };

            let output = interceptor.post_execute(output, ctx)?;
            Ok(output)
        }
        .await;

        result
            .map_err(BoxedError::new)
            .context(server_error::ExecuteGrpcQuerySnafu)
    }

    /// Handles a Flight `DoPut` record-batch stream by delegating to the inner
    /// implementation and mapping its errors into boxed `ExecuteGrpcRequest`
    /// server errors.
    fn handle_put_record_batch_stream(
        &self,
        stream: servers::grpc::flight::PutRecordBatchRequestStream,
        ctx: QueryContextRef,
    ) -> Pin<Box<dyn Stream<Item = server_error::Result<DoPutResponse>> + Send>> {
        Box::pin(
            self.handle_put_record_batch_stream_inner(stream, ctx)
                .map(|result| {
                    result
                        .map_err(BoxedError::new)
                        .context(server_error::ExecuteGrpcRequestSnafu)
                }),
        )
    }
}
277
278fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
279 let catalog = ctx.current_catalog();
280 let schema = ctx.current_schema();
281
282 macro_rules! check_and_fill {
283 ($expr:ident) => {
284 if $expr.catalog_name.is_empty() {
285 $expr.catalog_name = catalog.to_string();
286 }
287 if $expr.schema_name.is_empty() {
288 $expr.schema_name = schema.to_string();
289 }
290 };
291 }
292
293 match ddl_expr {
294 Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { }
295 Expr::CreateTable(expr) => {
296 check_and_fill!(expr);
297 }
298 Expr::AlterTable(expr) => {
299 check_and_fill!(expr);
300 }
301 Expr::DropTable(expr) => {
302 check_and_fill!(expr);
303 }
304 Expr::TruncateTable(expr) => {
305 check_and_fill!(expr);
306 }
307 Expr::CreateFlow(expr) => {
308 if expr.catalog_name.is_empty() {
309 expr.catalog_name = catalog.to_string();
310 }
311 }
312 Expr::DropFlow(expr) => {
313 if expr.catalog_name.is_empty() {
314 expr.catalog_name = catalog.to_string();
315 }
316 }
317 Expr::CreateView(expr) => {
318 check_and_fill!(expr);
319 }
320 Expr::DropView(expr) => {
321 check_and_fill!(expr);
322 }
323 Expr::CommentOn(expr) => {
324 check_and_fill!(expr);
325 }
326 }
327}
328
impl Instance {
    /// Processes a Flight `DoPut` bulk-insert stream.
    ///
    /// On the first request of the stream, checks the caller's `BulkInsert`
    /// permission, resolves the target table from the catalog, and runs the
    /// interceptor's `pre_bulk_insert` hook; subsequent requests reuse that
    /// resolved table. Each request is handed to the inserter and a timed
    /// [`DoPutResponse`] is yielded per request.
    fn handle_put_record_batch_stream_inner(
        &self,
        mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
        ctx: QueryContextRef,
    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
        // Clone everything the generated stream needs so it is 'static.
        let catalog_manager = self.catalog_manager().clone();
        let plugins = self.plugins.clone();
        let inserter = self.inserter.clone();
        let ctx = ctx.clone();
        let mut table_ref: Option<TableRef> = None;
        // One-shot guard: permission/table resolution happens only once per stream.
        let mut table_checked = false;

        Box::pin(try_stream! {
            while let Some(request_result) = stream.next().await {
                let request = request_result.map_err(|e| {
                    let error_msg = format!("Stream error: {:?}", e);
                    IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
                })?;

                if !table_checked {
                    let table_name = &request.table_name;

                    plugins
                        .get::<PermissionCheckerRef>()
                        .as_ref()
                        .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
                        .context(PermissionSnafu)?;

                    table_ref = Some(
                        catalog_manager
                            .table(
                                &table_name.catalog_name,
                                &table_name.schema_name,
                                &table_name.table_name,
                                None,
                            )
                            .await
                            .context(CatalogSnafu)?
                            .with_context(|| TableNotFoundSnafu {
                                table_name: table_name.to_string(),
                            })?,
                    );

                    let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
                    let interceptor = interceptor_ref.as_ref();
                    // `table_ref` was just set above, so the unwrap cannot fail.
                    interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;

                    table_checked = true;
                }

                let request_id = request.request_id;
                let start = Instant::now();
                let rows = inserter
                    .handle_bulk_insert(
                        table_ref.clone().unwrap(),
                        request.flight_data,
                        request.record_batch,
                        request.schema_bytes,
                    )
                    .await
                    .context(TableOperationSnafu)?;
                let elapsed_secs = start.elapsed().as_secs_f64();
                yield DoPutResponse::new(request_id, rows, elapsed_secs);
            }
        })
    }

    /// Executes an `INSERT INTO ... <plan>` request: decodes the Substrait source
    /// plan, resolves the target table, wraps the plan in an insert node, then
    /// analyzes, optimizes and executes it.
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        // Table lookups during decoding resolve through a dummy catalog list that
        // wraps the real catalog manager.
        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        // NOTE(review): the third `decode` argument is `false` here but `true` on
        // the plain LogicalPlan path in `do_query` — confirm the flag's meaning
        // against the decoder's definition.
        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        // Resolve the insert target so it can be attached as the plan's sink table.
        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        // Run DataFusion's analyzer and optimizer passes before execution.
        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(DataFusionSnafu)?;

        let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;

        let output = self
            .do_exec_plan_inner(None, optimized_plan, ctx.clone())
            .await?;

        Ok(attach_timer(output, timer))
    }

    /// Handles column-format insert requests by delegating to the inserter.
    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

    /// Handles row-format insert requests by delegating to the inserter.
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.inserter
            .handle_row_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                accommodate_existing_schema,
                is_single_value,
            )
            .await
            .context(TableOperationSnafu)
    }

    /// Handles InfluxDB-style row inserts using last-non-null merge semantics.
    #[tracing::instrument(skip_all)]
    pub async fn handle_influx_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_last_non_null_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                // NOTE(review): presumably (accommodate_existing_schema, is_single_value)
                // as in `handle_row_inserts` — confirm against the inserter's signature.
                true,
                false,
            )
            .await
            .context(TableOperationSnafu)
    }

    /// Handles row inserts destined for the metric engine, which stores them in
    /// the named physical table.
    #[tracing::instrument(skip_all)]
    pub async fn handle_metric_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        physical_table: String,
    ) -> Result<Output> {
        self.inserter
            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
            .await
            .context(TableOperationSnafu)
    }

    /// Handles column-format delete requests by delegating to the deleter.
    #[tracing::instrument(skip_all)]
    pub async fn handle_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_column_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }

    /// Handles row-format delete requests by delegating to the deleter.
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }
}