meta_srv/utils/
postgres.rs1use common_error::ext::BoxedError;
16use common_meta::kv_backend::rds::postgres::{
17 TlsMode as PgTlsMode, TlsOption as PgTlsOption, create_postgres_tls_connector,
18};
19use deadpool_postgres::{Config, Runtime};
20use servers::tls::TlsOption;
21use snafu::{OptionExt, ResultExt};
22use tokio_postgres::NoTls;
23
24use crate::error::{self, Result};
25
26fn convert_tls_option(tls_option: &TlsOption) -> PgTlsOption {
28 let mode = match tls_option.mode {
29 servers::tls::TlsMode::Disable => PgTlsMode::Disable,
30 servers::tls::TlsMode::Prefer => PgTlsMode::Prefer,
31 servers::tls::TlsMode::Require => PgTlsMode::Require,
32 servers::tls::TlsMode::VerifyCa => PgTlsMode::VerifyCa,
33 servers::tls::TlsMode::VerifyFull => PgTlsMode::VerifyFull,
34 };
35
36 PgTlsOption {
37 mode,
38 cert_path: tls_option.cert_path.clone(),
39 key_path: tls_option.key_path.clone(),
40 ca_cert_path: tls_option.ca_cert_path.clone(),
41 watch: tls_option.watch,
42 }
43}
44
45pub async fn create_postgres_pool(
49 store_addrs: &[String],
50 cfg: Option<Config>,
51 tls_config: Option<TlsOption>,
52) -> Result<deadpool_postgres::Pool> {
53 let mut cfg = cfg.unwrap_or_default();
54 let postgres_url = store_addrs.first().context(error::InvalidArgumentsSnafu {
55 err_msg: "empty store addrs",
56 })?;
57 cfg.url = Some(postgres_url.to_string());
58
59 let pool = if let Some(tls_config) = tls_config {
60 let pg_tls_config = convert_tls_option(&tls_config);
61 let tls_connector =
62 create_postgres_tls_connector(&pg_tls_config).map_err(|e| error::Error::Other {
63 source: BoxedError::new(e),
64 location: snafu::Location::new(file!(), line!(), 0),
65 })?;
66 cfg.create_pool(Some(Runtime::Tokio1), tls_connector)
67 .context(error::CreatePostgresPoolSnafu)?
68 } else {
69 cfg.create_pool(Some(Runtime::Tokio1), NoTls)
70 .context(error::CreatePostgresPoolSnafu)?
71 };
72
73 Ok(pool)
74}