frontend/instance/grpc.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use api::helper::from_pb_time_ranges;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
    RowInsertRequests,
};
use async_stream::try_stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_error::ext::BoxedError;
use common_grpc::flight::do_put::DoPutResponse;
use common_query::Output;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_telemetry::tracing::{self};
use datafusion::datasource::DefaultTableSource;
use futures::Stream;
use futures::stream::StreamExt;
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
use table::table::adapter::DfTableProviderAdapter;
use table::table_name::TableName;

use crate::error::{
    CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
    NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{Instance, attach_timer};
use crate::metrics::{
    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
};

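// gRPC entry points for the frontend `Instance`: `do_query` serves unary
// GreptimeDB requests (inserts, deletes, queries, DDL), while the
// `put_record_batch*` methods serve Arrow Flight `DoPut` bulk ingestion.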
#[async_trait]
impl GrpcQueryHandler for Instance {
    type Error = Error;

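    /// Handles a single gRPC [`Request`]: runs the query interceptor's
    /// `pre_execute` hook and the permission check, dispatches to the matching
    /// insert, delete, query, or DDL handler, and returns the handler's output
    /// after `post_execute`.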
    async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_execute(&request, ctx.clone())?;

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
            .context(PermissionSnafu)?;

        let output = match request {
            Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
            Request::RowInserts(requests) => {
                self.handle_row_inserts(requests, ctx.clone(), false, false)
                    .await?
            }
            Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
            Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
            Request::Query(query_request) => {
                let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                    err_msg: "Missing field 'QueryRequest.query'",
                })?;
                match query {
                    Query::Sql(sql) => {
                        let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                        let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in SQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                    Query::LogicalPlan(plan) => {
                        // This path is used internally when a flownode needs to execute a logical plan through the gRPC interface.
                        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

                        // use dummy catalog to provide table
                        let plan_decoder = self
                            .query_engine()
                            .engine_context(ctx.clone())
                            .new_plan_decoder()
                            .context(PlanStatementSnafu)?;

                        let dummy_catalog_list =
                            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                self.catalog_manager().clone(),
                            ));

                        let logical_plan = plan_decoder
                            .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                            .await
                            .context(SubstraitDecodeLogicalPlanSnafu)?;
                        let output =
                            SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
                                .await?;

                        attach_timer(output, timer)
                    }
                    Query::InsertIntoPlan(insert) => {
                        self.handle_insert_plan(insert, ctx.clone()).await?
                    }
                    Query::PromRangeQuery(promql) => {
                        let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                        let prom_query = PromQuery {
                            query: promql.query,
                            start: promql.start,
                            end: promql.end,
                            step: promql.step,
                            lookback: promql.lookback,
                            alias: None,
                        };
                        let mut result =
                            SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
                        ensure!(
                            result.len() == 1,
                            NotSupportedSnafu {
                                feat: "execute multiple statements in PromQL query string through GRPC interface"
                            }
                        );
                        let output = result.remove(0)?;
                        attach_timer(output, timer)
                    }
                }
            }
            Request::Ddl(request) => {
                let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                    err_msg: "'expr' is absent in DDL request",
                })?;

                fill_catalog_and_schema_from_context(&mut expr, &ctx);

                match expr {
                    DdlExpr::CreateTable(mut expr) => {
                        let _ = self
                            .statement_executor
                            .create_table_inner(&mut expr, None, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterDatabase(expr) => {
                        let _ = self
                            .statement_executor
                            .alter_database_inner(expr, ctx.clone())
                            .await?;
                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::AlterTable(expr) => {
                        self.statement_executor
                            .alter_table_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateDatabase(expr) => {
                        self.statement_executor
                            .create_database(
                                &expr.schema_name,
                                expr.create_if_not_exists,
                                expr.options,
                                ctx.clone(),
                            )
                            .await?
                    }
                    DdlExpr::DropTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        self.statement_executor
                            .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::TruncateTable(expr) => {
                        let table_name =
                            TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                        let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
                            .map_err(BoxedError::new)
                            .context(ExternalSnafu)?;
                        self.statement_executor
                            .truncate_table(table_name, time_ranges, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateFlow(expr) => {
                        self.statement_executor
                            .create_flow_inner(expr, ctx.clone())
                            .await?
                    }
                    DdlExpr::DropFlow(DropFlowExpr {
                        catalog_name,
                        flow_name,
                        drop_if_exists,
                        ..
                    }) => {
                        self.statement_executor
                            .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                            .await?
                    }
                    DdlExpr::CreateView(expr) => {
                        let _ = self
                            .statement_executor
                            .create_view_by_expr(expr, ctx.clone())
                            .await?;

                        Output::new_with_affected_rows(0)
                    }
                    DdlExpr::DropView(_) => {
                        todo!("implemented in the following PR")
                    }
                    DdlExpr::CommentOn(expr) => {
                        self.statement_executor
                            .comment_by_expr(expr, ctx.clone())
                            .await?
                    }
                }
            }
        };

        let output = interceptor.post_execute(output, ctx)?;
        Ok(output)
    }

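    /// Ingests one Arrow Flight record batch into the target table. The resolved
    /// [`TableRef`] is cached in `table_ref` so subsequent calls for the same
    /// stream can skip the catalog lookup.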
    async fn put_record_batch(
        &self,
        request: servers::grpc::flight::PutRecordBatchRequest,
        table_ref: &mut Option<TableRef>,
        ctx: QueryContextRef,
    ) -> Result<AffectedRows> {
        let table = if let Some(table) = table_ref {
            table.clone()
        } else {
            let table = self
                .catalog_manager()
                .table(
                    &request.table_name.catalog_name,
                    &request.table_name.schema_name,
                    &request.table_name.table_name,
                    None,
                )
                .await
                .context(CatalogSnafu)?
                .with_context(|| TableNotFoundSnafu {
                    table_name: request.table_name.to_string(),
                })?;
            *table_ref = Some(table.clone());
            table
        };

        let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
        let interceptor = interceptor_ref.as_ref();
        interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
            .context(PermissionSnafu)?;

        // TODO: decide whether request size limits should also apply to bulk inserts.

        self.inserter
            .handle_bulk_insert(
                table,
                request.flight_data,
                request.record_batch,
                request.schema_bytes,
            )
            .await
            .context(TableOperationSnafu)
    }

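    /// Streaming variant of `put_record_batch`: resolves the table and runs the
    /// permission and interceptor checks once, on the first request of the
    /// stream, then yields a [`DoPutResponse`] for every ingested batch.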
    fn handle_put_record_batch_stream(
        &self,
        mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
        ctx: QueryContextRef,
    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
        // Clone all necessary data to make it 'static
        let catalog_manager = self.catalog_manager().clone();
        let plugins = self.plugins.clone();
        let inserter = self.inserter.clone();
        let ctx = ctx.clone();
        let mut table_ref: Option<TableRef> = None;
        let mut table_checked = false;

        Box::pin(try_stream! {
            // Process each request in the stream
            while let Some(request_result) = stream.next().await {
                let request = request_result.map_err(|e| {
                    let error_msg = format!("Stream error: {:?}", e);
                    IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
                })?;

                // Resolve table and check permissions on first RecordBatch (after schema is received)
                if !table_checked {
                    let table_name = &request.table_name;

                    plugins
                        .get::<PermissionCheckerRef>()
                        .as_ref()
                        .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
                        .context(PermissionSnafu)?;

                    // Resolve table reference
                    table_ref = Some(
                        catalog_manager
                            .table(
                                &table_name.catalog_name,
                                &table_name.schema_name,
                                &table_name.table_name,
                                None,
                            )
                            .await
                            .context(CatalogSnafu)?
                            .with_context(|| TableNotFoundSnafu {
                                table_name: table_name.to_string(),
                            })?,
                    );

                    // Let the gRPC interceptor inspect the bulk insert for the resolved table
                    let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
                    let interceptor = interceptor_ref.as_ref();
                    interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;

                    table_checked = true;
                }

                let request_id = request.request_id;
                let start = Instant::now();
                let rows = inserter
                    .handle_bulk_insert(
                        table_ref.clone().unwrap(),
                        request.flight_data,
                        request.record_batch,
                        request.schema_bytes,
                    )
                    .await
                    .context(TableOperationSnafu)?;
                let elapsed_secs = start.elapsed().as_secs_f64();
                yield DoPutResponse::new(request_id, rows, elapsed_secs);
            }
        })
    }
}

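/// Fills empty `catalog_name` / `schema_name` fields of a DDL expression with the
/// current catalog and schema from the query context. Database-level expressions
/// are left as-is, and flow expressions only have their catalog filled in.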
fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
    let catalog = ctx.current_catalog();
    let schema = ctx.current_schema();

    macro_rules! check_and_fill {
        ($expr:ident) => {
            if $expr.catalog_name.is_empty() {
                $expr.catalog_name = catalog.to_string();
            }
            if $expr.schema_name.is_empty() {
                $expr.schema_name = schema.to_string();
            }
        };
    }

    match ddl_expr {
        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing */ }
        Expr::CreateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::AlterTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::TruncateTable(expr) => {
            check_and_fill!(expr);
        }
        Expr::CreateFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::DropFlow(expr) => {
            if expr.catalog_name.is_empty() {
                expr.catalog_name = catalog.to_string();
            }
        }
        Expr::CreateView(expr) => {
            check_and_fill!(expr);
        }
        Expr::DropView(expr) => {
            check_and_fill!(expr);
        }
        Expr::CommentOn(expr) => {
            check_and_fill!(expr);
        }
    }
}

impl Instance {
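    /// Executes an [`InsertIntoPlan`]: decodes the Substrait logical plan against a
    /// dummy catalog, appends an insert targeting the resolved table, then analyzes,
    /// optimizes, and executes the resulting plan.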
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        // use dummy catalog to provide table
        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        // Don't optimize yet: the insert still has to be added to the plan below.
        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        // Analyze the plan
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(DataFusionSnafu)?;

        // Optimize the plan
        let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;

        let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;

        Ok(attach_timer(output, timer))
    }

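    /// Handles column-oriented insert requests by delegating to the inserter.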
    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

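    /// Handles row-oriented insert requests, forwarding `accommodate_existing_schema`
    /// and `is_single_value` to the inserter.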
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.inserter
            .handle_row_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                accommodate_existing_schema,
                is_single_value,
            )
            .await
            .context(TableOperationSnafu)
    }

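    /// Handles row inserts coming from the InfluxDB line protocol path, using the
    /// inserter's "last non-null" write semantics.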
    #[tracing::instrument(skip_all)]
    pub async fn handle_influx_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_last_non_null_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                true,
                // The Influx protocol may write multiple fields (values).
                false,
            )
            .await
            .context(TableOperationSnafu)
    }

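    /// Handles metric row inserts, writing them through the given physical table.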
    #[tracing::instrument(skip_all)]
    pub async fn handle_metric_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        physical_table: String,
    ) -> Result<Output> {
        self.inserter
            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
            .await
            .context(TableOperationSnafu)
    }

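    /// Handles column-oriented delete requests by delegating to the deleter.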
    #[tracing::instrument(skip_all)]
    pub async fn handle_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_column_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }

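    /// Handles row-oriented delete requests by delegating to the deleter.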
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }
}