meta_srv/utils/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::kv_backend::rds::postgres::{
17    TlsMode as PgTlsMode, TlsOption as PgTlsOption, create_postgres_tls_connector,
18};
19use deadpool_postgres::{Config, Runtime};
20use servers::tls::TlsOption;
21use snafu::{OptionExt, ResultExt};
22use tokio_postgres::NoTls;
23
24use crate::error::{self, Result};
25
26/// Converts [`TlsOption`] to [`PgTlsOption`] to avoid circular dependencies
27fn convert_tls_option(tls_option: &TlsOption) -> PgTlsOption {
28    let mode = match tls_option.mode {
29        servers::tls::TlsMode::Disable => PgTlsMode::Disable,
30        servers::tls::TlsMode::Prefer => PgTlsMode::Prefer,
31        servers::tls::TlsMode::Require => PgTlsMode::Require,
32        servers::tls::TlsMode::VerifyCa => PgTlsMode::VerifyCa,
33        servers::tls::TlsMode::VerifyFull => PgTlsMode::VerifyFull,
34    };
35
36    PgTlsOption {
37        mode,
38        cert_path: tls_option.cert_path.clone(),
39        key_path: tls_option.key_path.clone(),
40        ca_cert_path: tls_option.ca_cert_path.clone(),
41        watch: tls_option.watch,
42    }
43}
44
45/// Creates a pool for the Postgres backend with config and optional TLS.
46///
47/// It only use first store addr to create a pool, and use the given config to create a pool.
48pub async fn create_postgres_pool(
49    store_addrs: &[String],
50    cfg: Option<Config>,
51    tls_config: Option<TlsOption>,
52) -> Result<deadpool_postgres::Pool> {
53    let mut cfg = cfg.unwrap_or_default();
54    let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
55        err_msg: "empty store addrs",
56    })?;
57    cfg.url = Some(postgres_url.to_string());
58
59    let pool = if let Some(tls_config) = tls_config {
60        let pg_tls_config = convert_tls_option(&tls_config);
61        let tls_connector =
62            create_postgres_tls_connector(&pg_tls_config).map_err(|e| error::Error::Other {
63                source: BoxedError::new(e),
64                location: snafu::Location::new(file!(), line!(), 0),
65            })?;
66        cfg.create_pool(Some(Runtime::Tokio1), tls_connector)
67            .context(error::CreatePostgresPoolSnafu)?
68    } else {
69        cfg.create_pool(Some(Runtime::Tokio1), NoTls)
70            .context(error::CreatePostgresPoolSnafu)?
71    };
72
73    Ok(pool)
74}