frontend/instance/
grpc.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::helper::from_pb_time_ranges;
20use api::v1::ddl_request::{Expr as DdlExpr, Expr};
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{
24    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
25    RowInsertRequests,
26};
27use async_stream::try_stream;
28use async_trait::async_trait;
29use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
30use common_error::ext::BoxedError;
31use common_grpc::flight::do_put::DoPutResponse;
32use common_query::Output;
33use common_query::logical_plan::add_insert_to_logical_plan;
34use common_telemetry::tracing::{self};
35use datafusion::datasource::DefaultTableSource;
36use futures::Stream;
37use futures::stream::StreamExt;
38use query::parser::PromQuery;
39use servers::error as server_error;
40use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
41use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
42use servers::query_handler::grpc::GrpcQueryHandler;
43use session::context::QueryContextRef;
44use snafu::{OptionExt, ResultExt, ensure};
45use table::TableRef;
46use table::table::adapter::DfTableProviderAdapter;
47use table::table_name::TableName;
48
49use crate::error::{
50    CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
51    NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
52    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
53};
54use crate::instance::{Instance, attach_timer};
55use crate::metrics::{
56    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
57};
58
59#[async_trait]
60impl GrpcQueryHandler for Instance {
61    async fn do_query(
62        &self,
63        request: Request,
64        ctx: QueryContextRef,
65    ) -> server_error::Result<Output> {
66        let result: Result<Output> = async {
67            let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
68            let interceptor = interceptor_ref.as_ref();
69            interceptor.pre_execute(&request, ctx.clone())?;
70
71            self.plugins
72                .get::<PermissionCheckerRef>()
73                .as_ref()
74                .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
75                .context(PermissionSnafu)?;
76
77            let output = match request {
78                Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
79                Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
80                    Some(physical_table) => {
81                        self.handle_metric_row_inserts(
82                            requests,
83                            ctx.clone(),
84                            physical_table.to_string(),
85                        )
86                        .await?
87                    }
88                    None => {
89                        self.handle_row_inserts(requests, ctx.clone(), false, false)
90                            .await?
91                    }
92                },
93                Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
94                Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
95                Request::Query(query_request) => {
96                    let query = query_request.query.context(IncompleteGrpcRequestSnafu {
97                        err_msg: "Missing field 'QueryRequest.query'",
98                    })?;
99                    match query {
100                        Query::Sql(sql) => {
101                            let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
102                            let mut result = self.do_query_inner(&sql, ctx.clone()).await;
103                            ensure!(
104                                result.len() == 1,
105                                NotSupportedSnafu {
106                                    feat: "execute multiple statements in SQL query string through GRPC interface"
107                                }
108                            );
109                            let output = result.remove(0)?;
110                            attach_timer(output, timer)
111                        }
112                        Query::LogicalPlan(plan) => {
113                            // this path is useful internally when flownode needs to execute a logical plan through gRPC interface
114                            let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
115
116                            // use dummy catalog to provide table
117                            let plan_decoder = self
118                                .query_engine()
119                                .engine_context(ctx.clone())
120                                .new_plan_decoder()
121                                .context(PlanStatementSnafu)?;
122
123                            let dummy_catalog_list =
124                                Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
125                                    self.catalog_manager().clone(),
126                                ));
127
128                            let logical_plan = plan_decoder
129                                .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
130                                .await
131                                .context(SubstraitDecodeLogicalPlanSnafu)?;
132                            let output =
133                                self.do_exec_plan_inner(None, logical_plan, ctx.clone()).await?;
134
135                            attach_timer(output, timer)
136                        }
137                        Query::InsertIntoPlan(insert) => {
138                            self.handle_insert_plan(insert, ctx.clone()).await?
139                        }
140                        Query::PromRangeQuery(promql) => {
141                            let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
142                            let prom_query = PromQuery {
143                                query: promql.query,
144                                start: promql.start,
145                                end: promql.end,
146                                step: promql.step,
147                                lookback: promql.lookback,
148                                alias: None,
149                            };
150                            let mut result =
151                                self.do_promql_query_inner(&prom_query, ctx.clone()).await;
152                            ensure!(
153                                result.len() == 1,
154                                NotSupportedSnafu {
155                                    feat: "execute multiple statements in PromQL query string through GRPC interface"
156                                }
157                            );
158                            let output = result.remove(0)?;
159                            attach_timer(output, timer)
160                        }
161                    }
162                }
163                Request::Ddl(request) => {
164                    let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
165                        err_msg: "'expr' is absent in DDL request",
166                    })?;
167
168                    fill_catalog_and_schema_from_context(&mut expr, &ctx);
169
170                    match expr {
171                        DdlExpr::CreateTable(mut expr) => {
172                            let _ = self
173                                .statement_executor
174                                .create_table_inner(&mut expr, None, ctx.clone())
175                                .await?;
176                            Output::new_with_affected_rows(0)
177                        }
178                        DdlExpr::AlterDatabase(expr) => {
179                            let _ = self
180                                .statement_executor
181                                .alter_database_inner(expr, ctx.clone())
182                                .await?;
183                            Output::new_with_affected_rows(0)
184                        }
185                        DdlExpr::AlterTable(expr) => {
186                            self.statement_executor
187                                .alter_table_inner(expr, ctx.clone())
188                                .await?
189                        }
190                        DdlExpr::CreateDatabase(expr) => {
191                            self.statement_executor
192                                .create_database(
193                                    &expr.schema_name,
194                                    expr.create_if_not_exists,
195                                    expr.options,
196                                    ctx.clone(),
197                                )
198                                .await?
199                        }
200                        DdlExpr::DropTable(expr) => {
201                            let table_name =
202                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
203                            self.statement_executor
204                                .drop_table(table_name, expr.drop_if_exists, ctx.clone())
205                                .await?
206                        }
207                        DdlExpr::TruncateTable(expr) => {
208                            let table_name =
209                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
210                            let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
211                                .map_err(BoxedError::new)
212                                .context(ExternalSnafu)?;
213                            self.statement_executor
214                                .truncate_table(table_name, time_ranges, ctx.clone())
215                                .await?
216                        }
217                        DdlExpr::CreateFlow(expr) => {
218                            self.statement_executor
219                                .create_flow_inner(expr, ctx.clone())
220                                .await?
221                        }
222                        DdlExpr::DropFlow(DropFlowExpr {
223                            catalog_name,
224                            flow_name,
225                            drop_if_exists,
226                            ..
227                        }) => {
228                            self.statement_executor
229                                .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
230                                .await?
231                        }
232                        DdlExpr::CreateView(expr) => {
233                            let _ = self
234                                .statement_executor
235                                .create_view_by_expr(expr, ctx.clone())
236                                .await?;
237
238                            Output::new_with_affected_rows(0)
239                        }
240                        DdlExpr::DropView(_) => {
241                            todo!("implemented in the following PR")
242                        }
243                        DdlExpr::CommentOn(expr) => {
244                            self.statement_executor
245                                .comment_by_expr(expr, ctx.clone())
246                                .await?
247                        }
248                    }
249                }
250            };
251
252            let output = interceptor.post_execute(output, ctx)?;
253            Ok(output)
254        }
255        .await;
256
257        result
258            .map_err(BoxedError::new)
259            .context(server_error::ExecuteGrpcQuerySnafu)
260    }
261
262    fn handle_put_record_batch_stream(
263        &self,
264        stream: servers::grpc::flight::PutRecordBatchRequestStream,
265        ctx: QueryContextRef,
266    ) -> Pin<Box<dyn Stream<Item = server_error::Result<DoPutResponse>> + Send>> {
267        Box::pin(
268            self.handle_put_record_batch_stream_inner(stream, ctx)
269                .map(|result| {
270                    result
271                        .map_err(BoxedError::new)
272                        .context(server_error::ExecuteGrpcRequestSnafu)
273                }),
274        )
275    }
276}
277
278fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
279    let catalog = ctx.current_catalog();
280    let schema = ctx.current_schema();
281
282    macro_rules! check_and_fill {
283        ($expr:ident) => {
284            if $expr.catalog_name.is_empty() {
285                $expr.catalog_name = catalog.to_string();
286            }
287            if $expr.schema_name.is_empty() {
288                $expr.schema_name = schema.to_string();
289            }
290        };
291    }
292
293    match ddl_expr {
294        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
295        Expr::CreateTable(expr) => {
296            check_and_fill!(expr);
297        }
298        Expr::AlterTable(expr) => {
299            check_and_fill!(expr);
300        }
301        Expr::DropTable(expr) => {
302            check_and_fill!(expr);
303        }
304        Expr::TruncateTable(expr) => {
305            check_and_fill!(expr);
306        }
307        Expr::CreateFlow(expr) => {
308            if expr.catalog_name.is_empty() {
309                expr.catalog_name = catalog.to_string();
310            }
311        }
312        Expr::DropFlow(expr) => {
313            if expr.catalog_name.is_empty() {
314                expr.catalog_name = catalog.to_string();
315            }
316        }
317        Expr::CreateView(expr) => {
318            check_and_fill!(expr);
319        }
320        Expr::DropView(expr) => {
321            check_and_fill!(expr);
322        }
323        Expr::CommentOn(expr) => {
324            check_and_fill!(expr);
325        }
326    }
327}
328
329impl Instance {
330    fn handle_put_record_batch_stream_inner(
331        &self,
332        mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
333        ctx: QueryContextRef,
334    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
335        // Clone all necessary data to make it 'static
336        let catalog_manager = self.catalog_manager().clone();
337        let plugins = self.plugins.clone();
338        let inserter = self.inserter.clone();
339        let ctx = ctx.clone();
340        let mut table_ref: Option<TableRef> = None;
341        let mut table_checked = false;
342
343        Box::pin(try_stream! {
344            // Process each request in the stream
345            while let Some(request_result) = stream.next().await {
346                let request = request_result.map_err(|e| {
347                    let error_msg = format!("Stream error: {:?}", e);
348                    IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
349                })?;
350
351                // Resolve table and check permissions on first RecordBatch (after schema is received)
352                if !table_checked {
353                    let table_name = &request.table_name;
354
355                    plugins
356                        .get::<PermissionCheckerRef>()
357                        .as_ref()
358                        .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
359                        .context(PermissionSnafu)?;
360
361                    // Resolve table reference
362                    table_ref = Some(
363                        catalog_manager
364                            .table(
365                                &table_name.catalog_name,
366                                &table_name.schema_name,
367                                &table_name.table_name,
368                                None,
369                            )
370                            .await
371                            .context(CatalogSnafu)?
372                            .with_context(|| TableNotFoundSnafu {
373                                table_name: table_name.to_string(),
374                            })?,
375                    );
376
377                    // Check permissions for the table
378                    let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
379                    let interceptor = interceptor_ref.as_ref();
380                    interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;
381
382                    table_checked = true;
383                }
384
385                let request_id = request.request_id;
386                let start = Instant::now();
387                let rows = inserter
388                    .handle_bulk_insert(
389                        table_ref.clone().unwrap(),
390                        request.flight_data,
391                        request.record_batch,
392                        request.schema_bytes,
393                    )
394                    .await
395                    .context(TableOperationSnafu)?;
396                let elapsed_secs = start.elapsed().as_secs_f64();
397                yield DoPutResponse::new(request_id, rows, elapsed_secs);
398            }
399        })
400    }
401
402    async fn handle_insert_plan(
403        &self,
404        insert: InsertIntoPlan,
405        ctx: QueryContextRef,
406    ) -> Result<Output> {
407        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
408        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
409            err_msg: "'table_name' is absent in InsertIntoPlan",
410        })?;
411
412        // use dummy catalog to provide table
413        let plan_decoder = self
414            .query_engine()
415            .engine_context(ctx.clone())
416            .new_plan_decoder()
417            .context(PlanStatementSnafu)?;
418
419        let dummy_catalog_list =
420            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
421                self.catalog_manager().clone(),
422            ));
423
424        // no optimize yet since we still need to add stuff
425        let logical_plan = plan_decoder
426            .decode(
427                bytes::Bytes::from(insert.logical_plan),
428                dummy_catalog_list,
429                false,
430            )
431            .await
432            .context(SubstraitDecodeLogicalPlanSnafu)?;
433
434        let table = self
435            .catalog_manager()
436            .table(
437                &table_name.catalog_name,
438                &table_name.schema_name,
439                &table_name.table_name,
440                None,
441            )
442            .await
443            .context(CatalogSnafu)?
444            .with_context(|| TableNotFoundSnafu {
445                table_name: [
446                    table_name.catalog_name.clone(),
447                    table_name.schema_name.clone(),
448                    table_name.table_name.clone(),
449                ]
450                .join("."),
451            })?;
452        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
453        let table_source = Arc::new(DefaultTableSource::new(table_provider));
454
455        let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
456            .context(SubstraitDecodeLogicalPlanSnafu)?;
457
458        let engine_ctx = self.query_engine().engine_context(ctx.clone());
459        let state = engine_ctx.state();
460        // Analyze the plan
461        let analyzed_plan = state
462            .analyzer()
463            .execute_and_check(insert_into, state.config_options(), |_, _| {})
464            .context(DataFusionSnafu)?;
465
466        // Optimize the plan
467        let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;
468
469        let output = self
470            .do_exec_plan_inner(None, optimized_plan, ctx.clone())
471            .await?;
472
473        Ok(attach_timer(output, timer))
474    }
475    #[tracing::instrument(skip_all)]
476    pub async fn handle_inserts(
477        &self,
478        requests: InsertRequests,
479        ctx: QueryContextRef,
480    ) -> Result<Output> {
481        self.inserter
482            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
483            .await
484            .context(TableOperationSnafu)
485    }
486
487    #[tracing::instrument(skip_all)]
488    pub async fn handle_row_inserts(
489        &self,
490        requests: RowInsertRequests,
491        ctx: QueryContextRef,
492        accommodate_existing_schema: bool,
493        is_single_value: bool,
494    ) -> Result<Output> {
495        self.inserter
496            .handle_row_inserts(
497                requests,
498                ctx,
499                self.statement_executor.as_ref(),
500                accommodate_existing_schema,
501                is_single_value,
502            )
503            .await
504            .context(TableOperationSnafu)
505    }
506
507    #[tracing::instrument(skip_all)]
508    pub async fn handle_influx_row_inserts(
509        &self,
510        requests: RowInsertRequests,
511        ctx: QueryContextRef,
512    ) -> Result<Output> {
513        self.inserter
514            .handle_last_non_null_inserts(
515                requests,
516                ctx,
517                self.statement_executor.as_ref(),
518                true,
519                // Influx protocol may writes multiple fields (values).
520                false,
521            )
522            .await
523            .context(TableOperationSnafu)
524    }
525
526    #[tracing::instrument(skip_all)]
527    pub async fn handle_metric_row_inserts(
528        &self,
529        requests: RowInsertRequests,
530        ctx: QueryContextRef,
531        physical_table: String,
532    ) -> Result<Output> {
533        self.inserter
534            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
535            .await
536            .context(TableOperationSnafu)
537    }
538
539    #[tracing::instrument(skip_all)]
540    pub async fn handle_deletes(
541        &self,
542        requests: DeleteRequests,
543        ctx: QueryContextRef,
544    ) -> Result<Output> {
545        self.deleter
546            .handle_column_deletes(requests, ctx)
547            .await
548            .context(TableOperationSnafu)
549    }
550
551    #[tracing::instrument(skip_all)]
552    pub async fn handle_row_deletes(
553        &self,
554        requests: RowDeleteRequests,
555        ctx: QueryContextRef,
556    ) -> Result<Output> {
557        self.deleter
558            .handle_row_deletes(requests, ctx)
559            .await
560            .context(TableOperationSnafu)
561    }
562}