use std::sync::Arc;

use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
    RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_grpc::flight::FlightDecoder;
use common_grpc::FlightData;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output;
use common_telemetry::tracing::{self};
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::table_name::TableName;
use table::TableRef;

use crate::error::{
    CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
    IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{
    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
};

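/// Serves gRPC [`Request`]s by dispatching them to the matching insert, delete,
/// query, or DDL handler on this [`Instance`].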
#[async_trait]
impl GrpcQueryHandler for Instance {
    type Error = Error;

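    /// Entry point for a single gRPC request: runs the configured
    /// [`GrpcQueryInterceptor`] hooks and permission checks, rejects the request if
    /// the in-flight write bytes limit is exceeded, then dispatches it to the
    /// corresponding insert/delete/query/DDL handler.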
    async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_execute(&request, ctx.clone())?;

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
            .context(PermissionSnafu)?;

        let _guard = if let Some(limiter) = &self.limiter {
            let result = limiter.limit_request(&request);
            if result.is_none() {
                return InFlightWriteBytesExceededSnafu.fail();
            }
            result
        } else {
            None
        };

        let output = match request {
            Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
            Request::RowInserts(requests) => {
                self.handle_row_inserts(requests, ctx.clone(), false, false)
                    .await?
            }
            Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
            Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
            Request::Query(query_request) => {
                let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                    err_msg: "Missing field 'QueryRequest.query'",
                })?;
                match query {
                    Query::Sql(sql) => {
                        let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                        let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in SQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                    Query::LogicalPlan(plan) => {
                        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

                        let plan_decoder = self
                            .query_engine()
                            .engine_context(ctx.clone())
                            .new_plan_decoder()
                            .context(PlanStatementSnafu)?;

                        let dummy_catalog_list =
                            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                self.catalog_manager().clone(),
                            ));

                        let logical_plan = plan_decoder
                            .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                            .await
                            .context(SubstraitDecodeLogicalPlanSnafu)?;
                        let output =
                            SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;

                        attach_timer(output, timer)
                    }
                    Query::InsertIntoPlan(insert) => {
                        self.handle_insert_plan(insert, ctx.clone()).await?
                    }
                    Query::PromRangeQuery(promql) => {
                        let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                        let prom_query = PromQuery {
                            query: promql.query,
                            start: promql.start,
                            end: promql.end,
                            step: promql.step,
                            lookback: promql.lookback,
                        };
                        let mut result =
                            SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in PromQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                }
            }
            Request::Ddl(request) => {
                let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                    err_msg: "'expr' is absent in DDL request",
                })?;

                fill_catalog_and_schema_from_context(&mut expr, &ctx);

                match expr {
                    DdlExpr::CreateTable(mut expr) => {
                        let _ = self
                            .statement_executor
                            .create_table_inner(&mut expr, None, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterDatabase(expr) => {
                        let _ = self
                            .statement_executor
                            .alter_database_inner(expr, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterTable(expr) => {
                        self.statement_executor
                            .alter_table_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateDatabase(expr) => {
                        self.statement_executor
                            .create_database(
                                &expr.schema_name,
                                expr.create_if_not_exists,
                                expr.options,
                                ctx.clone(),
                            )
                            .await?
                    }
                    DdlExpr::DropTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        self.statement_executor
                            .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::TruncateTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        self.statement_executor
                            .truncate_table(table_name, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateFlow(expr) => {
                        self.statement_executor
                            .create_flow_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::DropFlow(DropFlowExpr {
                        catalog_name,
                        flow_name,
                        drop_if_exists,
                        ..
                    }) => {
                        self.statement_executor
                            .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateView(expr) => {
                        let _ = self
                            .statement_executor
                            .create_view_by_expr(expr, ctx.clone())
                            .await?;

                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::DropView(_) => {
                        todo!("implemented in the following PR")
                    }
                }
            }
        };

        let output = interceptor.post_execute(output, ctx)?;
        Ok(output)
    }

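    /// Performs a bulk insert of one Flight data message into `table_name`,
    /// resolving the table through the catalog manager on first use and caching it
    /// in `table_ref` for subsequent calls.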
    async fn put_record_batch(
        &self,
        table_name: &TableName,
        table_ref: &mut Option<TableRef>,
        decoder: &mut FlightDecoder,
        data: FlightData,
    ) -> Result<AffectedRows> {
        let table = if let Some(table) = table_ref {
            table.clone()
        } else {
            let table = self
                .catalog_manager()
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    None,
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                })?;
            *table_ref = Some(table.clone());
            table
        };

        self.inserter
            .handle_bulk_insert(table, decoder, data)
            .await
            .context(TableOperationSnafu)
    }
}

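/// Fills the empty `catalog_name`/`schema_name` fields of a DDL expression with the
/// current catalog and schema from the query context. Database expressions are left
/// untouched, and flow expressions only have their catalog filled in.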
fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
    let catalog = ctx.current_catalog();
    let schema = ctx.current_schema();

    macro_rules! check_and_fill {
        ($expr:ident) => {
            if $expr.catalog_name.is_empty() {
                $expr.catalog_name = catalog.to_string();
            }
            if $expr.schema_name.is_empty() {
                $expr.schema_name = schema.to_string();
            }
        };
    }

    match ddl_expr {
        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => {}
        Expr::CreateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::AlterTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::TruncateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::CreateFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::DropFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::CreateView(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropView(expr) => {
            check_and_fill!(expr);
        }
    }
}

impl Instance {
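    /// Decodes the Substrait plan carried by an [`InsertIntoPlan`] request, wraps it
    /// in an insert targeting the resolved table, then analyzes, optimizes, and
    /// executes the resulting logical plan.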
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;

        let table_info = table.table_info();

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .try_into()
                .context(DataFusionSnafu)?,
        );

        let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let optimized_plan = state
            .optimize(&analyzed_plan)
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;

        Ok(attach_timer(output, timer))
    }
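
    /// Handles column-oriented [`InsertRequests`] by delegating to the inserter.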
    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

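    /// Handles row-oriented [`RowInsertRequests`] by delegating to the inserter;
    /// `accommodate_existing_schema` and `is_single_value` are forwarded unchanged.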
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.inserter
            .handle_row_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                accommodate_existing_schema,
                is_single_value,
            )
            .await
            .context(TableOperationSnafu)
    }

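    /// Handles InfluxDB row inserts, which go through the inserter's
    /// last-non-null insert path.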
    #[tracing::instrument(skip_all)]
    pub async fn handle_influx_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_last_non_null_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                true,
                false,
            )
            .await
            .context(TableOperationSnafu)
    }

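    /// Handles metric row inserts, forwarding them to the inserter together with the
    /// name of the target physical table.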
    #[tracing::instrument(skip_all)]
    pub async fn handle_metric_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        physical_table: String,
    ) -> Result<Output> {
        self.inserter
            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
            .await
            .context(TableOperationSnafu)
    }

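    /// Handles column-oriented [`DeleteRequests`] by delegating to the deleter.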
    #[tracing::instrument(skip_all)]
    pub async fn handle_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_column_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }

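    /// Handles row-oriented [`RowDeleteRequests`] by delegating to the deleter.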
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }
}