use std::sync::Arc;

use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
    RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output;
use common_telemetry::tracing::{self};
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::table_name::TableName;

use crate::error::{
    CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
    IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{
    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
};

#[async_trait]
impl GrpcQueryHandler for Instance {
    type Error = Error;

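    /// Entry point for a gRPC [`Request`]: runs the pre-execute interceptor and the
    /// permission check, applies the in-flight write-bytes limiter, dispatches the request
    /// to the matching insert/delete/query/DDL handler, and finally runs the post-execute
    /// interceptor on the resulting [`Output`].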
    async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_execute(&request, ctx.clone())?;

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
            .context(PermissionSnafu)?;

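        // If a write-bytes limiter is configured, hold its guard for the duration of this
        // request; bail out when the in-flight write-bytes limit would be exceeded.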
        let _guard = if let Some(limiter) = &self.limiter {
            let result = limiter.limit_request(&request);
            if result.is_none() {
                return InFlightWriteBytesExceededSnafu.fail();
            }
            result
        } else {
            None
        };

        let output = match request {
            Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
            Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
            Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
            Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
            Request::Query(query_request) => {
                let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                    err_msg: "Missing field 'QueryRequest.query'",
                })?;
                match query {
                    Query::Sql(sql) => {
                        let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                        let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in SQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                    Query::LogicalPlan(plan) => {
                        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

                        let plan_decoder = self
                            .query_engine()
                            .engine_context(ctx.clone())
                            .new_plan_decoder()
                            .context(PlanStatementSnafu)?;

                        let dummy_catalog_list =
                            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                self.catalog_manager().clone(),
                            ));

                        let logical_plan = plan_decoder
                            .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                            .await
                            .context(SubstraitDecodeLogicalPlanSnafu)?;
                        let output =
                            SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;

                        attach_timer(output, timer)
                    }
                    Query::InsertIntoPlan(insert) => {
                        self.handle_insert_plan(insert, ctx.clone()).await?
                    }
                    Query::PromRangeQuery(promql) => {
                        let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                        let prom_query = PromQuery {
                            query: promql.query,
                            start: promql.start,
                            end: promql.end,
                            step: promql.step,
                            lookback: promql.lookback,
                        };
                        let mut result =
                            SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in PromQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                }
            }
            Request::Ddl(request) => {
                let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                    err_msg: "'expr' is absent in DDL request",
                })?;

                fill_catalog_and_schema_from_context(&mut expr, &ctx);

                match expr {
                    DdlExpr::CreateTable(mut expr) => {
                        let _ = self
                            .statement_executor
                            .create_table_inner(&mut expr, None, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterDatabase(expr) => {
                        let _ = self
                            .statement_executor
                            .alter_database_inner(expr, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterTable(expr) => {
                        self.statement_executor
                            .alter_table_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateDatabase(expr) => {
                        self.statement_executor
                            .create_database(
                                &expr.schema_name,
                                expr.create_if_not_exists,
                                expr.options,
                                ctx.clone(),
                            )
                            .await?
                    }
                    DdlExpr::DropTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        self.statement_executor
                            .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::TruncateTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        self.statement_executor
                            .truncate_table(table_name, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateFlow(expr) => {
                        self.statement_executor
                            .create_flow_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::DropFlow(DropFlowExpr {
                        catalog_name,
                        flow_name,
                        drop_if_exists,
                        ..
                    }) => {
                        self.statement_executor
                            .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateView(expr) => {
                        let _ = self
                            .statement_executor
                            .create_view_by_expr(expr, ctx.clone())
                            .await?;

                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::DropView(_) => {
                        todo!("implemented in the following PR")
                    }
                }
            }
        };

        let output = interceptor.post_execute(output, ctx)?;
        Ok(output)
    }

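    /// Currently only checks that the target table exists and logs the raw batch size;
    /// no write is performed here, and the length of the raw batch is returned as the
    /// affected-row count.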
    async fn put_record_batch(
        &self,
        table: &TableName,
        record_batch: RawRecordBatch,
    ) -> Result<AffectedRows> {
        let _table = self
            .catalog_manager()
            .table(
                &table.catalog_name,
                &table.schema_name,
                &table.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: table.to_string(),
            })?;

        common_telemetry::debug!(
            "calling put_record_batch with table: {:?} and record_batch size: {}",
            table,
            record_batch.len()
        );
        Ok(record_batch.len())
    }
}

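/// Fills the catalog and schema of a DDL expression from the session context when the
/// expression leaves them empty. Database-level expressions are left untouched, and flow
/// expressions only have their catalog filled.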
fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
    let catalog = ctx.current_catalog();
    let schema = ctx.current_schema();

    macro_rules! check_and_fill {
        ($expr:ident) => {
            if $expr.catalog_name.is_empty() {
                $expr.catalog_name = catalog.to_string();
            }
            if $expr.schema_name.is_empty() {
                $expr.schema_name = schema.to_string();
            }
        };
    }

    match ddl_expr {
        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing */ }
        Expr::CreateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::AlterTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::TruncateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::CreateFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::DropFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::CreateView(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropView(expr) => {
            check_and_fill!(expr);
        }
    }
}

impl Instance {
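    /// Executes an `InsertIntoPlan`: decodes the Substrait-encoded source plan, resolves the
    /// target table, wraps the decoded plan in an `INSERT INTO` logical plan built against the
    /// table's schema, then analyzes, optimizes, and executes it.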
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;

        let table_info = table.table_info();

        let df_schema = Arc::new(
            table_info
                .meta
                .schema
                .arrow_schema()
                .clone()
                .try_into()
                .context(DataFusionSnafu)?,
        );

        let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let optimized_plan = state
            .optimize(&analyzed_plan)
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;

        Ok(attach_timer(output, timer))
    }
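
    /// Handles inserts in the column-oriented `InsertRequests` format by delegating to the
    /// inserter; the statement executor is passed along for any table creation or alteration
    /// the insert path may need.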
    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

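    /// Handles inserts in the row-oriented `RowInsertRequests` format.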
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

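    /// Handles row inserts from the InfluxDB write path by delegating to the inserter's
    /// last-non-null insert handling.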
    #[tracing::instrument(skip_all)]
    pub async fn handle_influx_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

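    /// Handles metric row inserts, routing them to the given physical table via the inserter.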
    #[tracing::instrument(skip_all)]
    pub async fn handle_metric_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        physical_table: String,
    ) -> Result<Output> {
        self.inserter
            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
            .await
            .context(TableOperationSnafu)
    }

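    /// Handles deletes in the column-oriented `DeleteRequests` format by delegating to the deleter.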
    #[tracing::instrument(skip_all)]
    pub async fn handle_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_column_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }

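    /// Handles deletes in the row-oriented `RowDeleteRequests` format by delegating to the deleter.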
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }
}