servers/grpc/
context_auth.rs1use std::sync::Arc;
16
17use api::v1::auth_header::AuthScheme;
18use api::v1::{AuthHeader, RequestHeader};
19use auth::{Identity, Password, UserInfoRef, UserProviderRef};
20use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
21use common_catalog::parse_catalog_and_schema_from_db_string;
22use common_error::ext::ErrorExt;
23use session::context::{Channel, QueryContextBuilder, QueryContextRef};
24use snafu::{OptionExt, ResultExt};
25use tonic::Status;
26use tonic::metadata::MetadataMap;
27
28use crate::error::Error::UnsupportedAuthScheme;
29use crate::error::{AuthSnafu, InvalidParameterSnafu, NotFoundAuthHeaderSnafu, Result};
30use crate::grpc::TonicResult;
31use crate::http::AUTHORIZATION_HEADER;
32use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
33use crate::metrics::METRIC_AUTH_FAILURE;
34
35pub fn create_query_context_from_grpc_metadata(
37 headers: &MetadataMap,
38) -> TonicResult<QueryContextRef> {
39 let (catalog, schema) = if let Some(db) = extract_header(headers, &[GREPTIME_DB_HEADER_NAME])? {
40 parse_catalog_and_schema_from_db_string(db)
41 } else {
42 (
43 DEFAULT_CATALOG_NAME.to_string(),
44 DEFAULT_SCHEMA_NAME.to_string(),
45 )
46 };
47
48 Ok(Arc::new(
49 QueryContextBuilder::default()
50 .current_catalog(catalog)
51 .current_schema(schema)
52 .channel(Channel::Grpc)
53 .build(),
54 ))
55}
56
57pub fn extract_header<'a>(headers: &'a MetadataMap, keys: &[&str]) -> TonicResult<Option<&'a str>> {
60 let mut value = None;
61 for key in keys {
62 if let Some(v) = headers.get(*key) {
63 value = Some(v);
64 break;
65 }
66 }
67
68 let Some(v) = value else {
69 return Ok(None);
70 };
71 let Ok(v) = std::str::from_utf8(v.as_bytes()) else {
72 return Err(InvalidParameterSnafu {
73 reason: "expect valid UTF-8 value",
74 }
75 .build()
76 .into());
77 };
78 Ok(Some(v))
79}
80
81pub async fn check_auth(
83 user_provider: Option<UserProviderRef>,
84 headers: &MetadataMap,
85 query_ctx: QueryContextRef,
86) -> TonicResult<bool> {
87 if user_provider.is_none() {
88 return Ok(true);
89 }
90
91 let auth_schema = extract_header(
92 headers,
93 &[AUTHORIZATION_HEADER, http::header::AUTHORIZATION.as_str()],
94 )?
95 .map(|x| {
96 if x.len() > 5 && x[0..5].eq_ignore_ascii_case("Basic") {
97 x.try_into()
98 } else {
99 format!("Basic {}", x).as_str().try_into()
101 }
102 })
103 .transpose()?
104 .map(|x: crate::http::authorize::AuthScheme| x.into());
105
106 let auth_schema = auth_schema.context(NotFoundAuthHeaderSnafu)?;
107 let header = RequestHeader {
108 authorization: Some(AuthHeader {
109 auth_scheme: Some(auth_schema),
110 }),
111 catalog: query_ctx.current_catalog().to_string(),
112 schema: query_ctx.current_schema(),
113 ..Default::default()
114 };
115
116 match auth(user_provider, Some(&header), &query_ctx).await {
117 Ok(user_info) => {
118 query_ctx.set_current_user(user_info);
119 Ok(true)
120 }
121 Err(_) => Err(Status::unauthenticated("auth failed")),
122 }
123}
124
125pub async fn auth(
127 user_provider: Option<UserProviderRef>,
128 header: Option<&RequestHeader>,
129 query_ctx: &QueryContextRef,
130) -> Result<UserInfoRef> {
131 let Some(user_provider) = user_provider else {
132 return Ok(auth::userinfo_by_name(None));
133 };
134
135 let auth_scheme = header
136 .and_then(|header| {
137 header
138 .authorization
139 .as_ref()
140 .and_then(|x| x.auth_scheme.clone())
141 })
142 .context(NotFoundAuthHeaderSnafu)?;
143
144 match auth_scheme {
145 AuthScheme::Basic(api::v1::Basic { username, password }) => user_provider
146 .auth(
147 Identity::UserId(&username, None),
148 Password::PlainText(password.into()),
149 query_ctx.current_catalog(),
150 &query_ctx.current_schema(),
151 )
152 .await
153 .context(AuthSnafu),
154 AuthScheme::Token(_) => Err(UnsupportedAuthScheme {
155 name: "Token AuthScheme".to_string(),
156 }),
157 }
158 .inspect_err(|e| {
159 METRIC_AUTH_FAILURE
160 .with_label_values(&[e.status_code().as_ref()])
161 .inc();
162 })
163}