cli/common/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use clap::{Parser, ValueEnum};
18use common_error::ext::BoxedError;
19use common_meta::kv_backend::KvBackendRef;
20use common_meta::kv_backend::chroot::ChrootKvBackend;
21use common_meta::kv_backend::etcd::EtcdStore;
22use meta_srv::metasrv::BackendClientOptions;
23use meta_srv::utils::etcd::create_etcd_client_with_tls;
24use serde::{Deserialize, Serialize};
25use servers::tls::{TlsMode, TlsOption};
26use snafu::OptionExt;
27
28use crate::error::{EmptyStoreAddrsSnafu, InvalidArgumentsSnafu};
29
// The datastores that implement the metadata kvbackend.
//
// Serialized by serde in snake_case (e.g. `etcd_store`); on the command line
// the value is spelled in kebab-case (e.g. `etcd-store`, the default of
// `StoreConfig::backend`).
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
// All variants share the `Store` suffix, which is what this lint flags.
#[allow(clippy::enum_variant_names)]
pub enum BackendImpl {
    // Etcd as metadata storage. This is the default backend.
    #[default]
    EtcdStore,
    // In-memory metadata storage - mostly used for testing.
    // `StoreConfig::build` only constructs it in test builds and returns an
    // error otherwise.
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    // Postgres as metadata storage (requires the `pg_kvbackend` feature).
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    // MySQL as metadata storage (requires the `mysql_kvbackend` feature).
    MysqlStore,
    // RaftEngine as metadata storage.
    RaftEngineStore,
}
49
50#[derive(Debug, Default, Parser)]
51pub struct StoreConfig {
52    /// The endpoint of store. one of etcd, postgres or mysql.
53    ///
54    /// For postgres store, the format is:
55    /// "password=password dbname=postgres user=postgres host=localhost port=5432"
56    ///
57    /// For etcd store, the format is:
58    /// "127.0.0.1:2379"
59    ///
60    /// For mysql store, the format is:
61    /// "mysql://user:password@ip:port/dbname"
62    #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
63    pub store_addrs: Vec<String>,
64
65    /// The maximum number of operations in a transaction. Only used when using [etcd-store].
66    #[clap(long, default_value = "128")]
67    pub max_txn_ops: usize,
68
69    /// The metadata store backend.
70    #[clap(long, value_enum, default_value = "etcd-store")]
71    pub backend: BackendImpl,
72
73    /// The key prefix of the metadata store.
74    #[clap(long, default_value = "")]
75    pub store_key_prefix: String,
76
77    /// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
78    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
79    #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
80    pub meta_table_name: String,
81
82    /// Optional PostgreSQL schema for metadata table (defaults to current search_path if unset).
83    #[cfg(feature = "pg_kvbackend")]
84    #[clap(long)]
85    pub meta_schema_name: Option<String>,
86
87    /// Automatically create PostgreSQL schema if it doesn't exist (default: true).
88    #[cfg(feature = "pg_kvbackend")]
89    #[clap(long, default_value_t = true)]
90    pub auto_create_schema: bool,
91
92    /// TLS mode for backend store connections (etcd, PostgreSQL, MySQL)
93    #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
94    pub backend_tls_mode: TlsMode,
95
96    /// Path to TLS certificate file for backend store connections
97    #[clap(long = "backend-tls-cert-path", default_value = "")]
98    pub backend_tls_cert_path: String,
99
100    /// Path to TLS private key file for backend store connections
101    #[clap(long = "backend-tls-key-path", default_value = "")]
102    pub backend_tls_key_path: String,
103
104    /// Path to TLS CA certificate file for backend store connections
105    #[clap(long = "backend-tls-ca-cert-path", default_value = "")]
106    pub backend_tls_ca_cert_path: String,
107
108    /// Enable watching TLS certificate files for changes
109    #[clap(long = "backend-tls-watch")]
110    pub backend_tls_watch: bool,
111}
112
113impl StoreConfig {
114    pub fn tls_config(&self) -> Option<TlsOption> {
115        if self.backend_tls_mode != TlsMode::Disable {
116            Some(TlsOption {
117                mode: self.backend_tls_mode,
118                cert_path: self.backend_tls_cert_path.clone(),
119                key_path: self.backend_tls_key_path.clone(),
120                ca_cert_path: self.backend_tls_ca_cert_path.clone(),
121                watch: self.backend_tls_watch,
122            })
123        } else {
124            None
125        }
126    }
127
128    /// Builds a [`KvBackendRef`] from the store configuration.
129    pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
130        let max_txn_ops = self.max_txn_ops;
131        let store_addrs = &self.store_addrs;
132        if store_addrs.is_empty() {
133            EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
134        } else {
135            common_telemetry::info!(
136                "Building kvbackend with store addrs: {:?}, backend: {:?}",
137                store_addrs,
138                self.backend
139            );
140            let kvbackend = match self.backend {
141                BackendImpl::EtcdStore => {
142                    let tls_config = self.tls_config();
143                    let etcd_client = create_etcd_client_with_tls(
144                        store_addrs,
145                        &BackendClientOptions::default(),
146                        tls_config.as_ref(),
147                    )
148                    .await
149                    .map_err(BoxedError::new)?;
150                    Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
151                }
152                #[cfg(feature = "pg_kvbackend")]
153                BackendImpl::PostgresStore => {
154                    let table_name = &self.meta_table_name;
155                    let tls_config = self.tls_config();
156                    let pool = meta_srv::utils::postgres::create_postgres_pool(
157                        store_addrs,
158                        None,
159                        tls_config,
160                    )
161                    .await
162                    .map_err(BoxedError::new)?;
163                    let schema_name = self.meta_schema_name.as_deref();
164                    Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
165                        pool,
166                        schema_name,
167                        table_name,
168                        max_txn_ops,
169                        self.auto_create_schema,
170                    )
171                    .await
172                    .map_err(BoxedError::new)?)
173                }
174                #[cfg(feature = "mysql_kvbackend")]
175                BackendImpl::MysqlStore => {
176                    let table_name = &self.meta_table_name;
177                    let tls_config = self.tls_config();
178                    let pool =
179                        meta_srv::utils::mysql::create_mysql_pool(store_addrs, tls_config.as_ref())
180                            .await
181                            .map_err(BoxedError::new)?;
182                    Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
183                        pool,
184                        table_name,
185                        max_txn_ops,
186                    )
187                    .await
188                    .map_err(BoxedError::new)?)
189                }
190                #[cfg(not(test))]
191                BackendImpl::MemoryStore => {
192                    use crate::error::UnsupportedMemoryBackendSnafu;
193
194                    UnsupportedMemoryBackendSnafu
195                        .fail()
196                        .map_err(BoxedError::new)
197                }
198                #[cfg(test)]
199                BackendImpl::MemoryStore => {
200                    use common_meta::kv_backend::memory::MemoryKvBackend;
201
202                    Ok(Arc::new(MemoryKvBackend::default()) as _)
203                }
204                BackendImpl::RaftEngineStore => {
205                    let url = store_addrs
206                        .first()
207                        .context(InvalidArgumentsSnafu {
208                            msg: "empty store addresses".to_string(),
209                        })
210                        .map_err(BoxedError::new)?;
211                    let kvbackend =
212                        standalone::build_metadata_kv_from_url(url).map_err(BoxedError::new)?;
213
214                    Ok(kvbackend)
215                }
216            };
217            if self.store_key_prefix.is_empty() {
218                kvbackend
219            } else {
220                let chroot_kvbackend =
221                    ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
222                Ok(Arc::new(chroot_kvbackend))
223            }
224        }
225    }
226}