1use std::sync::Arc;
16
17use clap::Parser;
18use common_error::ext::BoxedError;
19use common_meta::kv_backend::chroot::ChrootKvBackend;
20use common_meta::kv_backend::etcd::EtcdStore;
21use common_meta::kv_backend::KvBackendRef;
22use meta_srv::bootstrap::create_etcd_client_with_tls;
23use meta_srv::metasrv::BackendImpl;
24use servers::tls::{TlsMode, TlsOption};
25
26use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
27
28#[derive(Debug, Default, Parser)]
29pub(crate) struct StoreConfig {
30 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
41 store_addrs: Vec<String>,
42
43 #[clap(long, default_value = "128")]
45 max_txn_ops: usize,
46
47 #[clap(long, value_enum, default_value = "etcd-store")]
49 backend: BackendImpl,
50
51 #[clap(long, default_value = "")]
53 store_key_prefix: String,
54
55 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
57 #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
58 meta_table_name: String,
59
60 #[cfg(feature = "pg_kvbackend")]
62 #[clap(long)]
63 meta_schema_name: Option<String>,
64 #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
66 backend_tls_mode: TlsMode,
67
68 #[clap(long = "backend-tls-cert-path", default_value = "")]
70 backend_tls_cert_path: String,
71
72 #[clap(long = "backend-tls-key-path", default_value = "")]
74 backend_tls_key_path: String,
75
76 #[clap(long = "backend-tls-ca-cert-path", default_value = "")]
78 backend_tls_ca_cert_path: String,
79
80 #[clap(long = "backend-tls-watch")]
82 backend_tls_watch: bool,
83}
84
85impl StoreConfig {
86 pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
88 let max_txn_ops = self.max_txn_ops;
89 let store_addrs = &self.store_addrs;
90 if store_addrs.is_empty() {
91 EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
92 } else {
93 let kvbackend = match self.backend {
94 BackendImpl::EtcdStore => {
95 let tls_config = if self.backend_tls_mode != TlsMode::Disable {
96 Some(TlsOption {
97 mode: self.backend_tls_mode.clone(),
98 cert_path: self.backend_tls_cert_path.clone(),
99 key_path: self.backend_tls_key_path.clone(),
100 ca_cert_path: self.backend_tls_ca_cert_path.clone(),
101 watch: self.backend_tls_watch,
102 })
103 } else {
104 None
105 };
106 let etcd_client = create_etcd_client_with_tls(store_addrs, tls_config.as_ref())
107 .await
108 .map_err(BoxedError::new)?;
109 Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
110 }
111 #[cfg(feature = "pg_kvbackend")]
112 BackendImpl::PostgresStore => {
113 let table_name = &self.meta_table_name;
114 let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs, None)
115 .await
116 .map_err(BoxedError::new)?;
117 let schema_name = self.meta_schema_name.as_deref();
118 Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
119 pool,
120 schema_name,
121 table_name,
122 max_txn_ops,
123 )
124 .await
125 .map_err(BoxedError::new)?)
126 }
127 #[cfg(feature = "mysql_kvbackend")]
128 BackendImpl::MysqlStore => {
129 let table_name = &self.meta_table_name;
130 let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
131 .await
132 .map_err(BoxedError::new)?;
133 Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
134 pool,
135 table_name,
136 max_txn_ops,
137 )
138 .await
139 .map_err(BoxedError::new)?)
140 }
141 BackendImpl::MemoryStore => UnsupportedMemoryBackendSnafu
142 .fail()
143 .map_err(BoxedError::new),
144 };
145 if self.store_key_prefix.is_empty() {
146 kvbackend
147 } else {
148 let chroot_kvbackend =
149 ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
150 Ok(Arc::new(chroot_kvbackend))
151 }
152 }
153 }
154}