servers/grpc/greptime_handler.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handler for the Greptime Database service. It's implemented by the frontend.

use std::str::FromStr;
use std::time::Instant;

use api::helper::request_type;
use api::v1::{GreptimeRequest, RequestHeader};
use auth::UserProviderRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::flight::FlightDecoder;
use common_grpc::flight::do_put::DoPutResponse;
use common_query::Output;
use common_runtime::Runtime;
use common_runtime::runtime::RuntimeTrait;
use common_session::ReadPreference;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error, tracing, warn};
use common_time::timezone::parse_timezone;
use futures_util::StreamExt;
use session::context::{Channel, QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;

use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result, UnknownHintSnafu};
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
use crate::grpc::{FlightCompression, TonicResult, context_auth};
use crate::metrics;
use crate::metrics::METRIC_SERVER_GRPC_DB_REQUEST_TIMER;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;

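/// Handles [`GreptimeRequest`]s received over gRPC: builds a query context from the
/// request header and hints, authenticates the caller when a user provider is configured,
/// and dispatches the query to the wrapped handler, optionally on a dedicated runtime.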
#[derive(Clone)]
pub struct GreptimeRequestHandler {
    handler: ServerGrpcQueryHandlerRef,
    pub(crate) user_provider: Option<UserProviderRef>,
    runtime: Option<Runtime>,
    pub(crate) flight_compression: FlightCompression,
}

impl GreptimeRequestHandler {
    pub fn new(
        handler: ServerGrpcQueryHandlerRef,
        user_provider: Option<UserProviderRef>,
        runtime: Option<Runtime>,
        flight_compression: FlightCompression,
    ) -> Self {
        Self {
            handler,
            user_provider,
            runtime,
            flight_compression,
        }
    }

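    /// Handles a single [`GreptimeRequest`]: validates that the inner query is present,
    /// builds the query context from the request header and hints, authenticates the caller,
    /// and executes the query, on the dedicated runtime if one is configured.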
    #[tracing::instrument(skip_all, fields(protocol = "grpc", request_type = get_request_type(&request)))]
    pub(crate) async fn handle_request(
        &self,
        request: GreptimeRequest,
        hints: Vec<(String, String)>,
    ) -> Result<Output> {
        let query = request.request.context(InvalidQuerySnafu {
            reason: "Expecting non-empty GreptimeRequest.",
        })?;

        let header = request.header.as_ref();
        let query_ctx = create_query_context(Channel::Grpc, header, hints)?;
        let user_info = context_auth::auth(self.user_provider.clone(), header, &query_ctx).await?;
        query_ctx.set_current_user(user_info);

        let handler = self.handler.clone();
        let request_type = request_type(&query).to_string();
        let db = query_ctx.get_db_string();
        let timer = RequestTimer::new(db.clone(), request_type);
        let tracing_context = TracingContext::from_current_span();

        let result_future = async move {
            handler
                .do_query(query, query_ctx)
                .trace(tracing_context.attach(tracing::info_span!(
                    "GreptimeRequestHandler::handle_request_runtime"
                )))
                .await
                .map_err(|e| {
                    if e.status_code().should_log_error() {
                        let root_error = e.root_cause().unwrap_or(&e);
                        error!(e; "Failed to handle request, error: {}", root_error.to_string());
                    } else {
                        // Currently, we still print a debug log.
                        debug!("Failed to handle request, err: {:?}", e);
                    }
                    e
                })
        };

        match &self.runtime {
            Some(runtime) => {
                // Execute requests in another runtime to:
                // 1. prevent the execution from being cancelled unexpectedly by the Tonic runtime;
                //   - Refer to our blog for the rationale behind it:
                //     https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
                //   - Obtaining a `JoinHandle` also lets us retrieve the panic message (if there is any).
                //     Per its docs, `JoinHandle` is cancel safe: the task keeps running even if its handle is dropped.
                // 2. avoid the handler accidentally blocking the gRPC runtime.
                runtime
                    .spawn(result_future)
                    .await
                    .context(JoinTaskSnafu)
                    .inspect_err(|e| {
                        timer.record(e.status_code());
                    })?
            }
            None => result_future.await,
        }
    }

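    /// Handles a stream of Flight `DoPut` record batch requests: each request is decoded and
    /// written through the underlying handler, and a [`DoPutResponse`] (or the error) is sent
    /// back through `result_sender`. Handling stops when the stream fails or the response
    /// receiver is closed.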
    pub(crate) async fn put_record_batches(
        &self,
        mut stream: PutRecordBatchRequestStream,
        result_sender: mpsc::Sender<TonicResult<DoPutResponse>>,
        query_ctx: QueryContextRef,
    ) {
        let handler = self.handler.clone();
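        // Run the whole stream on the dedicated runtime when configured; otherwise fall back
        // to the global runtime.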
        let runtime = self
            .runtime
            .clone()
            .unwrap_or_else(common_runtime::global_runtime);
        runtime.spawn(async move {
            // Cached table ref, resolved once and reused for all requests in this stream.
            let mut table_ref: Option<TableRef> = None;

            let mut decoder = FlightDecoder::default();
            while let Some(request) = stream.next().await {
                let request = match request {
                    Ok(request) => request,
                    Err(e) => {
                        let _ = result_sender.try_send(Err(e));
                        break;
                    }
                };
                let PutRecordBatchRequest {
                    table_name,
                    request_id,
                    data,
                    _guard,
                } = request;

                let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
                let result = handler
                    .put_record_batch(
                        &table_name,
                        &mut table_ref,
                        &mut decoder,
                        data,
                        query_ctx.clone(),
                    )
                    .await
                    .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
                timer.observe_duration();
                let result = result
                    .map(|x| DoPutResponse::new(request_id, x))
                    .map_err(Into::into);
                if let Err(e) = result_sender.try_send(result)
                    && let TrySendError::Closed(_) = e
                {
                    warn!(r#""DoPut" client with request_id {} may be unreachable, abort handling its messages"#, request_id);
                    break;
                }
            }
        });
    }
}

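/// Returns the type name of the inner request, or an empty string if the request is empty.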
pub fn get_request_type(request: &GreptimeRequest) -> &'static str {
    request
        .request
        .as_ref()
        .map(request_type)
        .unwrap_or_default()
}

/// Creates a new `QueryContext` from the provided request header and extensions.
/// It is strongly recommended to set an appropriate channel, as it is very helpful for statistics.
pub(crate) fn create_query_context(
    channel: Channel,
    header: Option<&RequestHeader>,
    mut extensions: Vec<(String, String)>,
) -> Result<QueryContextRef> {
    let (catalog, schema) = header
        .map(|header| {
            // Newer versions of the protos/SDKs provide the dbname field,
            // so parse the catalog and schema from it in priority.
            if !header.dbname.is_empty() {
                parse_catalog_and_schema_from_db_string(&header.dbname)
            } else {
                (
                    if !header.catalog.is_empty() {
                        header.catalog.to_lowercase()
                    } else {
                        DEFAULT_CATALOG_NAME.to_string()
                    },
                    if !header.schema.is_empty() {
                        header.schema.to_lowercase()
                    } else {
                        DEFAULT_SCHEMA_NAME.to_string()
                    },
                )
            }
        })
        .unwrap_or_else(|| {
            (
                DEFAULT_CATALOG_NAME.to_string(),
                DEFAULT_SCHEMA_NAME.to_string(),
            )
        });
    let timezone = parse_timezone(header.map(|h| h.timezone.as_str()));
    let mut ctx_builder = QueryContextBuilder::default()
        .current_catalog(catalog)
        .current_schema(schema)
        .timezone(timezone)
        .channel(channel);

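    // The read-preference hint is consumed here instead of being stored as a plain extension;
    // a value that fails to parse is rejected as an unknown hint.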
    if let Some(x) = extensions
        .iter()
        .position(|(k, _)| k == READ_PREFERENCE_HINT)
    {
        let (k, v) = extensions.swap_remove(x);
        let Ok(read_preference) = ReadPreference::from_str(&v) else {
            return UnknownHintSnafu {
                hint: format!("{k}={v}"),
            }
            .fail();
        };
        ctx_builder = ctx_builder.read_preference(read_preference);
    }

    for (key, value) in extensions {
        ctx_builder = ctx_builder.set_extension(key, value);
    }
    Ok(ctx_builder.build().into())
}

/// Histogram timer for handling a gRPC request.
///
/// The timer records the elapsed time on drop, with [StatusCode::Success] unless
/// [`RequestTimer::record`] was called with another status code.
pub(crate) struct RequestTimer {
    start: Instant,
    db: String,
    request_type: String,
    status_code: StatusCode,
}

impl RequestTimer {
    /// Returns a new timer.
    pub fn new(db: String, request_type: String) -> RequestTimer {
        RequestTimer {
            start: Instant::now(),
            db,
            request_type,
            status_code: StatusCode::Success,
        }
    }

    /// Consumes the timer and records the elapsed time with the given `status_code`.
    pub fn record(mut self, status_code: StatusCode) {
        self.status_code = status_code;
    }
}

impl Drop for RequestTimer {
    fn drop(&mut self) {
        METRIC_SERVER_GRPC_DB_REQUEST_TIMER
            .with_label_values(&[
                self.db.as_str(),
                self.request_type.as_str(),
                self.status_code.as_ref(),
            ])
            .observe(self.start.elapsed().as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use chrono::FixedOffset;
    use common_time::Timezone;

    use super::*;

    #[test]
    fn test_create_query_context() {
        let header = RequestHeader {
            catalog: "cat-a-log".to_string(),
            timezone: "+01:00".to_string(),
            ..Default::default()
        };
        let query_context = create_query_context(
            Channel::Unknown,
            Some(&header),
            vec![
                ("auto_create_table".to_string(), "true".to_string()),
                ("read_preference".to_string(), "leader".to_string()),
            ],
        )
        .unwrap();
        assert_eq!(query_context.current_catalog(), "cat-a-log");
        assert_eq!(query_context.current_schema(), DEFAULT_SCHEMA_NAME);
        assert_eq!(
            query_context.timezone(),
            Timezone::Offset(FixedOffset::east_opt(3600).unwrap())
        );
        assert!(matches!(
            query_context.read_preference(),
            ReadPreference::Leader
        ));
        assert_eq!(
            query_context.extensions().into_iter().collect::<Vec<_>>(),
            vec![("auto_create_table".to_string(), "true".to_string())]
        );
    }
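
    // A small additional sketch of a test exercising the default and error paths of
    // `create_query_context`. It assumes "not-a-preference" is not a value accepted by
    // `ReadPreference::from_str`.
    #[test]
    fn test_create_query_context_defaults_and_bad_hint() {
        // Without a header, the catalog and schema fall back to the defaults.
        let query_context = create_query_context(Channel::Grpc, None, vec![]).unwrap();
        assert_eq!(query_context.current_catalog(), DEFAULT_CATALOG_NAME);
        assert_eq!(query_context.current_schema(), DEFAULT_SCHEMA_NAME);

        // An unparsable read-preference hint is rejected as an unknown hint.
        let result = create_query_context(
            Channel::Grpc,
            None,
            vec![(
                READ_PREFERENCE_HINT.to_string(),
                "not-a-preference".to_string(),
            )],
        );
        assert!(result.is_err());
    }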
}