frontend/instance/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::ddl_request::{Expr as DdlExpr, Expr};
18use api::v1::greptime_request::Request;
19use api::v1::query_request::Query;
20use api::v1::{
21    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
22    RowInsertRequests,
23};
24use async_trait::async_trait;
25use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
26use common_base::AffectedRows;
27use common_grpc::flight::FlightDecoder;
28use common_grpc::FlightData;
29use common_query::logical_plan::add_insert_to_logical_plan;
30use common_query::Output;
31use common_telemetry::tracing::{self};
32use query::parser::PromQuery;
33use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
34use servers::query_handler::grpc::GrpcQueryHandler;
35use servers::query_handler::sql::SqlQueryHandler;
36use session::context::QueryContextRef;
37use snafu::{ensure, OptionExt, ResultExt};
38use table::table_name::TableName;
39use table::TableRef;
40
41use crate::error::{
42    CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
43    IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
44    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
45};
46use crate::instance::{attach_timer, Instance};
47use crate::metrics::{
48    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
49};
50
51#[async_trait]
52impl GrpcQueryHandler for Instance {
53    type Error = Error;
54
55    async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
56        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
57        let interceptor = interceptor_ref.as_ref();
58        interceptor.pre_execute(&request, ctx.clone())?;
59
60        self.plugins
61            .get::<PermissionCheckerRef>()
62            .as_ref()
63            .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
64            .context(PermissionSnafu)?;
65
66        let _guard = if let Some(limiter) = &self.limiter {
67            let result = limiter.limit_request(&request);
68            if result.is_none() {
69                return InFlightWriteBytesExceededSnafu.fail();
70            }
71            result
72        } else {
73            None
74        };
75
76        let output = match request {
77            Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
78            Request::RowInserts(requests) => {
79                self.handle_row_inserts(requests, ctx.clone(), false, false)
80                    .await?
81            }
82            Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
83            Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
84            Request::Query(query_request) => {
85                let query = query_request.query.context(IncompleteGrpcRequestSnafu {
86                    err_msg: "Missing field 'QueryRequest.query'",
87                })?;
88                match query {
89                    Query::Sql(sql) => {
90                        let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
91                        let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
92                        ensure!(
93                            result.len() == 1,
94                            NotSupportedSnafu {
95                                feat: "execute multiple statements in SQL query string through GRPC interface"
96                            }
97                        );
98                        let output = result.remove(0)?;
99                        attach_timer(output, timer)
100                    }
101                    Query::LogicalPlan(plan) => {
102                        // this path is useful internally when flownode needs to execute a logical plan through gRPC interface
103                        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
104
105                        // use dummy catalog to provide table
106                        let plan_decoder = self
107                            .query_engine()
108                            .engine_context(ctx.clone())
109                            .new_plan_decoder()
110                            .context(PlanStatementSnafu)?;
111
112                        let dummy_catalog_list =
113                            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
114                                self.catalog_manager().clone(),
115                            ));
116
117                        let logical_plan = plan_decoder
118                            .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
119                            .await
120                            .context(SubstraitDecodeLogicalPlanSnafu)?;
121                        let output =
122                            SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;
123
124                        attach_timer(output, timer)
125                    }
126                    Query::InsertIntoPlan(insert) => {
127                        self.handle_insert_plan(insert, ctx.clone()).await?
128                    }
129                    Query::PromRangeQuery(promql) => {
130                        let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
131                        let prom_query = PromQuery {
132                            query: promql.query,
133                            start: promql.start,
134                            end: promql.end,
135                            step: promql.step,
136                            lookback: promql.lookback,
137                        };
138                        let mut result =
139                            SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
140                        ensure!(
141                            result.len() == 1,
142                            NotSupportedSnafu {
143                                feat: "execute multiple statements in PromQL query string through GRPC interface"
144                            }
145                        );
146                        let output = result.remove(0)?;
147                        attach_timer(output, timer)
148                    }
149                }
150            }
151            Request::Ddl(request) => {
152                let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
153                    err_msg: "'expr' is absent in DDL request",
154                })?;
155
156                fill_catalog_and_schema_from_context(&mut expr, &ctx);
157
158                match expr {
159                    DdlExpr::CreateTable(mut expr) => {
160                        let _ = self
161                            .statement_executor
162                            .create_table_inner(&mut expr, None, ctx.clone())
163                            .await?;
164                        Output::new_with_affected_rows(0)
165                    }
166                    DdlExpr::AlterDatabase(expr) => {
167                        let _ = self
168                            .statement_executor
169                            .alter_database_inner(expr, ctx.clone())
170                            .await?;
171                        Output::new_with_affected_rows(0)
172                    }
173                    DdlExpr::AlterTable(expr) => {
174                        self.statement_executor
175                            .alter_table_inner(expr, ctx.clone())
176                            .await?
177                    }
178                    DdlExpr::CreateDatabase(expr) => {
179                        self.statement_executor
180                            .create_database(
181                                &expr.schema_name,
182                                expr.create_if_not_exists,
183                                expr.options,
184                                ctx.clone(),
185                            )
186                            .await?
187                    }
188                    DdlExpr::DropTable(expr) => {
189                        let table_name =
190                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
191                        self.statement_executor
192                            .drop_table(table_name, expr.drop_if_exists, ctx.clone())
193                            .await?
194                    }
195                    DdlExpr::TruncateTable(expr) => {
196                        let table_name =
197                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
198                        self.statement_executor
199                            .truncate_table(table_name, ctx.clone())
200                            .await?
201                    }
202                    DdlExpr::CreateFlow(expr) => {
203                        self.statement_executor
204                            .create_flow_inner(expr, ctx.clone())
205                            .await?
206                    }
207                    DdlExpr::DropFlow(DropFlowExpr {
208                        catalog_name,
209                        flow_name,
210                        drop_if_exists,
211                        ..
212                    }) => {
213                        self.statement_executor
214                            .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
215                            .await?
216                    }
217                    DdlExpr::CreateView(expr) => {
218                        let _ = self
219                            .statement_executor
220                            .create_view_by_expr(expr, ctx.clone())
221                            .await?;
222
223                        Output::new_with_affected_rows(0)
224                    }
225                    DdlExpr::DropView(_) => {
226                        todo!("implemented in the following PR")
227                    }
228                }
229            }
230        };
231
232        let output = interceptor.post_execute(output, ctx)?;
233        Ok(output)
234    }
235
236    async fn put_record_batch(
237        &self,
238        table_name: &TableName,
239        table_ref: &mut Option<TableRef>,
240        decoder: &mut FlightDecoder,
241        data: FlightData,
242    ) -> Result<AffectedRows> {
243        let table = if let Some(table) = table_ref {
244            table.clone()
245        } else {
246            let table = self
247                .catalog_manager()
248                .table(
249                    &table_name.catalog_name,
250                    &table_name.schema_name,
251                    &table_name.table_name,
252                    None,
253                )
254                .await
255                .context(CatalogSnafu)?
256                .with_context(|| TableNotFoundSnafu {
257                    table_name: table_name.to_string(),
258                })?;
259            *table_ref = Some(table.clone());
260            table
261        };
262
263        self.inserter
264            .handle_bulk_insert(table, decoder, data)
265            .await
266            .context(TableOperationSnafu)
267    }
268}
269
270fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
271    let catalog = ctx.current_catalog();
272    let schema = ctx.current_schema();
273
274    macro_rules! check_and_fill {
275        ($expr:ident) => {
276            if $expr.catalog_name.is_empty() {
277                $expr.catalog_name = catalog.to_string();
278            }
279            if $expr.schema_name.is_empty() {
280                $expr.schema_name = schema.to_string();
281            }
282        };
283    }
284
285    match ddl_expr {
286        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
287        Expr::CreateTable(expr) => {
288            check_and_fill!(expr);
289        }
290        Expr::AlterTable(expr) => {
291            check_and_fill!(expr);
292        }
293        Expr::DropTable(expr) => {
294            check_and_fill!(expr);
295        }
296        Expr::TruncateTable(expr) => {
297            check_and_fill!(expr);
298        }
299        Expr::CreateFlow(expr) => {
300            if expr.catalog_name.is_empty() {
301                expr.catalog_name = catalog.to_string();
302            }
303        }
304        Expr::DropFlow(expr) => {
305            if expr.catalog_name.is_empty() {
306                expr.catalog_name = catalog.to_string();
307            }
308        }
309        Expr::CreateView(expr) => {
310            check_and_fill!(expr);
311        }
312        Expr::DropView(expr) => {
313            check_and_fill!(expr);
314        }
315    }
316}
317
318impl Instance {
319    async fn handle_insert_plan(
320        &self,
321        insert: InsertIntoPlan,
322        ctx: QueryContextRef,
323    ) -> Result<Output> {
324        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
325        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
326            err_msg: "'table_name' is absent in InsertIntoPlan",
327        })?;
328
329        // use dummy catalog to provide table
330        let plan_decoder = self
331            .query_engine()
332            .engine_context(ctx.clone())
333            .new_plan_decoder()
334            .context(PlanStatementSnafu)?;
335
336        let dummy_catalog_list =
337            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
338                self.catalog_manager().clone(),
339            ));
340
341        // no optimize yet since we still need to add stuff
342        let logical_plan = plan_decoder
343            .decode(
344                bytes::Bytes::from(insert.logical_plan),
345                dummy_catalog_list,
346                false,
347            )
348            .await
349            .context(SubstraitDecodeLogicalPlanSnafu)?;
350
351        let table = self
352            .catalog_manager()
353            .table(
354                &table_name.catalog_name,
355                &table_name.schema_name,
356                &table_name.table_name,
357                None,
358            )
359            .await
360            .context(CatalogSnafu)?
361            .with_context(|| TableNotFoundSnafu {
362                table_name: [
363                    table_name.catalog_name.clone(),
364                    table_name.schema_name.clone(),
365                    table_name.table_name.clone(),
366                ]
367                .join("."),
368            })?;
369
370        let table_info = table.table_info();
371
372        let df_schema = Arc::new(
373            table_info
374                .meta
375                .schema
376                .arrow_schema()
377                .clone()
378                .try_into()
379                .context(DataFusionSnafu)?,
380        );
381
382        let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)
383            .context(SubstraitDecodeLogicalPlanSnafu)?;
384
385        let engine_ctx = self.query_engine().engine_context(ctx.clone());
386        let state = engine_ctx.state();
387        // Analyze the plan
388        let analyzed_plan = state
389            .analyzer()
390            .execute_and_check(insert_into, state.config_options(), |_, _| {})
391            .context(common_query::error::GeneralDataFusionSnafu)
392            .context(SubstraitDecodeLogicalPlanSnafu)?;
393
394        // Optimize the plan
395        let optimized_plan = state
396            .optimize(&analyzed_plan)
397            .context(common_query::error::GeneralDataFusionSnafu)
398            .context(SubstraitDecodeLogicalPlanSnafu)?;
399
400        let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;
401
402        Ok(attach_timer(output, timer))
403    }
404    #[tracing::instrument(skip_all)]
405    pub async fn handle_inserts(
406        &self,
407        requests: InsertRequests,
408        ctx: QueryContextRef,
409    ) -> Result<Output> {
410        self.inserter
411            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
412            .await
413            .context(TableOperationSnafu)
414    }
415
416    #[tracing::instrument(skip_all)]
417    pub async fn handle_row_inserts(
418        &self,
419        requests: RowInsertRequests,
420        ctx: QueryContextRef,
421        accommodate_existing_schema: bool,
422        is_single_value: bool,
423    ) -> Result<Output> {
424        self.inserter
425            .handle_row_inserts(
426                requests,
427                ctx,
428                self.statement_executor.as_ref(),
429                accommodate_existing_schema,
430                is_single_value,
431            )
432            .await
433            .context(TableOperationSnafu)
434    }
435
436    #[tracing::instrument(skip_all)]
437    pub async fn handle_influx_row_inserts(
438        &self,
439        requests: RowInsertRequests,
440        ctx: QueryContextRef,
441    ) -> Result<Output> {
442        self.inserter
443            .handle_last_non_null_inserts(
444                requests,
445                ctx,
446                self.statement_executor.as_ref(),
447                true,
448                // Influx protocol may writes multiple fields (values).
449                false,
450            )
451            .await
452            .context(TableOperationSnafu)
453    }
454
455    #[tracing::instrument(skip_all)]
456    pub async fn handle_metric_row_inserts(
457        &self,
458        requests: RowInsertRequests,
459        ctx: QueryContextRef,
460        physical_table: String,
461    ) -> Result<Output> {
462        self.inserter
463            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
464            .await
465            .context(TableOperationSnafu)
466    }
467
468    #[tracing::instrument(skip_all)]
469    pub async fn handle_deletes(
470        &self,
471        requests: DeleteRequests,
472        ctx: QueryContextRef,
473    ) -> Result<Output> {
474        self.deleter
475            .handle_column_deletes(requests, ctx)
476            .await
477            .context(TableOperationSnafu)
478    }
479
480    #[tracing::instrument(skip_all)]
481    pub async fn handle_row_deletes(
482        &self,
483        requests: RowDeleteRequests,
484        ctx: QueryContextRef,
485    ) -> Result<Output> {
486        self.deleter
487            .handle_row_deletes(requests, ctx)
488            .await
489            .context(TableOperationSnafu)
490    }
491}