cli/metadata/
common.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use clap::Parser;
18use common_error::ext::BoxedError;
19use common_meta::kv_backend::chroot::ChrootKvBackend;
20use common_meta::kv_backend::etcd::EtcdStore;
21use common_meta::kv_backend::KvBackendRef;
22use meta_srv::bootstrap::create_etcd_client;
23use meta_srv::metasrv::BackendImpl;
24
25use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
26
27#[derive(Debug, Default, Parser)]
28pub(crate) struct StoreConfig {
29    /// The endpoint of store. one of etcd, postgres or mysql.
30    ///
31    /// For postgres store, the format is:
32    /// "password=password dbname=postgres user=postgres host=localhost port=5432"
33    ///
34    /// For etcd store, the format is:
35    /// "127.0.0.1:2379"
36    ///
37    /// For mysql store, the format is:
38    /// "mysql://user:password@ip:port/dbname"
39    #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
40    store_addrs: Vec<String>,
41
42    /// The maximum number of operations in a transaction. Only used when using [etcd-store].
43    #[clap(long, default_value = "128")]
44    max_txn_ops: usize,
45
46    /// The metadata store backend.
47    #[clap(long, value_enum, default_value = "etcd-store")]
48    backend: BackendImpl,
49
50    /// The key prefix of the metadata store.
51    #[clap(long, default_value = "")]
52    store_key_prefix: String,
53
54    /// The table name in RDS to store metadata. Only used when using [postgres-store] or [mysql-store].
55    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
56    #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
57    meta_table_name: String,
58}
59
60impl StoreConfig {
61    /// Builds a [`KvBackendRef`] from the store configuration.
62    pub async fn build(&self) -> Result<KvBackendRef, BoxedError> {
63        let max_txn_ops = self.max_txn_ops;
64        let store_addrs = &self.store_addrs;
65        if store_addrs.is_empty() {
66            EmptyStoreAddrsSnafu.fail().map_err(BoxedError::new)
67        } else {
68            let kvbackend = match self.backend {
69                BackendImpl::EtcdStore => {
70                    let etcd_client = create_etcd_client(store_addrs)
71                        .await
72                        .map_err(BoxedError::new)?;
73                    Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
74                }
75                #[cfg(feature = "pg_kvbackend")]
76                BackendImpl::PostgresStore => {
77                    let table_name = &self.meta_table_name;
78                    let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs)
79                        .await
80                        .map_err(BoxedError::new)?;
81                    Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
82                        pool,
83                        table_name,
84                        max_txn_ops,
85                    )
86                    .await
87                    .map_err(BoxedError::new)?)
88                }
89                #[cfg(feature = "mysql_kvbackend")]
90                BackendImpl::MysqlStore => {
91                    let table_name = &self.meta_table_name;
92                    let pool = meta_srv::bootstrap::create_mysql_pool(store_addrs)
93                        .await
94                        .map_err(BoxedError::new)?;
95                    Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
96                        pool,
97                        table_name,
98                        max_txn_ops,
99                    )
100                    .await
101                    .map_err(BoxedError::new)?)
102                }
103                BackendImpl::MemoryStore => UnsupportedMemoryBackendSnafu
104                    .fail()
105                    .map_err(BoxedError::new),
106            };
107            if self.store_key_prefix.is_empty() {
108                kvbackend
109            } else {
110                let chroot_kvbackend =
111                    ChrootKvBackend::new(self.store_key_prefix.as_bytes().to_vec(), kvbackend?);
112                Ok(Arc::new(chroot_kvbackend))
113            }
114        }
115    }
116}