frontend/instance/grpc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::helper::from_pb_time_ranges;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
    RowInsertRequests,
};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_error::ext::BoxedError;
use common_grpc::flight::FlightDecoder;
use common_grpc::FlightData;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output;
use common_telemetry::tracing::{self};
use datafusion::datasource::DefaultTableSource;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
use table::table_name::TableName;
use table::TableRef;

use crate::error::{
    CatalogSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
    PermissionSnafu, PlanStatementSnafu, Result, SubstraitDecodeLogicalPlanSnafu,
    TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{
    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
};

#[async_trait]
impl GrpcQueryHandler for Instance {
    type Error = Error;

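    // Entry point for a single gRPC request: runs the gRPC query interceptor and
    // the permission check, then dispatches on the request variant.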
    async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_execute(&request, ctx.clone())?;

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
            .context(PermissionSnafu)?;

        let _guard = if let Some(limiter) = &self.limiter {
            Some(limiter.limit_request(&request).await?)
        } else {
            None
        };

        let output = match request {
            Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
            Request::RowInserts(requests) => {
                self.handle_row_inserts(requests, ctx.clone(), false, false)
                    .await?
            }
            Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
            Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
            Request::Query(query_request) => {
                let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                    err_msg: "Missing field 'QueryRequest.query'",
                })?;
                match query {
                    Query::Sql(sql) => {
                        let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                        let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in SQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                    Query::LogicalPlan(plan) => {
                        // This path is used internally, e.g. when a flownode needs to execute
                        // a logical plan through the gRPC interface.
                        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

                        // Use a dummy catalog to provide the tables referenced by the plan.
                        let plan_decoder = self
                            .query_engine()
                            .engine_context(ctx.clone())
                            .new_plan_decoder()
                            .context(PlanStatementSnafu)?;

                        let dummy_catalog_list =
                            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                self.catalog_manager().clone(),
                            ));

                        let logical_plan = plan_decoder
                            .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                            .await
                            .context(SubstraitDecodeLogicalPlanSnafu)?;
                        let output =
                            SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
                                .await?;

                        attach_timer(output, timer)
                    }
                    Query::InsertIntoPlan(insert) => {
                        self.handle_insert_plan(insert, ctx.clone()).await?
                    }
                    Query::PromRangeQuery(promql) => {
                        let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                        let prom_query = PromQuery {
                            query: promql.query,
                            start: promql.start,
                            end: promql.end,
                            step: promql.step,
                            lookback: promql.lookback,
                        };
                        let mut result =
                            SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in PromQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                }
            }
            Request::Ddl(request) => {
                let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                    err_msg: "'expr' is absent in DDL request",
                })?;

                fill_catalog_and_schema_from_context(&mut expr, &ctx);

                match expr {
                    DdlExpr::CreateTable(mut expr) => {
                        let _ = self
                            .statement_executor
                            .create_table_inner(&mut expr, None, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterDatabase(expr) => {
                        let _ = self
                            .statement_executor
                            .alter_database_inner(expr, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterTable(expr) => {
                        self.statement_executor
                            .alter_table_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateDatabase(expr) => {
                        self.statement_executor
                            .create_database(
                                &expr.schema_name,
                                expr.create_if_not_exists,
                                expr.options,
                                ctx.clone(),
                            )
                            .await?
                    }
                    DdlExpr::DropTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        self.statement_executor
                            .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::TruncateTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
                            .map_err(BoxedError::new)
                            .context(ExternalSnafu)?;
                        self.statement_executor
                            .truncate_table(table_name, time_ranges, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateFlow(expr) => {
                        self.statement_executor
                            .create_flow_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::DropFlow(DropFlowExpr {
                        catalog_name,
                        flow_name,
                        drop_if_exists,
                        ..
                    }) => {
                        self.statement_executor
                            .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateView(expr) => {
                        let _ = self
                            .statement_executor
                            .create_view_by_expr(expr, ctx.clone())
                            .await?;

                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::DropView(_) => {
                        todo!("implemented in the following PR")
                    }
                }
            }
        };

        let output = interceptor.post_execute(output, ctx)?;
        Ok(output)
    }

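    // Bulk-write path: decodes an Arrow Flight `FlightData` frame and writes it
    // into `table_name`, caching the resolved table in `table_ref` across calls.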
    async fn put_record_batch(
        &self,
        table_name: &TableName,
        table_ref: &mut Option<TableRef>,
        decoder: &mut FlightDecoder,
        data: FlightData,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        let table = if let Some(table) = table_ref {
            table.clone()
        } else {
            let table = self
                .catalog_manager()
                .table(
                    &table_name.catalog_name,
                    &table_name.schema_name,
                    &table_name.table_name,
                    None,
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: table_name.to_string(),
                })?;
            *table_ref = Some(table.clone());
            table
        };

        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
            .context(PermissionSnafu)?;

        // Should the request limiter also apply to bulk inserts?

        self.inserter
            .handle_bulk_insert(table, decoder, data)
            .await
            .context(TableOperationSnafu)
    }
}

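/// Fills in the catalog and schema from the query context when they are left
/// empty in the DDL expression.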
fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
    let catalog = ctx.current_catalog();
    let schema = ctx.current_schema();

    macro_rules! check_and_fill {
        ($expr:ident) => {
            if $expr.catalog_name.is_empty() {
                $expr.catalog_name = catalog.to_string();
            }
            if $expr.schema_name.is_empty() {
                $expr.schema_name = schema.to_string();
            }
        };
    }

    match ddl_expr {
        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing */ }
        Expr::CreateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::AlterTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::TruncateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::CreateFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::DropFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::CreateView(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropView(expr) => {
            check_and_fill!(expr);
        }
    }
}

impl Instance {
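    /// Executes an `InsertIntoPlan`: decodes the Substrait logical plan, adds an
    /// insert targeting the resolved table, then analyzes, optimizes and executes
    /// the resulting plan.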
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        // Use a dummy catalog to provide the tables referenced by the plan.
        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        // Do not optimize yet: the insert target still has to be added to the plan.
        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        // Analyze the plan.
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        // Optimize the plan.
        let optimized_plan = state
            .optimize(&analyzed_plan)
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;

        Ok(attach_timer(output, timer))
    }
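
    /// Handles column-based (`InsertRequests`) inserts.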
    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

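    /// Handles row-based (`RowInsertRequests`) inserts; `accommodate_existing_schema`
    /// and `is_single_value` are forwarded to the inserter.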
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.inserter
            .handle_row_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                accommodate_existing_schema,
                is_single_value,
            )
            .await
            .context(TableOperationSnafu)
    }

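    /// Handles row inserts from the InfluxDB line protocol using "last non-null"
    /// insert semantics.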
    #[tracing::instrument(skip_all)]
    pub async fn handle_influx_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_last_non_null_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                true,
                // The Influx protocol may write multiple fields (values).
                false,
            )
            .await
            .context(TableOperationSnafu)
    }

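    /// Handles metric row inserts targeting the given physical table.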
    #[tracing::instrument(skip_all)]
    pub async fn handle_metric_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        physical_table: String,
    ) -> Result<Output> {
        self.inserter
            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
            .await
            .context(TableOperationSnafu)
    }

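    /// Handles column-based (`DeleteRequests`) deletes.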
    #[tracing::instrument(skip_all)]
    pub async fn handle_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_column_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }

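    /// Handles row-based (`RowDeleteRequests`) deletes.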
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }
}