1use std::sync::Arc;
16
17use clap::Parser;
18use common_error::ext::BoxedError;
19use common_meta::kv_backend::KvBackendRef;
20use common_meta::kv_backend::chroot::ChrootKvBackend;
21use common_meta::kv_backend::etcd::EtcdStore;
22use meta_srv::metasrv::BackendImpl;
23use meta_srv::utils::etcd::create_etcd_client_with_tls;
24use servers::tls::{TlsMode, TlsOption};
25
26use crate::error::EmptyStoreAddrsSnafu;
27
28#[derive(Debug, Default, Parser)]
29pub struct StoreConfig {
30 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
41 pub store_addrs: Vec<String>,
42
43 #[clap(long, default_value = "128")]
45 pub max_txn_ops: usize,
46
47 #[clap(long, value_enum, default_value = "etcd-store")]
49 pub backend: BackendImpl,
50
51 #[clap(long, default_value = "")]
53 pub store_key_prefix: String,
54
55 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
57 #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
58 pub meta_table_name: String,
59
60 #[cfg(feature = "pg_kvbackend")]
62 #[clap(long)]
63 pub meta_schema_name: Option<String>,
64 #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
66 pub backend_tls_mode: TlsMode,
67
68 #[clap(long = "backend-tls-cert-path", default_value = "")]
70 pub backend_tls_cert_path: String,
71
72 #[clap(long = "backend-tls-key-path", default_value = "")]
74 pub backend_tls_key_path: String,
75
76 #[clap(long = "backend-tls-ca-cert-path", default_value = "")]
78 pub backend_tls_ca_cert_path: String,
79
80 #[clap(long = "backend-tls-watch")]
82 pub backend_tls_watch: bool,
83}
84
85impl StoreConfig {
86 pub fn tls_config(&self) -> Option<TlsOption> {
87 if self.backend_tls_mode != TlsMode::Disable {
88 Some(TlsOption {
89 mode: self.backend_tls_mode.clone(),
90 cert_path: self.backend_tls_cert_path.clone(),
91 key_path: self.backend_tls_key_path.clone(),
92 ca_cert_path: self.backend_tls_ca_cert_path.clone(),
93 watch: self.backend_tls_watch,
94 })
95 } else {
96 None
97 }
98 }
99
100 pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
102 let max_txn_ops = self.max_txn_ops;
103 let store_addrs = &self.store_addrs;
104 if store_addrs.is_empty() {
105 EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
106 } else {
107 common_telemetry::info!(
108 "Building kvbackend with store addrs: {:?}, backend: {:?}",
109 store_addrs,
110 self.backend
111 );
112 let kvbackend = match self.backend {
113 BackendImpl::EtcdStore => {
114 let tls_config = self.tls_config();
115 let etcd_client = create_etcd_client_with_tls(store_addrs, tls_config.as_ref())
116 .await
117 .map_err(BoxedError::new)?;
118 Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
119 }
120 #[cfg(feature = "pg_kvbackend")]
121 BackendImpl::PostgresStore => {
122 let table_name = &self.meta_table_name;
123 let tls_config = self.tls_config();
124 let pool = meta_srv::utils::postgres::create_postgres_pool(
125 store_addrs,
126 None,
127 tls_config,
128 )
129 .await
130 .map_err(BoxedError::new)?;
131 let schema_name = self.meta_schema_name.as_deref();
132 Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
133 pool,
134 schema_name,
135 table_name,
136 max_txn_ops,
137 )
138 .await
139 .map_err(BoxedError::new)?)
140 }
141 #[cfg(feature = "mysql_kvbackend")]
142 BackendImpl::MysqlStore => {
143 let table_name = &self.meta_table_name;
144 let tls_config = self.tls_config();
145 let pool =
146 meta_srv::utils::mysql::create_mysql_pool(store_addrs, tls_config.as_ref())
147 .await
148 .map_err(BoxedError::new)?;
149 Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
150 pool,
151 table_name,
152 max_txn_ops,
153 )
154 .await
155 .map_err(BoxedError::new)?)
156 }
157 #[cfg(not(test))]
158 BackendImpl::MemoryStore => {
159 use crate::error::UnsupportedMemoryBackendSnafu;
160
161 UnsupportedMemoryBackendSnafu
162 .fail()
163 .map_err(BoxedError::new)
164 }
165 #[cfg(test)]
166 BackendImpl::MemoryStore => {
167 use common_meta::kv_backend::memory::MemoryKvBackend;
168
169 Ok(Arc::new(MemoryKvBackend::default()) as _)
170 }
171 };
172 if self.store_key_prefix.is_empty() {
173 kvbackend
174 } else {
175 let chroot_kvbackend =
176 ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
177 Ok(Arc::new(chroot_kvbackend))
178 }
179 }
180 }
181}