meta_client/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use client::RegionFollowerClientRef;
19use common_base::Plugins;
20use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
21use common_telemetry::{debug, info};
22use serde::{Deserialize, Serialize};
23
24use crate::client::MetaClientBuilder;
25
26pub mod client;
27pub mod error;
28
29// Options for meta client in datanode instance.
30#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
31#[serde(default)]
32pub struct MetaClientOptions {
33    pub metasrv_addrs: Vec<String>,
34    #[serde(with = "humantime_serde")]
35    pub timeout: Duration,
36    #[serde(with = "humantime_serde")]
37    pub heartbeat_timeout: Duration,
38    #[serde(with = "humantime_serde")]
39    pub ddl_timeout: Duration,
40    #[serde(with = "humantime_serde")]
41    pub connect_timeout: Duration,
42    pub tcp_nodelay: bool,
43    pub metadata_cache_max_capacity: u64,
44    #[serde(with = "humantime_serde")]
45    pub metadata_cache_ttl: Duration,
46    #[serde(with = "humantime_serde")]
47    pub metadata_cache_tti: Duration,
48}
49
50impl Default for MetaClientOptions {
51    fn default() -> Self {
52        Self {
53            metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
54            timeout: Duration::from_millis(3_000u64),
55            heartbeat_timeout: Duration::from_millis(500u64),
56            ddl_timeout: Duration::from_millis(10_000u64),
57            connect_timeout: Duration::from_millis(1_000u64),
58            tcp_nodelay: true,
59            metadata_cache_max_capacity: 100_000u64,
60            metadata_cache_ttl: Duration::from_secs(600u64),
61            metadata_cache_tti: Duration::from_secs(300u64),
62        }
63    }
64}
65
66#[derive(Debug)]
67pub enum MetaClientType {
68    Datanode { member_id: u64 },
69    Flownode { member_id: u64 },
70    Frontend,
71}
72
73pub type MetaClientRef = Arc<client::MetaClient>;
74
75pub async fn create_meta_client(
76    client_type: MetaClientType,
77    meta_client_options: &MetaClientOptions,
78    plugins: Option<&Plugins>,
79) -> error::Result<MetaClientRef> {
80    info!(
81        "Creating {:?} instance with Metasrv addrs {:?}",
82        client_type, meta_client_options.metasrv_addrs
83    );
84
85    let mut builder = match client_type {
86        MetaClientType::Datanode { member_id } => {
87            MetaClientBuilder::datanode_default_options(member_id)
88        }
89        MetaClientType::Flownode { member_id } => {
90            MetaClientBuilder::flownode_default_options(member_id)
91        }
92        MetaClientType::Frontend => MetaClientBuilder::frontend_default_options(),
93    };
94
95    let base_config = ChannelConfig::new()
96        .timeout(meta_client_options.timeout)
97        .connect_timeout(meta_client_options.connect_timeout)
98        .tcp_nodelay(meta_client_options.tcp_nodelay);
99    let heartbeat_config = base_config.clone();
100
101    if let MetaClientType::Frontend = client_type {
102        let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
103        builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config));
104        if let Some(plugins) = plugins {
105            let region_follower = plugins.get::<RegionFollowerClientRef>();
106            if let Some(region_follower) = region_follower {
107                debug!("Region follower client found in plugins");
108                builder = builder.with_region_follower(region_follower);
109            }
110        }
111    }
112
113    builder = builder
114        .channel_manager(ChannelManager::with_config(base_config))
115        .heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config));
116
117    let mut meta_client = builder.build();
118
119    meta_client
120        .start(&meta_client_options.metasrv_addrs)
121        .await?;
122
123    meta_client.ask_leader().await?;
124
125    Ok(Arc::new(meta_client))
126}
127
128#[cfg(test)]
129mod mocks;