servers/grpc/
context_auth.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::auth_header::AuthScheme;
18use api::v1::{AuthHeader, RequestHeader};
19use auth::{Identity, Password, UserInfoRef, UserProviderRef};
20use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
21use common_catalog::parse_catalog_and_schema_from_db_string;
22use common_error::ext::ErrorExt;
23use session::context::{Channel, QueryContextBuilder, QueryContextRef};
24use snafu::{OptionExt, ResultExt};
25use tonic::Status;
26use tonic::metadata::MetadataMap;
27
28use crate::error::Error::UnsupportedAuthScheme;
29use crate::error::{AuthSnafu, InvalidParameterSnafu, NotFoundAuthHeaderSnafu, Result};
30use crate::grpc::TonicResult;
31use crate::http::AUTHORIZATION_HEADER;
32use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
33use crate::metrics::METRIC_AUTH_FAILURE;
34
35/// Create a query context from the grpc metadata.
36pub fn create_query_context_from_grpc_metadata(
37    headers: &MetadataMap,
38) -> TonicResult<QueryContextRef> {
39    let (catalog, schema) = if let Some(db) = extract_header(headers, &[GREPTIME_DB_HEADER_NAME])? {
40        parse_catalog_and_schema_from_db_string(db)
41    } else {
42        (
43            DEFAULT_CATALOG_NAME.to_string(),
44            DEFAULT_SCHEMA_NAME.to_string(),
45        )
46    };
47
48    Ok(Arc::new(
49        QueryContextBuilder::default()
50            .current_catalog(catalog)
51            .current_schema(schema)
52            .channel(Channel::Grpc)
53            .build(),
54    ))
55}
56
57/// Helper function to extract a header from the metadata map.
58/// Can be multiple keys, and the first one found will be returned.
59pub fn extract_header<'a>(headers: &'a MetadataMap, keys: &[&str]) -> TonicResult<Option<&'a str>> {
60    let mut value = None;
61    for key in keys {
62        if let Some(v) = headers.get(*key) {
63            value = Some(v);
64            break;
65        }
66    }
67
68    let Some(v) = value else {
69        return Ok(None);
70    };
71    let Ok(v) = std::str::from_utf8(v.as_bytes()) else {
72        return Err(InvalidParameterSnafu {
73            reason: "expect valid UTF-8 value",
74        }
75        .build()
76        .into());
77    };
78    Ok(Some(v))
79}
80
81/// Helper function to extract the header from the metadata and authenticate the user.
82pub async fn check_auth(
83    user_provider: Option<UserProviderRef>,
84    headers: &MetadataMap,
85    query_ctx: QueryContextRef,
86) -> TonicResult<bool> {
87    if user_provider.is_none() {
88        return Ok(true);
89    }
90
91    let auth_schema = extract_header(
92        headers,
93        &[AUTHORIZATION_HEADER, http::header::AUTHORIZATION.as_str()],
94    )?
95    .map(|x| {
96        if x.len() > 5 && x[0..5].eq_ignore_ascii_case("Basic") {
97            x.try_into()
98        } else {
99            // compatible with old version
100            format!("Basic {}", x).as_str().try_into()
101        }
102    })
103    .transpose()?
104    .map(|x: crate::http::authorize::AuthScheme| x.into());
105
106    let auth_schema = auth_schema.context(NotFoundAuthHeaderSnafu)?;
107    let header = RequestHeader {
108        authorization: Some(AuthHeader {
109            auth_scheme: Some(auth_schema),
110        }),
111        catalog: query_ctx.current_catalog().to_string(),
112        schema: query_ctx.current_schema(),
113        ..Default::default()
114    };
115
116    match auth(user_provider, Some(&header), &query_ctx).await {
117        Ok(user_info) => {
118            query_ctx.set_current_user(user_info);
119            Ok(true)
120        }
121        Err(_) => Err(Status::unauthenticated("auth failed")),
122    }
123}
124
125/// Authenticate the user based on the header and query context.
126pub async fn auth(
127    user_provider: Option<UserProviderRef>,
128    header: Option<&RequestHeader>,
129    query_ctx: &QueryContextRef,
130) -> Result<UserInfoRef> {
131    let Some(user_provider) = user_provider else {
132        return Ok(auth::userinfo_by_name(None));
133    };
134
135    let auth_scheme = header
136        .and_then(|header| {
137            header
138                .authorization
139                .as_ref()
140                .and_then(|x| x.auth_scheme.clone())
141        })
142        .context(NotFoundAuthHeaderSnafu)?;
143
144    match auth_scheme {
145        AuthScheme::Basic(api::v1::Basic { username, password }) => user_provider
146            .auth(
147                Identity::UserId(&username, None),
148                Password::PlainText(password.into()),
149                query_ctx.current_catalog(),
150                &query_ctx.current_schema(),
151            )
152            .await
153            .context(AuthSnafu),
154        AuthScheme::Token(_) => Err(UnsupportedAuthScheme {
155            name: "Token AuthScheme".to_string(),
156        }),
157    }
158    .inspect_err(|e| {
159        METRIC_AUTH_FAILURE
160            .with_label_values(&[e.status_code().as_ref()])
161            .inc();
162    })
163}