// frontend/instance/grpc.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::helper::from_pb_time_ranges;
20use api::v1::ddl_request::{Expr as DdlExpr, Expr};
21use api::v1::greptime_request::Request;
22use api::v1::query_request::Query;
23use api::v1::{
24    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
25    RowInsertRequests,
26};
27use async_stream::try_stream;
28use async_trait::async_trait;
29use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
30use common_error::ext::BoxedError;
31use common_grpc::flight::do_put::DoPutResponse;
32use common_query::Output;
33use common_query::logical_plan::add_insert_to_logical_plan;
34use common_telemetry::tracing::{self};
35use datafusion::datasource::DefaultTableSource;
36use futures::Stream;
37use futures::stream::StreamExt;
38use query::parser::PromQuery;
39use servers::error as server_error;
40use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
41use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
42use servers::query_handler::grpc::GrpcQueryHandler;
43use session::context::QueryContextRef;
44use snafu::{OptionExt, ResultExt, ensure};
45use table::TableRef;
46use table::table::adapter::DfTableProviderAdapter;
47use table::table_name::TableName;
48
49use crate::error::{
50    CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
51    NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
52    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
53};
54use crate::instance::{Instance, attach_timer};
55use crate::metrics::{
56    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
57};
58
#[async_trait]
impl GrpcQueryHandler for Instance {
    /// Entry point for a single gRPC `GreptimeRequest`.
    ///
    /// Order of operations:
    /// 1. run the `GrpcQueryInterceptor` pre-hook from the plugin registry,
    /// 2. check the current user's permission for this request,
    /// 3. dispatch on the request variant (inserts / deletes / queries / DDL),
    /// 4. run the interceptor post-hook on the produced [`Output`].
    ///
    /// Any internal error is boxed and mapped to the server-level
    /// `ExecuteGrpcQuerySnafu` error before returning.
    async fn do_query(
        &self,
        request: Request,
        ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let result: Result<Output> = async {
            let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
            let interceptor = interceptor_ref.as_ref();
            interceptor.pre_execute(&request, ctx.clone())?;

            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
                .context(PermissionSnafu)?;

            let output = match request {
                Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
                // Row inserts are routed to the metric-engine path when the
                // query context carries a physical table name, otherwise to
                // the plain row-insert path.
                Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
                    Some(physical_table) => {
                        self.handle_metric_row_inserts(
                            requests,
                            ctx.clone(),
                            physical_table.to_string(),
                        )
                        .await?
                    }
                    None => {
                        self.handle_row_inserts(requests, ctx.clone(), false, false)
                            .await?
                    }
                },
                Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
                Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
                Request::Query(query_request) => {
                    let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                        err_msg: "Missing field 'QueryRequest.query'",
                    })?;
                    match query {
                        Query::Sql(sql) => {
                            let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                            let mut result = self.do_query_inner(&sql, ctx.clone()).await;
                            // The gRPC interface only accepts a single SQL
                            // statement per query string.
                            ensure!(
                                result.len() == 1,
                                NotSupportedSnafu {
                                    feat: "execute multiple statements in SQL query string through GRPC interface"
                                }
                            );
                            let output = result.remove(0)?;
                            attach_timer(output, timer)
                        }
                        Query::LogicalPlan(plan) => {
                            // this path is useful internally when flownode needs to execute a logical plan through gRPC interface
                            let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

                            // use dummy catalog to provide table
                            let plan_decoder = self
                                .query_engine()
                                .engine_context(ctx.clone())
                                .new_plan_decoder()
                                .context(PlanStatementSnafu)?;

                            let dummy_catalog_list =
                                Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                    self.catalog_manager().clone(),
                                ));

                            // Decode the substrait-encoded plan. The `true`
                            // flag asks the decoder to also optimize the plan
                            // (cf. `handle_insert_plan`, which passes `false`
                            // because it still mutates the plan afterwards).
                            let logical_plan = plan_decoder
                                .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                                .await
                                .context(SubstraitDecodeLogicalPlanSnafu)?;
                            let query = logical_plan.display_indent().to_string();
                            let output =
                                self.do_exec_plan_inner(logical_plan, query, ctx.clone()).await?;

                            attach_timer(output, timer)
                        }
                        Query::InsertIntoPlan(insert) => {
                            self.handle_insert_plan(insert, ctx.clone()).await?
                        }
                        Query::PromRangeQuery(promql) => {
                            let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                            let prom_query = PromQuery {
                                query: promql.query,
                                start: promql.start,
                                end: promql.end,
                                step: promql.step,
                                lookback: promql.lookback,
                                alias: None,
                            };
                            let mut result =
                                self.do_promql_query_inner(&prom_query, ctx.clone()).await;
                            // As with SQL, exactly one statement is supported.
                            ensure!(
                                result.len() == 1,
                                NotSupportedSnafu {
                                    feat: "execute multiple statements in PromQL query string through GRPC interface"
                                }
                            );
                            let output = result.remove(0)?;
                            attach_timer(output, timer)
                        }
                    }
                }
                Request::Ddl(request) => {
                    let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                        err_msg: "'expr' is absent in DDL request",
                    })?;

                    // Default any empty catalog/schema fields to the session's
                    // current catalog and schema before dispatching.
                    fill_catalog_and_schema_from_context(&mut expr, &ctx);

                    match expr {
                        DdlExpr::CreateTable(mut expr) => {
                            // The created table handle is not needed here; DDL
                            // replies with zero affected rows.
                            let _ = self
                                .statement_executor
                                .create_table_inner(&mut expr, None, ctx.clone())
                                .await?;
                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::AlterDatabase(expr) => {
                            let _ = self
                                .statement_executor
                                .alter_database_inner(expr, ctx.clone())
                                .await?;
                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::AlterTable(expr) => {
                            self.statement_executor
                                .alter_table_inner(expr, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateDatabase(expr) => {
                            self.statement_executor
                                .create_database(
                                    &expr.schema_name,
                                    expr.create_if_not_exists,
                                    expr.options,
                                    ctx.clone(),
                                )
                                .await?
                        }
                        DdlExpr::DropTable(expr) => {
                            let table_name =
                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                            self.statement_executor
                                .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                                .await?
                        }
                        DdlExpr::TruncateTable(expr) => {
                            let table_name =
                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                            // Absent time ranges mean "truncate everything";
                            // conversion errors are surfaced as external errors.
                            let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
                                .map_err(BoxedError::new)
                                .context(ExternalSnafu)?;
                            self.statement_executor
                                .truncate_table(table_name, time_ranges, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateFlow(expr) => {
                            self.statement_executor
                                .create_flow_inner(expr, ctx.clone())
                                .await?
                        }
                        DdlExpr::DropFlow(DropFlowExpr {
                            catalog_name,
                            flow_name,
                            drop_if_exists,
                            ..
                        }) => {
                            self.statement_executor
                                .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateView(expr) => {
                            let _ = self
                                .statement_executor
                                .create_view_by_expr(expr, ctx.clone())
                                .await?;

                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::DropView(_) => {
                            todo!("implemented in the following PR")
                        }
                        DdlExpr::CommentOn(expr) => {
                            self.statement_executor
                                .comment_by_expr(expr, ctx.clone())
                                .await?
                        }
                    }
                }
            };

            let output = interceptor.post_execute(output, ctx)?;
            Ok(output)
        }
        .await;

        result
            .map_err(BoxedError::new)
            .context(server_error::ExecuteGrpcQuerySnafu)
    }

    /// Adapts the internal bulk-insert stream into the server-facing stream
    /// type, boxing each internal error into `ExecuteGrpcRequestSnafu`.
    fn handle_put_record_batch_stream(
        &self,
        stream: servers::grpc::flight::PutRecordBatchRequestStream,
        ctx: QueryContextRef,
    ) -> Pin<Box<dyn Stream<Item = server_error::Result<DoPutResponse>> + Send>> {
        Box::pin(
            self.handle_put_record_batch_stream_inner(stream, ctx)
                .map(|result| {
                    result
                        .map_err(BoxedError::new)
                        .context(server_error::ExecuteGrpcRequestSnafu)
                }),
        )
    }
}
278
279fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
280    let catalog = ctx.current_catalog();
281    let schema = ctx.current_schema();
282
283    macro_rules! check_and_fill {
284        ($expr:ident) => {
285            if $expr.catalog_name.is_empty() {
286                $expr.catalog_name = catalog.to_string();
287            }
288            if $expr.schema_name.is_empty() {
289                $expr.schema_name = schema.to_string();
290            }
291        };
292    }
293
294    match ddl_expr {
295        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
296        Expr::CreateTable(expr) => {
297            check_and_fill!(expr);
298        }
299        Expr::AlterTable(expr) => {
300            check_and_fill!(expr);
301        }
302        Expr::DropTable(expr) => {
303            check_and_fill!(expr);
304        }
305        Expr::TruncateTable(expr) => {
306            check_and_fill!(expr);
307        }
308        Expr::CreateFlow(expr) => {
309            if expr.catalog_name.is_empty() {
310                expr.catalog_name = catalog.to_string();
311            }
312        }
313        Expr::DropFlow(expr) => {
314            if expr.catalog_name.is_empty() {
315                expr.catalog_name = catalog.to_string();
316            }
317        }
318        Expr::CreateView(expr) => {
319            check_and_fill!(expr);
320        }
321        Expr::DropView(expr) => {
322            check_and_fill!(expr);
323        }
324        Expr::CommentOn(expr) => {
325            check_and_fill!(expr);
326        }
327    }
328}
329
impl Instance {
    /// Drives a Flight `DoPut` bulk-insert request stream.
    ///
    /// On the first request the target table is resolved and permission /
    /// interceptor checks run once; subsequent requests reuse the resolved
    /// table. Each successfully processed request yields a [`DoPutResponse`]
    /// with the affected row count and the elapsed time in seconds.
    fn handle_put_record_batch_stream_inner(
        &self,
        mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
        ctx: QueryContextRef,
    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
        // Clone all necessary data so the returned stream is 'static and does
        // not borrow from `self`.
        let catalog_manager = self.catalog_manager().clone();
        let plugins = self.plugins.clone();
        let inserter = self.inserter.clone();
        let ctx = ctx.clone();
        let mut table_ref: Option<TableRef> = None;
        let mut table_checked = false;

        Box::pin(try_stream! {
            // Process each request in the stream.
            while let Some(request_result) = stream.next().await {
                let request = request_result.map_err(|e| {
                    let error_msg = format!("Stream error: {:?}", e);
                    IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
                })?;

                // Resolve table and check permissions on first RecordBatch (after schema is received).
                if !table_checked {
                    let table_name = &request.table_name;

                    plugins
                        .get::<PermissionCheckerRef>()
                        .as_ref()
                        .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
                        .context(PermissionSnafu)?;

                    // Resolve table reference.
                    table_ref = Some(
                        catalog_manager
                            .table(
                                &table_name.catalog_name,
                                &table_name.schema_name,
                                &table_name.table_name,
                                None,
                            )
                            .await
                            .context(CatalogSnafu)?
                            .with_context(|| TableNotFoundSnafu {
                                table_name: table_name.to_string(),
                            })?,
                    );

                    // Run the interceptor pre-hook for the resolved table.
                    let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
                    let interceptor = interceptor_ref.as_ref();
                    interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;

                    table_checked = true;
                }

                let request_id = request.request_id;
                let start = Instant::now();
                // `table_ref` is guaranteed `Some` here: it is assigned before
                // `table_checked` flips to true on the first iteration.
                let rows = inserter
                    .handle_bulk_insert(
                        table_ref.clone().unwrap(),
                        request.flight_data,
                        request.record_batch,
                        request.schema_bytes,
                    )
                    .await
                    .context(TableOperationSnafu)?;
                let elapsed_secs = start.elapsed().as_secs_f64();
                yield DoPutResponse::new(request_id, rows, elapsed_secs);
            }
        })
    }

    /// Executes an `InsertIntoPlan`: decodes the substrait-encoded source
    /// plan, resolves the target table, wraps the plan in an insert node,
    /// then analyzes, optimizes, and executes the resulting plan.
    async fn handle_insert_plan(
        &self,
        insert: InsertIntoPlan,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
            err_msg: "'table_name' is absent in InsertIntoPlan",
        })?;

        // Use a dummy catalog to provide tables during plan decoding.
        let plan_decoder = self
            .query_engine()
            .engine_context(ctx.clone())
            .new_plan_decoder()
            .context(PlanStatementSnafu)?;

        let dummy_catalog_list =
            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                self.catalog_manager().clone(),
            ));

        // No optimize yet (`false`) since we still need to add the insert
        // node to the decoded plan below.
        let logical_plan = plan_decoder
            .decode(
                bytes::Bytes::from(insert.logical_plan),
                dummy_catalog_list,
                false,
            )
            .await
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let table = self
            .catalog_manager()
            .table(
                &table_name.catalog_name,
                &table_name.schema_name,
                &table_name.table_name,
                None,
            )
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                // Report the fully-qualified "catalog.schema.table" name.
                table_name: [
                    table_name.catalog_name.clone(),
                    table_name.schema_name.clone(),
                    table_name.table_name.clone(),
                ]
                .join("."),
            })?;
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let table_source = Arc::new(DefaultTableSource::new(table_provider));

        let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

        let engine_ctx = self.query_engine().engine_context(ctx.clone());
        let state = engine_ctx.state();
        // Analyze the plan.
        let analyzed_plan = state
            .analyzer()
            .execute_and_check(insert_into, state.config_options(), |_, _| {})
            .context(DataFusionSnafu)?;

        // Optimize the plan.
        let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;

        let query = optimized_plan.display_indent().to_string();
        let output = self
            .do_exec_plan_inner(optimized_plan, query, ctx.clone())
            .await?;

        Ok(attach_timer(output, timer))
    }

    /// Handles column-format (`InsertRequests`) gRPC inserts.
    #[tracing::instrument(skip_all)]
    pub async fn handle_inserts(
        &self,
        requests: InsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
            .await
            .context(TableOperationSnafu)
    }

    /// Handles row-format (`RowInsertRequests`) gRPC inserts.
    ///
    /// `accommodate_existing_schema` and `is_single_value` are forwarded to
    /// the inserter unchanged; see `Inserter::handle_row_inserts` for their
    /// exact semantics.
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<Output> {
        self.inserter
            .handle_row_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                accommodate_existing_schema,
                is_single_value,
            )
            .await
            .context(TableOperationSnafu)
    }

    /// Handles InfluxDB line-protocol writes using "last non-null" merge
    /// semantics.
    #[tracing::instrument(skip_all)]
    pub async fn handle_influx_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.inserter
            .handle_last_non_null_inserts(
                requests,
                ctx,
                self.statement_executor.as_ref(),
                true,
                // The Influx protocol may write multiple fields (values).
                false,
            )
            .await
            .context(TableOperationSnafu)
    }

    /// Handles row inserts targeting the metric engine, routed through the
    /// given physical table.
    #[tracing::instrument(skip_all)]
    pub async fn handle_metric_row_inserts(
        &self,
        requests: RowInsertRequests,
        ctx: QueryContextRef,
        physical_table: String,
    ) -> Result<Output> {
        self.inserter
            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
            .await
            .context(TableOperationSnafu)
    }

    /// Handles column-format (`DeleteRequests`) gRPC deletes.
    #[tracing::instrument(skip_all)]
    pub async fn handle_deletes(
        &self,
        requests: DeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_column_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }

    /// Handles row-format (`RowDeleteRequests`) gRPC deletes.
    #[tracing::instrument(skip_all)]
    pub async fn handle_row_deletes(
        &self,
        requests: RowDeleteRequests,
        ctx: QueryContextRef,
    ) -> Result<Output> {
        self.deleter
            .handle_row_deletes(requests, ctx)
            .await
            .context(TableOperationSnafu)
    }
}
564}