// frontend/instance/grpc.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use api::helper::from_pb_time_ranges;
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
    DeleteRequests, DropFlowExpr, InsertIntoPlan, InsertRequests, RowDeleteRequests,
    RowInsertRequests,
};
use async_stream::try_stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_error::ext::BoxedError;
use common_grpc::flight::do_put::DoPutResponse;
use common_query::Output;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_telemetry::tracing::{self};
use datafusion::datasource::DefaultTableSource;
use futures::Stream;
use futures::stream::StreamExt;
use query::parser::PromQuery;
use servers::error as server_error;
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
use table::table::adapter::DfTableProviderAdapter;
use table::table_name::TableName;

use crate::error::{
    CatalogSnafu, DataFusionSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu,
    NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
    SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{Instance, attach_timer};
use crate::metrics::{
    GRPC_HANDLE_PLAN_ELAPSED, GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED,
};
59
#[async_trait]
impl GrpcQueryHandler for Instance {
    /// Entry point for unary gRPC requests.
    ///
    /// Flow: run the plugin interceptor's `pre_execute`, check the caller's
    /// permission for this request, dispatch on the request variant
    /// (inserts/deletes/queries/DDL), then run the interceptor's
    /// `post_execute` on the produced [`Output`]. Any error is boxed and
    /// wrapped as `ExecuteGrpcQuerySnafu` for the server layer.
    async fn do_query(
        &self,
        request: Request,
        ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let result: Result<Output> = async {
            // Plugin hook: may inspect/reject the request before any work is done.
            let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
            let interceptor = interceptor_ref.as_ref();
            interceptor.pre_execute(&request, ctx.clone())?;

            // Authorize the current user for this specific gRPC request.
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
                .context(PermissionSnafu)?;

            let output = match request {
                Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
                // Row inserts go to the metric engine when the session carries a
                // physical-table extension (Prometheus remote-write path).
                Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
                    Some(physical_table) => {
                        self.handle_metric_row_inserts(
                            requests,
                            ctx.clone(),
                            physical_table.to_string(),
                        )
                        .await?
                    }
                    None => {
                        self.handle_row_inserts(requests, ctx.clone(), false, false)
                            .await?
                    }
                },
                Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
                Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
                Request::Query(query_request) => {
                    let query = query_request.query.context(IncompleteGrpcRequestSnafu {
                        err_msg: "Missing field 'QueryRequest.query'",
                    })?;
                    match query {
                        Query::Sql(sql) => {
                            let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
                            let mut result = self.do_query_inner(&sql, ctx.clone()).await;
                            // The gRPC interface only supports a single statement
                            // per query string.
                            ensure!(
                                result.len() == 1,
                                NotSupportedSnafu {
                                    feat: "execute multiple statements in SQL query string through GRPC interface"
                                }
                            );
                            let output = result.remove(0)?;
                            attach_timer(output, timer)
                        }
                        Query::LogicalPlan(plan) => {
                            // this path is useful internally when flownode needs to execute a logical plan through gRPC interface
                            let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();

                            // use dummy catalog to provide table
                            let plan_decoder = self
                                .query_engine()
                                .engine_context(ctx.clone())
                                .new_plan_decoder()
                                .context(PlanStatementSnafu)?;

                            let dummy_catalog_list =
                                Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
                                    self.catalog_manager().clone(),
                                ));

                            // `true`: decode with optimization enabled (contrast with
                            // handle_insert_plan, which defers optimization).
                            let logical_plan = plan_decoder
                                .decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
                                .await
                                .context(SubstraitDecodeLogicalPlanSnafu)?;
                            let output =
                                self.do_exec_plan_inner(None, logical_plan, ctx.clone()).await?;

                            attach_timer(output, timer)
                        }
                        Query::InsertIntoPlan(insert) => {
                            self.handle_insert_plan(insert, ctx.clone()).await?
                        }
                        Query::PromRangeQuery(promql) => {
                            let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
                            let prom_query = PromQuery {
                                query: promql.query,
                                start: promql.start,
                                end: promql.end,
                                step: promql.step,
                                lookback: promql.lookback,
                                alias: None,
                            };
                            let mut result =
                                self.do_promql_query_inner(&prom_query, ctx.clone()).await;
                            // Same single-statement restriction as the SQL path above.
                            ensure!(
                                result.len() == 1,
                                NotSupportedSnafu {
                                    feat: "execute multiple statements in PromQL query string through GRPC interface"
                                }
                            );
                            let output = result.remove(0)?;
                            attach_timer(output, timer)
                        }
                    }
                }
                Request::Ddl(request) => {
                    let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
                        err_msg: "'expr' is absent in DDL request",
                    })?;

                    // Default empty catalog/schema fields from the session context.
                    fill_catalog_and_schema_from_context(&mut expr, &ctx);

                    match expr {
                        DdlExpr::CreateTable(mut expr) => {
                            // The returned table is not needed here; report 0 affected rows.
                            let _ = self
                                .statement_executor
                                .create_table_inner(&mut expr, None, ctx.clone())
                                .await?;
                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::AlterDatabase(expr) => {
                            let _ = self
                                .statement_executor
                                .alter_database_inner(expr, ctx.clone())
                                .await?;
                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::AlterTable(expr) => {
                            self.statement_executor
                                .alter_table_inner(expr, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateDatabase(expr) => {
                            self.statement_executor
                                .create_database(
                                    &expr.schema_name,
                                    expr.create_if_not_exists,
                                    expr.options,
                                    ctx.clone(),
                                )
                                .await?
                        }
                        DdlExpr::DropTable(expr) => {
                            let table_name =
                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                            self.statement_executor
                                .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                                .await?
                        }
                        DdlExpr::TruncateTable(expr) => {
                            let table_name =
                                TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                            // Absent time ranges mean "truncate everything" (empty ranges).
                            let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
                                .map_err(BoxedError::new)
                                .context(ExternalSnafu)?;
                            self.statement_executor
                                .truncate_table(table_name, time_ranges, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateFlow(expr) => {
                            self.statement_executor
                                .create_flow_inner(expr, ctx.clone())
                                .await?
                        }
                        DdlExpr::DropFlow(DropFlowExpr {
                            catalog_name,
                            flow_name,
                            drop_if_exists,
                            ..
                        }) => {
                            self.statement_executor
                                .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
                                .await?
                        }
                        DdlExpr::CreateView(expr) => {
                            let _ = self
                                .statement_executor
                                .create_view_by_expr(expr, ctx.clone())
                                .await?;

                            Output::new_with_affected_rows(0)
                        }
                        DdlExpr::DropView(_) => {
                            todo!("implemented in the following PR")
                        }
                        DdlExpr::CommentOn(expr) => {
                            self.statement_executor
                                .comment_by_expr(expr, ctx.clone())
                                .await?
                        }
                    }
                }
            };

            // Plugin hook: may transform/observe the output before it is returned.
            let output = interceptor.post_execute(output, ctx)?;
            Ok(output)
        }
        .await;

        result
            .map_err(BoxedError::new)
            .context(server_error::ExecuteGrpcQuerySnafu)
    }

    /// Handles one Arrow Flight `DoPut` record batch (bulk insert).
    ///
    /// `table_ref` is a caller-held cache: on the first call the table is
    /// resolved through the catalog manager and stored back into it, so
    /// subsequent batches of the same stream skip the lookup.
    async fn put_record_batch(
        &self,
        request: servers::grpc::flight::PutRecordBatchRequest,
        table_ref: &mut Option<TableRef>,
        ctx: QueryContextRef,
    ) -> server_error::Result<AffectedRows> {
        let result: Result<AffectedRows> = async {
            let table = if let Some(table) = table_ref {
                table.clone()
            } else {
                let table = self
                    .catalog_manager()
                    .table(
                        &request.table_name.catalog_name,
                        &request.table_name.schema_name,
                        &request.table_name.table_name,
                        None,
                    )
                    .await
                    .context(CatalogSnafu)?
                    .with_context(|| TableNotFoundSnafu {
                        table_name: request.table_name.to_string(),
                    })?;
                // Cache for the next batch of this stream.
                *table_ref = Some(table.clone());
                table
            };

            // Plugin hook runs before the permission check on this path.
            let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
            let interceptor = interceptor_ref.as_ref();
            interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;

            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
                .context(PermissionSnafu)?;

            // NOTE(review): no request-size/limit check is applied on this
            // bulk-insert path — confirm whether limits are enforced elsewhere.

            self.inserter
                .handle_bulk_insert(
                    table,
                    request.flight_data,
                    request.record_batch,
                    request.schema_bytes,
                )
                .await
                .context(TableOperationSnafu)
        }
        .await;

        result
            .map_err(BoxedError::new)
            .context(server_error::ExecuteGrpcRequestSnafu)
    }

    /// Adapts the crate-internal streaming bulk-insert implementation to the
    /// server-facing error type: each item's error is boxed and wrapped as
    /// `ExecuteGrpcRequestSnafu`.
    fn handle_put_record_batch_stream(
        &self,
        stream: servers::grpc::flight::PutRecordBatchRequestStream,
        ctx: QueryContextRef,
    ) -> Pin<Box<dyn Stream<Item = server_error::Result<DoPutResponse>> + Send>> {
        Box::pin(
            self.handle_put_record_batch_stream_inner(stream, ctx)
                .map(|result| {
                    result
                        .map_err(BoxedError::new)
                        .context(server_error::ExecuteGrpcRequestSnafu)
                }),
        )
    }
}
334
335fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
336    let catalog = ctx.current_catalog();
337    let schema = ctx.current_schema();
338
339    macro_rules! check_and_fill {
340        ($expr:ident) => {
341            if $expr.catalog_name.is_empty() {
342                $expr.catalog_name = catalog.to_string();
343            }
344            if $expr.schema_name.is_empty() {
345                $expr.schema_name = schema.to_string();
346            }
347        };
348    }
349
350    match ddl_expr {
351        Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
352        Expr::CreateTable(expr) => {
353            check_and_fill!(expr);
354        }
355        Expr::AlterTable(expr) => {
356            check_and_fill!(expr);
357        }
358        Expr::DropTable(expr) => {
359            check_and_fill!(expr);
360        }
361        Expr::TruncateTable(expr) => {
362            check_and_fill!(expr);
363        }
364        Expr::CreateFlow(expr) => {
365            if expr.catalog_name.is_empty() {
366                expr.catalog_name = catalog.to_string();
367            }
368        }
369        Expr::DropFlow(expr) => {
370            if expr.catalog_name.is_empty() {
371                expr.catalog_name = catalog.to_string();
372            }
373        }
374        Expr::CreateView(expr) => {
375            check_and_fill!(expr);
376        }
377        Expr::DropView(expr) => {
378            check_and_fill!(expr);
379        }
380        Expr::CommentOn(expr) => {
381            check_and_fill!(expr);
382        }
383    }
384}
385
386impl Instance {
387    fn handle_put_record_batch_stream_inner(
388        &self,
389        mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
390        ctx: QueryContextRef,
391    ) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
392        // Clone all necessary data to make it 'static
393        let catalog_manager = self.catalog_manager().clone();
394        let plugins = self.plugins.clone();
395        let inserter = self.inserter.clone();
396        let ctx = ctx.clone();
397        let mut table_ref: Option<TableRef> = None;
398        let mut table_checked = false;
399
400        Box::pin(try_stream! {
401            // Process each request in the stream
402            while let Some(request_result) = stream.next().await {
403                let request = request_result.map_err(|e| {
404                    let error_msg = format!("Stream error: {:?}", e);
405                    IncompleteGrpcRequestSnafu { err_msg: error_msg }.build()
406                })?;
407
408                // Resolve table and check permissions on first RecordBatch (after schema is received)
409                if !table_checked {
410                    let table_name = &request.table_name;
411
412                    plugins
413                        .get::<PermissionCheckerRef>()
414                        .as_ref()
415                        .check_permission(ctx.current_user(), PermissionReq::BulkInsert)
416                        .context(PermissionSnafu)?;
417
418                    // Resolve table reference
419                    table_ref = Some(
420                        catalog_manager
421                            .table(
422                                &table_name.catalog_name,
423                                &table_name.schema_name,
424                                &table_name.table_name,
425                                None,
426                            )
427                            .await
428                            .context(CatalogSnafu)?
429                            .with_context(|| TableNotFoundSnafu {
430                                table_name: table_name.to_string(),
431                            })?,
432                    );
433
434                    // Check permissions for the table
435                    let interceptor_ref = plugins.get::<GrpcQueryInterceptorRef<Error>>();
436                    let interceptor = interceptor_ref.as_ref();
437                    interceptor.pre_bulk_insert(table_ref.clone().unwrap(), ctx.clone())?;
438
439                    table_checked = true;
440                }
441
442                let request_id = request.request_id;
443                let start = Instant::now();
444                let rows = inserter
445                    .handle_bulk_insert(
446                        table_ref.clone().unwrap(),
447                        request.flight_data,
448                        request.record_batch,
449                        request.schema_bytes,
450                    )
451                    .await
452                    .context(TableOperationSnafu)?;
453                let elapsed_secs = start.elapsed().as_secs_f64();
454                yield DoPutResponse::new(request_id, rows, elapsed_secs);
455            }
456        })
457    }
458
459    async fn handle_insert_plan(
460        &self,
461        insert: InsertIntoPlan,
462        ctx: QueryContextRef,
463    ) -> Result<Output> {
464        let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
465        let table_name = insert.table_name.context(IncompleteGrpcRequestSnafu {
466            err_msg: "'table_name' is absent in InsertIntoPlan",
467        })?;
468
469        // use dummy catalog to provide table
470        let plan_decoder = self
471            .query_engine()
472            .engine_context(ctx.clone())
473            .new_plan_decoder()
474            .context(PlanStatementSnafu)?;
475
476        let dummy_catalog_list =
477            Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
478                self.catalog_manager().clone(),
479            ));
480
481        // no optimize yet since we still need to add stuff
482        let logical_plan = plan_decoder
483            .decode(
484                bytes::Bytes::from(insert.logical_plan),
485                dummy_catalog_list,
486                false,
487            )
488            .await
489            .context(SubstraitDecodeLogicalPlanSnafu)?;
490
491        let table = self
492            .catalog_manager()
493            .table(
494                &table_name.catalog_name,
495                &table_name.schema_name,
496                &table_name.table_name,
497                None,
498            )
499            .await
500            .context(CatalogSnafu)?
501            .with_context(|| TableNotFoundSnafu {
502                table_name: [
503                    table_name.catalog_name.clone(),
504                    table_name.schema_name.clone(),
505                    table_name.table_name.clone(),
506                ]
507                .join("."),
508            })?;
509        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
510        let table_source = Arc::new(DefaultTableSource::new(table_provider));
511
512        let insert_into = add_insert_to_logical_plan(table_name, table_source, logical_plan)
513            .context(SubstraitDecodeLogicalPlanSnafu)?;
514
515        let engine_ctx = self.query_engine().engine_context(ctx.clone());
516        let state = engine_ctx.state();
517        // Analyze the plan
518        let analyzed_plan = state
519            .analyzer()
520            .execute_and_check(insert_into, state.config_options(), |_, _| {})
521            .context(DataFusionSnafu)?;
522
523        // Optimize the plan
524        let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;
525
526        let output = self
527            .do_exec_plan_inner(None, optimized_plan, ctx.clone())
528            .await?;
529
530        Ok(attach_timer(output, timer))
531    }
532    #[tracing::instrument(skip_all)]
533    pub async fn handle_inserts(
534        &self,
535        requests: InsertRequests,
536        ctx: QueryContextRef,
537    ) -> Result<Output> {
538        self.inserter
539            .handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
540            .await
541            .context(TableOperationSnafu)
542    }
543
544    #[tracing::instrument(skip_all)]
545    pub async fn handle_row_inserts(
546        &self,
547        requests: RowInsertRequests,
548        ctx: QueryContextRef,
549        accommodate_existing_schema: bool,
550        is_single_value: bool,
551    ) -> Result<Output> {
552        self.inserter
553            .handle_row_inserts(
554                requests,
555                ctx,
556                self.statement_executor.as_ref(),
557                accommodate_existing_schema,
558                is_single_value,
559            )
560            .await
561            .context(TableOperationSnafu)
562    }
563
564    #[tracing::instrument(skip_all)]
565    pub async fn handle_influx_row_inserts(
566        &self,
567        requests: RowInsertRequests,
568        ctx: QueryContextRef,
569    ) -> Result<Output> {
570        self.inserter
571            .handle_last_non_null_inserts(
572                requests,
573                ctx,
574                self.statement_executor.as_ref(),
575                true,
576                // Influx protocol may writes multiple fields (values).
577                false,
578            )
579            .await
580            .context(TableOperationSnafu)
581    }
582
583    #[tracing::instrument(skip_all)]
584    pub async fn handle_metric_row_inserts(
585        &self,
586        requests: RowInsertRequests,
587        ctx: QueryContextRef,
588        physical_table: String,
589    ) -> Result<Output> {
590        self.inserter
591            .handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
592            .await
593            .context(TableOperationSnafu)
594    }
595
596    #[tracing::instrument(skip_all)]
597    pub async fn handle_deletes(
598        &self,
599        requests: DeleteRequests,
600        ctx: QueryContextRef,
601    ) -> Result<Output> {
602        self.deleter
603            .handle_column_deletes(requests, ctx)
604            .await
605            .context(TableOperationSnafu)
606    }
607
608    #[tracing::instrument(skip_all)]
609    pub async fn handle_row_deletes(
610        &self,
611        requests: RowDeleteRequests,
612        ctx: QueryContextRef,
613    ) -> Result<Output> {
614        self.deleter
615            .handle_row_deletes(requests, ctx)
616            .await
617            .context(TableOperationSnafu)
618    }
619}