1use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::helper::from_pb_time_ranges;
20use api::v1::ddl_request::{Expr as DdlExpr, Expr};
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{
24 DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
25 RowInsertRequests,
26};
27use async_stream::try_stream;
28use async_trait::async_trait;
29use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
30use common_error::ext::BoxedError;
31use common_grpc::flight::do_put::DoPutResponse;
32use common_query::Output;
33use common_query::logical_plan::add_insert_to_logical_plan;
34use common_telemetry::tracing::{self};
35use datafusion::datasource::DefaultTableSource;
36use futures::Stream;
37use futures::stream::StreamExt;
38use query::parser::PromQuery;
39use servers::error as server_error;
40use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
41use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
42use servers::query_handler::grpc::GrpcQueryHandler;
43use session::context::QueryContextRef;
44use snafu::{OptionExt, ResultExt, ensure};
45use table::TableRef;
46use table::table::adapter::DfTableProviderAdapter;
47use table::table_name::TableName;
48
49use crate::error::{
50 CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
51 NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
52 SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
53};
54use crate::instance::{Instance, attach_timer};
55use crate::metrics::{
56 GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
57};
58
59#[async_trait]
60impl GrpcQueryHandler for Instance {
61 async fn do_query(
62 &self,
63 request: Request,
64 ctx: QueryContextRef,
65 ) -> server_error::Result<Output> {
66 let result: Result<Output> = async {
67 let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
68 let interceptor = interceptor_ref.as_ref();
69 interceptor.pre_execute(&request, ctx.clone())?;
70
71 self.plugins
72 .get::<PermissionCheckerRef>()
73 .as_ref()
74 .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
75 .context(PermissionSnafu)?;
76
77 let output = match request {
78 Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
79 Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
80 Some(physical_table) => {
81 self.handle_metric_row_inserts(
82 requests,
83 ctx.clone(),
84 physical_table.to_string(),
85 )
86 .await?
87 }
88 None => {
89 self.handle_row_inserts(requests, ctx.clone(), false, false)
90 .await?
91 }
92 },
93 Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
94 Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
95 Request::Query(query_request) => {
96 let query = query_request.query.context(IncompleteGrpcRequestSnafu {
97 err_msg: "Missing field 'QueryRequest.query'",
98 })?;
99 match query {
100 Query::Sql(sql) => {
101 let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
102 let mut result = self.do_query_inner(&sql, ctx.clone()).await;
103 ensure!(
104 result.len() == 1,
105 NotSupportedSnafu {
106 feat: "execute multiple statements in SQL query string through GRPC interface"
107 }
108 );
109 let output = result.remove(0)?;
110 attach_timer(output, timer)
111 }
112 Query::LogicalPlan(plan) => {
113 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
115
116 let plan_decoder = self
118 .query_engine()
119 .engine_context(ctx.clone())
120 .new_plan_decoder()
121 .context(PlanStatementSnafu)?;
122
123 let dummy_catalog_list =
124 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
125 self.catalog_manager().clone(),
126 ));
127
128 let logical_plan = plan_decoder
129 .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
130 .await
131 .context(SubstraitDecodeLogicalPlanSnafu)?;
132 let query = logical_plan.display_indent().to_string();
133 let output =
134 self.do_exec_plan_inner(logical_plan, query, ctx.clone()).await?;
135
136 attach_timer(output, timer)
137 }
138 Query::InsertIntoPlan(insert) => {
139 self.handle_insert_plan(insert, ctx.clone()).await?
140 }
141 Query::PromRangeQuery(promql) => {
142 let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
143 let prom_query = PromQuery {
144 query: promql.query,
145 start: promql.start,
146 end: promql.end,
147 step: promql.step,
148 lookback: promql.lookback,
149 alias: None,
150 };
151 let mut result =
152 self.do_promql_query_inner(&prom_query, ctx.clone()).await;
153 ensure!(
154 result.len() == 1,
155 NotSupportedSnafu {
156 feat: "execute multiple statements in PromQL query string through GRPC interface"
157 }
158 );
159 let output = result.remove(0)?;
160 attach_timer(output, timer)
161 }
162 }
163 }
164 Request::Ddl(request) => {
165 let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
166 err_msg: "'expr' is absent in DDL request",
167 })?;
168
169 fill_catalog_and_schema_from_context(&mut expr, &ctx);
170
171 match expr {
172 DdlExpr::CreateTable(mut expr) => {
173 let _ = self
174 .statement_executor
175 .create_table_inner(&mut expr, None, ctx.clone())
176 .await?;
177 Output::new_with_affected_rows(0)
178 }
179 DdlExpr::AlterDatabase(expr) => {
180 let _ = self
181 .statement_executor
182 .alter_database_inner(expr, ctx.clone())
183 .await?;
184 Output::new_with_affected_rows(0)
185 }
186 DdlExpr::AlterTable(expr) => {
187 self.statement_executor
188 .alter_table_inner(expr, ctx.clone())
189 .await?
190 }
191 DdlExpr::CreateDatabase(expr) => {
192 self.statement_executor
193 .create_database(
194 &expr.schema_name,
195 expr.create_if_not_exists,
196 expr.options,
197 ctx.clone(),
198 )
199 .await?
200 }
201 DdlExpr::DropTable(expr) => {
202 let table_name =
203 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
204 self.statement_executor
205 .drop_table(table_name, expr.drop_if_exists, ctx.clone())
206 .await?
207 }
208 DdlExpr::TruncateTable(expr) => {
209 let table_name =
210 TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
211 let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
212 .map_err(BoxedError::new)
213 .context(ExternalSnafu)?;
214 self.statement_executor
215 .truncate_table(table_name, time_ranges, ctx.clone())
216 .await?
217 }
218 DdlExpr::CreateFlow(expr) => {
219 self.statement_executor
220 .create_flow_inner(expr, ctx.clone())
221 .await?
222 }
223 DdlExpr::DropFlow(DropFlowExpr {
224 catalog_name,
225 flow_name,
226 drop_if_exists,
227 ..
228 }) => {
229 self.statement_executor
230 .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
231 .await?
232 }
233 DdlExpr::CreateView(expr) => {
234 let _ = self
235 .statement_executor
236 .create_view_by_expr(expr, ctx.clone())
237 .await?;
238
239 Output::new_with_affected_rows(0)
240 }
241 DdlExpr::DropView(_) => {
242 todo!("implemented in the following PR")
243 }
244 DdlExpr::CommentOn(expr) => {
245 self.statement_executor
246 .comment_by_expr(expr, ctx.clone())
247 .await?
248 }
249 }
250 }
251 };
252
253 let output = interceptor.post_execute(output, ctx)?;
254 Ok(output)
255 }
256 .await;
257
258 result
259 .map_err(BoxedError::new)
260 .context(server_error::ExecuteGrpcQuerySnafu)
261 }
262
263 fn handle_put_record_batch_stream(
264 &self,
265 stream: servers::grpc::flight::PutRecordBatchRequestStream,
266 ctx: QueryContextRef,
267 ) -> Pin<Box<dyn Stream<Item = server_error::Result<DoPutResponse>> + Send>> {
268 Box::pin(
269 self.handle_put_record_batch_stream_inner(stream, ctx)
270 .map(|result| {
271 result
272 .map_err(BoxedError::new)
273 .context(server_error::ExecuteGrpcRequestSnafu)
274 }),
275 )
276 }
277}
278
279fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
280 let catalog = ctx.current_catalog();
281 let schema = ctx.current_schema();
282
283 macro_rules! check_and_fill {
284 ($expr:ident) => {
285 if $expr.catalog_name.is_empty() {
286 $expr.catalog_name = catalog.to_string();
287 }
288 if $expr.schema_name.is_empty() {
289 $expr.schema_name = schema.to_string();
290 }
291 };
292 }
293
294 match ddl_expr {
295 Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { }
296 Expr::CreateTable(expr) => {
297 check_and_fill!(expr);
298 }
299 Expr::AlterTable(expr) => {
300 check_and_fill!(expr);
301 }
302 Expr::DropTable(expr) => {
303 check_and_fill!(expr);
304 }
305 Expr::TruncateTable(expr) => {
306 check_and_fill!(expr);
307 }
308 Expr::CreateFlow(expr) => {
309 if expr.catalog_name.is_empty() {
310 expr.catalog_name = catalog.to_string();
311 }
312 }
313 Expr::DropFlow(expr) => {
314 if expr.catalog_name.is_empty() {
315 expr.catalog_name = catalog.to_string();
316 }
317 }
318 Expr::CreateView(expr) => {
319 check_and_fill!(expr);
320 }
321 Expr::DropView(expr) => {
322 check_and_fill!(expr);
323 }
324 Expr::CommentOn(expr) => {
325 check_and_fill!(expr);
326 }
327 }
328}
329
330impl Instance {
331 fn handle_put_record_batch_stream_inner(
332 &self,
333 mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
334 ctx: QueryContextRef,
335 ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
336 let catalog_manager = self.catalog_manager().clone();
338 let plugins = self.plugins.clone();
339 let inserter = self.inserter.clone();
340 let ctx = ctx.clone();
341 let mut table_ref: Option<TableRef> = None;
342 let mut table_checked = false;
343
344 Box::pin(try_stream! {
345 while let Some(request_result) = stream.next().await {
347 let request = request_result.map_err(|e| {
348 let error_msg = format!("Stream error: {:?}", e);
349 IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
350 })?;
351
352 if !table_checked {
354 let table_name = &request.table_name;
355
356 plugins
357 .get::<PermissionCheckerRef>()
358 .as_ref()
359 .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
360 .context(PermissionSnafu)?;
361
362 table_ref = Some(
364 catalog_manager
365 .table(
366 &table_name.catalog_name,
367 &table_name.schema_name,
368 &table_name.table_name,
369 None,
370 )
371 .await
372 .context(CatalogSnafu)?
373 .with_context(|| TableNotFoundSnafu {
374 table_name: table_name.to_string(),
375 })?,
376 );
377
378 let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
380 let interceptor = interceptor_ref.as_ref();
381 interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;
382
383 table_checked = true;
384 }
385
386 let request_id = request.request_id;
387 let start = Instant::now();
388 let rows = inserter
389 .handle_bulk_insert(
390 table_ref.clone().unwrap(),
391 request.flight_data,
392 request.record_batch,
393 request.schema_bytes,
394 )
395 .await
396 .context(TableOperationSnafu)?;
397 let elapsed_secs = start.elapsed().as_secs_f64();
398 yield DoPutResponse::new(request_id, rows, elapsed_secs);
399 }
400 })
401 }
402
403 async fn handle_insert_plan(
404 &self,
405 insert: InsertIntoPlan,
406 ctx: QueryContextRef,
407 ) -> Result<Output> {
408 let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
409 let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
410 err_msg: "'table_name' is absent in InsertIntoPlan",
411 })?;
412
413 let plan_decoder = self
415 .query_engine()
416 .engine_context(ctx.clone())
417 .new_plan_decoder()
418 .context(PlanStatementSnafu)?;
419
420 let dummy_catalog_list =
421 Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
422 self.catalog_manager().clone(),
423 ));
424
425 let logical_plan = plan_decoder
427 .decode(
428 bytes::Bytes::from(insert.logical_plan),
429 dummy_catalog_list,
430 false,
431 )
432 .await
433 .context(SubstraitDecodeLogicalPlanSnafu)?;
434
435 let table = self
436 .catalog_manager()
437 .table(
438 &table_name.catalog_name,
439 &table_name.schema_name,
440 &table_name.table_name,
441 None,
442 )
443 .await
444 .context(CatalogSnafu)?
445 .with_context(|| TableNotFoundSnafu {
446 table_name: [
447 table_name.catalog_name.clone(),
448 table_name.schema_name.clone(),
449 table_name.table_name.clone(),
450 ]
451 .join("."),
452 })?;
453 let table_provider = Arc::new(DfTableProviderAdapter::new(table));
454 let table_source = Arc::new(DefaultTableSource::new(table_provider));
455
456 let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
457 .context(SubstraitDecodeLogicalPlanSnafu)?;
458
459 let engine_ctx = self.query_engine().engine_context(ctx.clone());
460 let state = engine_ctx.state();
461 let analyzed_plan = state
463 .analyzer()
464 .execute_and_check(insert_into, state.config_options(), |_, _| {})
465 .context(DataFusionSnafu)?;
466
467 let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;
469
470 let query = optimized_plan.display_indent().to_string();
471 let output = self
472 .do_exec_plan_inner(optimized_plan, query, ctx.clone())
473 .await?;
474
475 Ok(attach_timer(output, timer))
476 }
477 #[tracing::instrument(skip_all)]
478 pub async fn handle_inserts(
479 &self,
480 requests: InsertRequests,
481 ctx: QueryContextRef,
482 ) -> Result<Output> {
483 self.inserter
484 .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
485 .await
486 .context(TableOperationSnafu)
487 }
488
489 #[tracing::instrument(skip_all)]
490 pub async fn handle_row_inserts(
491 &self,
492 requests: RowInsertRequests,
493 ctx: QueryContextRef,
494 accommodate_existing_schema: bool,
495 is_single_value: bool,
496 ) -> Result<Output> {
497 self.inserter
498 .handle_row_inserts(
499 requests,
500 ctx,
501 self.statement_executor.as_ref(),
502 accommodate_existing_schema,
503 is_single_value,
504 )
505 .await
506 .context(TableOperationSnafu)
507 }
508
509 #[tracing::instrument(skip_all)]
510 pub async fn handle_influx_row_inserts(
511 &self,
512 requests: RowInsertRequests,
513 ctx: QueryContextRef,
514 ) -> Result<Output> {
515 self.inserter
516 .handle_last_non_null_inserts(
517 requests,
518 ctx,
519 self.statement_executor.as_ref(),
520 true,
521 false,
523 )
524 .await
525 .context(TableOperationSnafu)
526 }
527
528 #[tracing::instrument(skip_all)]
529 pub async fn handle_metric_row_inserts(
530 &self,
531 requests: RowInsertRequests,
532 ctx: QueryContextRef,
533 physical_table: String,
534 ) -> Result<Output> {
535 self.inserter
536 .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
537 .await
538 .context(TableOperationSnafu)
539 }
540
541 #[tracing::instrument(skip_all)]
542 pub async fn handle_deletes(
543 &self,
544 requests: DeleteRequests,
545 ctx: QueryContextRef,
546 ) -> Result<Output> {
547 self.deleter
548 .handle_column_deletes(requests, ctx)
549 .await
550 .context(TableOperationSnafu)
551 }
552
553 #[tracing::instrument(skip_all)]
554 pub async fn handle_row_deletes(
555 &self,
556 requests: RowDeleteRequests,
557 ctx: QueryContextRef,
558 ) -> Result<Output> {
559 self.deleter
560 .handle_row_deletes(requests, ctx)
561 .await
562 .context(TableOperationSnafu)
563 }
564}