1use std::sync::Arc;
16
17use clap::{Parser, ValueEnum};
18use common_error::ext::BoxedError;
19use common_meta::kv_backend::KvBackendRef;
20use common_meta::kv_backend::chroot::ChrootKvBackend;
21use common_meta::kv_backend::etcd::EtcdStore;
22use meta_srv::metasrv::BackendClientOptions;
23use meta_srv::utils::etcd::create_etcd_client_with_tls;
24use serde::{Deserialize, Serialize};
25use servers::tls::{TlsMode, TlsOption};
26use snafu::OptionExt;
27
28use crate::error::{EmptyStoreAddrsSnafu, InvalidArgumentsSnafu};
29
30#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
32#[serde(rename_all = "snake_case")]
33#[allow(clippy::enum_variant_names)]
34pub enum BackendImpl {
35 #[default]
37 EtcdStore,
38 MemoryStore,
40 #[cfg(feature = "pg_kvbackend")]
41 PostgresStore,
43 #[cfg(feature = "mysql_kvbackend")]
44 MysqlStore,
46 RaftEngineStore,
48}
49
50#[derive(Debug, Default, Parser)]
51pub struct StoreConfig {
52 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
63 pub store_addrs: Vec<String>,
64
65 #[clap(long, default_value = "128")]
67 pub max_txn_ops: usize,
68
69 #[clap(long, value_enum, default_value = "etcd-store")]
71 pub backend: BackendImpl,
72
73 #[clap(long, default_value = "")]
75 pub store_key_prefix: String,
76
77 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
79 #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
80 pub meta_table_name: String,
81
82 #[cfg(feature = "pg_kvbackend")]
84 #[clap(long)]
85 pub meta_schema_name: Option<String>,
86
87 #[cfg(feature = "pg_kvbackend")]
89 #[clap(long, default_value_t = true)]
90 pub auto_create_schema: bool,
91
92 #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
94 pub backend_tls_mode: TlsMode,
95
96 #[clap(long = "backend-tls-cert-path", default_value = "")]
98 pub backend_tls_cert_path: String,
99
100 #[clap(long = "backend-tls-key-path", default_value = "")]
102 pub backend_tls_key_path: String,
103
104 #[clap(long = "backend-tls-ca-cert-path", default_value = "")]
106 pub backend_tls_ca_cert_path: String,
107
108 #[clap(long = "backend-tls-watch")]
110 pub backend_tls_watch: bool,
111}
112
113impl StoreConfig {
114 pub fn tls_config(&self) -> Option<TlsOption> {
115 if self.backend_tls_mode != TlsMode::Disable {
116 Some(TlsOption {
117 mode: self.backend_tls_mode,
118 cert_path: self.backend_tls_cert_path.clone(),
119 key_path: self.backend_tls_key_path.clone(),
120 ca_cert_path: self.backend_tls_ca_cert_path.clone(),
121 watch: self.backend_tls_watch,
122 })
123 } else {
124 None
125 }
126 }
127
128 pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
130 let max_txn_ops = self.max_txn_ops;
131 let store_addrs = &self.store_addrs;
132 if store_addrs.is_empty() {
133 EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
134 } else {
135 common_telemetry::info!(
136 "Building kvbackend with store addrs: {:?}, backend: {:?}",
137 store_addrs,
138 self.backend
139 );
140 let kvbackend = match self.backend {
141 BackendImpl::EtcdStore => {
142 let tls_config = self.tls_config();
143 let etcd_client = create_etcd_client_with_tls(
144 store_addrs,
145 &BackendClientOptions::default(),
146 tls_config.as_ref(),
147 )
148 .await
149 .map_err(BoxedError::new)?;
150 Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
151 }
152 #[cfg(feature = "pg_kvbackend")]
153 BackendImpl::PostgresStore => {
154 let table_name = &self.meta_table_name;
155 let tls_config = self.tls_config();
156 let pool = meta_srv::utils::postgres::create_postgres_pool(
157 store_addrs,
158 None,
159 tls_config,
160 )
161 .await
162 .map_err(BoxedError::new)?;
163 let schema_name = self.meta_schema_name.as_deref();
164 Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
165 pool,
166 schema_name,
167 table_name,
168 max_txn_ops,
169 self.auto_create_schema,
170 )
171 .await
172 .map_err(BoxedError::new)?)
173 }
174 #[cfg(feature = "mysql_kvbackend")]
175 BackendImpl::MysqlStore => {
176 let table_name = &self.meta_table_name;
177 let tls_config = self.tls_config();
178 let pool =
179 meta_srv::utils::mysql::create_mysql_pool(store_addrs, tls_config.as_ref())
180 .await
181 .map_err(BoxedError::new)?;
182 Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
183 pool,
184 table_name,
185 max_txn_ops,
186 )
187 .await
188 .map_err(BoxedError::new)?)
189 }
190 #[cfg(not(test))]
191 BackendImpl::MemoryStore => {
192 use crate::error::UnsupportedMemoryBackendSnafu;
193
194 UnsupportedMemoryBackendSnafu
195 .fail()
196 .map_err(BoxedError::new)
197 }
198 #[cfg(test)]
199 BackendImpl::MemoryStore => {
200 use common_meta::kv_backend::memory::MemoryKvBackend;
201
202 Ok(Arc::new(MemoryKvBackend::default()) as _)
203 }
204 BackendImpl::RaftEngineStore => {
205 let url = store_addrs
206 .first()
207 .context(InvalidArgumentsSnafu {
208 msg: "empty store addresses".to_string(),
209 })
210 .map_err(BoxedError::new)?;
211 let kvbackend =
212 standalone::build_metadata_kv_from_url(url).map_err(BoxedError::new)?;
213
214 Ok(kvbackend)
215 }
216 };
217 if self.store_key_prefix.is_empty() {
218 kvbackend
219 } else {
220 let chroot_kvbackend =
221 ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
222 Ok(Arc::new(chroot_kvbackend))
223 }
224 }
225 }
226}