frontend/instance/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::ddl_request::{Expr as DdlExpr, Expr};
18use api::v1::greptime_request::Request;
19use api::v1::query_request::Query;
20use api::v1::{
21    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
22    RowInsertRequests,
23};
24use async_trait::async_trait;
25use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
26use common_base::AffectedRows;
27use common_query::logical_plan::add_insert_to_logical_plan;
28use common_query::Output;
29use common_telemetry::tracing::{self};
30use query::parser::PromQuery;
31use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
32use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch};
33use servers::query_handler::sql::SqlQueryHandler;
34use session::context::QueryContextRef;
35use snafu::{ensure, OptionExt, ResultExt};
36use table::table_name::TableName;
37
38use crate::error::{
39    CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
40    IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
41    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
42};
43use crate::instance::{attach_timer, Instance};
44use crate::metrics::{
45    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
46};
47
48#[async_trait]
49impl GrpcQueryHandler for Instance {
50    type Error = Error;
51
52    async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
53        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
54        let interceptor = interceptor_ref.as_ref();
55        interceptor.pre_execute(&request, ctx.clone())?;
56
57        self.plugins
58            .get::<PermissionCheckerRef>()
59            .as_ref()
60            .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
61            .context(PermissionSnafu)?;
62
63        let _guard = if let Some(limiter) = &self.limiter {
64            let result = limiter.limit_request(&request);
65            if result.is_none() {
66                return InFlightWriteBytesExceededSnafu.fail();
67            }
68            result
69        } else {
70            None
71        };
72
73        let output = match request {
74            Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
75            Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
76            Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
77            Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
78            Request::Query(query_request) => {
79                let query = query_request.query.context(IncompleteGrpcRequestSnafu {
80                    err_msg: "Missing field 'QueryRequest.query'",
81                })?;
82                match query {
83                    Query::Sql(sql) => {
84                        let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
85                        let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
86                        ensure!(
87                            result.len() == 1,
88                            NotSupportedSnafu {
89                                feat: "execute multiple statements in SQL query string through GRPC interface"
90                            }
91                        );
92                        let output = result.remove(0)?;
93                        attach_timer(output, timer)
94                    }
95                    Query::LogicalPlan(plan) => {
96                        // this path is useful internally when flownode needs to execute a logical plan through gRPC interface
97                        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
98
99                        // use dummy catalog to provide table
100                        let plan_decoder = self
101                            .query_engine()
102                            .engine_context(ctx.clone())
103                            .new_plan_decoder()
104                            .context(PlanStatementSnafu)?;
105
106                        let dummy_catalog_list =
107                            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
108                                self.catalog_manager().clone(),
109                            ));
110
111                        let logical_plan = plan_decoder
112                            .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
113                            .await
114                            .context(SubstraitDecodeLogicalPlanSnafu)?;
115                        let output =
116                            SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;
117
118                        attach_timer(output, timer)
119                    }
120                    Query::InsertIntoPlan(insert) => {
121                        self.handle_insert_plan(insert, ctx.clone()).await?
122                    }
123                    Query::PromRangeQuery(promql) => {
124                        let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
125                        let prom_query = PromQuery {
126                            query: promql.query,
127                            start: promql.start,
128                            end: promql.end,
129                            step: promql.step,
130                            lookback: promql.lookback,
131                        };
132                        let mut result =
133                            SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
134                        ensure!(
135                            result.len() == 1,
136                            NotSupportedSnafu {
137                                feat: "execute multiple statements in PromQL query string through GRPC interface"
138                            }
139                        );
140                        let output = result.remove(0)?;
141                        attach_timer(output, timer)
142                    }
143                }
144            }
145            Request::Ddl(request) => {
146                let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
147                    err_msg: "'expr' is absent in DDL request",
148                })?;
149
150                fill_catalog_and_schema_from_context(&mut expr, &ctx);
151
152                match expr {
153                    DdlExpr::CreateTable(mut expr) => {
154                        let _ = self
155                            .statement_executor
156                            .create_table_inner(&mut expr, None, ctx.clone())
157                            .await?;
158                        Output::new_with_affected_rows(0)
159                    }
160                    DdlExpr::AlterDatabase(expr) => {
161                        let _ = self
162                            .statement_executor
163                            .alter_database_inner(expr, ctx.clone())
164                            .await?;
165                        Output::new_with_affected_rows(0)
166                    }
167                    DdlExpr::AlterTable(expr) => {
168                        self.statement_executor
169                            .alter_table_inner(expr, ctx.clone())
170                            .await?
171                    }
172                    DdlExpr::CreateDatabase(expr) => {
173                        self.statement_executor
174                            .create_database(
175                                &expr.schema_name,
176                                expr.create_if_not_exists,
177                                expr.options,
178                                ctx.clone(),
179                            )
180                            .await?
181                    }
182                    DdlExpr::DropTable(expr) => {
183                        let table_name =
184                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
185                        self.statement_executor
186                            .drop_table(table_name, expr.drop_if_exists, ctx.clone())
187                            .await?
188                    }
189                    DdlExpr::TruncateTable(expr) => {
190                        let table_name =
191                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
192                        self.statement_executor
193                            .truncate_table(table_name, ctx.clone())
194                            .await?
195                    }
196                    DdlExpr::CreateFlow(expr) => {
197                        self.statement_executor
198                            .create_flow_inner(expr, ctx.clone())
199                            .await?
200                    }
201                    DdlExpr::DropFlow(DropFlowExpr {
202                        catalog_name,
203                        flow_name,
204                        drop_if_exists,
205                        ..
206                    }) => {
207                        self.statement_executor
208                            .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
209                            .await?
210                    }
211                    DdlExpr::CreateView(expr) => {
212                        let _ = self
213                            .statement_executor
214                            .create_view_by_expr(expr, ctx.clone())
215                            .await?;
216
217                        Output::new_with_affected_rows(0)
218                    }
219                    DdlExpr::DropView(_) => {
220                        todo!("implemented in the following PR")
221                    }
222                }
223            }
224        };
225
226        let output = interceptor.post_execute(output, ctx)?;
227        Ok(output)
228    }
229
230    async fn put_record_batch(
231        &self,
232        table: &TableName,
233        record_batch: RawRecordBatch,
234    ) -> Result<AffectedRows> {
235        let _table = self
236            .catalog_manager()
237            .table(
238                &table.catalog_name,
239                &table.schema_name,
240                &table.table_name,
241                None,
242            )
243            .await
244            .context(CatalogSnafu)?
245            .with_context(|| TableNotFoundSnafu {
246                table_name: table.to_string(),
247            })?;
248
249        // TODO(LFC): Implement it.
250        common_telemetry::debug!(
251            "calling put_record_batch with table: {:?} and record_batch size: {}",
252            table,
253            record_batch.len()
254        );
255        Ok(record_batch.len())
256    }
257}
258
259fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
260    let catalog = ctx.current_catalog();
261    let schema = ctx.current_schema();
262
263    macro_rules! check_and_fill {
264        ($expr:ident) => {
265            if $expr.catalog_name.is_empty() {
266                $expr.catalog_name = catalog.to_string();
267            }
268            if $expr.schema_name.is_empty() {
269                $expr.schema_name = schema.to_string();
270            }
271        };
272    }
273
274    match ddl_expr {
275        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
276        Expr::CreateTable(expr) => {
277            check_and_fill!(expr);
278        }
279        Expr::AlterTable(expr) => {
280            check_and_fill!(expr);
281        }
282        Expr::DropTable(expr) => {
283            check_and_fill!(expr);
284        }
285        Expr::TruncateTable(expr) => {
286            check_and_fill!(expr);
287        }
288        Expr::CreateFlow(expr) => {
289            if expr.catalog_name.is_empty() {
290                expr.catalog_name = catalog.to_string();
291            }
292        }
293        Expr::DropFlow(expr) => {
294            if expr.catalog_name.is_empty() {
295                expr.catalog_name = catalog.to_string();
296            }
297        }
298        Expr::CreateView(expr) => {
299            check_and_fill!(expr);
300        }
301        Expr::DropView(expr) => {
302            check_and_fill!(expr);
303        }
304    }
305}
306
307impl Instance {
308    async fn handle_insert_plan(
309        &self,
310        insert: InsertIntoPlan,
311        ctx: QueryContextRef,
312    ) -> Result<Output> {
313        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
314        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
315            err_msg: "'table_name' is absent in InsertIntoPlan",
316        })?;
317
318        // use dummy catalog to provide table
319        let plan_decoder = self
320            .query_engine()
321            .engine_context(ctx.clone())
322            .new_plan_decoder()
323            .context(PlanStatementSnafu)?;
324
325        let dummy_catalog_list =
326            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
327                self.catalog_manager().clone(),
328            ));
329
330        // no optimize yet since we still need to add stuff
331        let logical_plan = plan_decoder
332            .decode(
333                bytes::Bytes::from(insert.logical_plan),
334                dummy_catalog_list,
335                false,
336            )
337            .await
338            .context(SubstraitDecodeLogicalPlanSnafu)?;
339
340        let table = self
341            .catalog_manager()
342            .table(
343                &table_name.catalog_name,
344                &table_name.schema_name,
345                &table_name.table_name,
346                None,
347            )
348            .await
349            .context(CatalogSnafu)?
350            .with_context(|| TableNotFoundSnafu {
351                table_name: [
352                    table_name.catalog_name.clone(),
353                    table_name.schema_name.clone(),
354                    table_name.table_name.clone(),
355                ]
356                .join("."),
357            })?;
358
359        let table_info = table.table_info();
360
361        let df_schema = Arc::new(
362            table_info
363                .meta
364                .schema
365                .arrow_schema()
366                .clone()
367                .try_into()
368                .context(DataFusionSnafu)?,
369        );
370
371        let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)
372            .context(SubstraitDecodeLogicalPlanSnafu)?;
373
374        let engine_ctx = self.query_engine().engine_context(ctx.clone());
375        let state = engine_ctx.state();
376        // Analyze the plan
377        let analyzed_plan = state
378            .analyzer()
379            .execute_and_check(insert_into, state.config_options(), |_, _| {})
380            .context(common_query::error::GeneralDataFusionSnafu)
381            .context(SubstraitDecodeLogicalPlanSnafu)?;
382
383        // Optimize the plan
384        let optimized_plan = state
385            .optimize(&analyzed_plan)
386            .context(common_query::error::GeneralDataFusionSnafu)
387            .context(SubstraitDecodeLogicalPlanSnafu)?;
388
389        let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;
390
391        Ok(attach_timer(output, timer))
392    }
393    #[tracing::instrument(skip_all)]
394    pub async fn handle_inserts(
395        &self,
396        requests: InsertRequests,
397        ctx: QueryContextRef,
398    ) -> Result<Output> {
399        self.inserter
400            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
401            .await
402            .context(TableOperationSnafu)
403    }
404
405    #[tracing::instrument(skip_all)]
406    pub async fn handle_row_inserts(
407        &self,
408        requests: RowInsertRequests,
409        ctx: QueryContextRef,
410    ) -> Result<Output> {
411        self.inserter
412            .handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
413            .await
414            .context(TableOperationSnafu)
415    }
416
417    #[tracing::instrument(skip_all)]
418    pub async fn handle_influx_row_inserts(
419        &self,
420        requests: RowInsertRequests,
421        ctx: QueryContextRef,
422    ) -> Result<Output> {
423        self.inserter
424            .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
425            .await
426            .context(TableOperationSnafu)
427    }
428
429    #[tracing::instrument(skip_all)]
430    pub async fn handle_metric_row_inserts(
431        &self,
432        requests: RowInsertRequests,
433        ctx: QueryContextRef,
434        physical_table: String,
435    ) -> Result<Output> {
436        self.inserter
437            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
438            .await
439            .context(TableOperationSnafu)
440    }
441
442    #[tracing::instrument(skip_all)]
443    pub async fn handle_deletes(
444        &self,
445        requests: DeleteRequests,
446        ctx: QueryContextRef,
447    ) -> Result<Output> {
448        self.deleter
449            .handle_column_deletes(requests, ctx)
450            .await
451            .context(TableOperationSnafu)
452    }
453
454    #[tracing::instrument(skip_all)]
455    pub async fn handle_row_deletes(
456        &self,
457        requests: RowDeleteRequests,
458        ctx: QueryContextRef,
459    ) -> Result<Output> {
460        self.deleter
461            .handle_row_deletes(requests, ctx)
462            .await
463            .context(TableOperationSnafu)
464    }
465}