meta_client/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use client::RegionFollowerClientRef;
19use common_base::Plugins;
20use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
21use common_meta::distributed_time_constants::{
22    HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS, HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS,
23    HEARTBEAT_TIMEOUT,
24};
25use common_telemetry::{debug, info};
26use serde::{Deserialize, Serialize};
27
28use crate::client::{LeaderProviderRef, MetaClientBuilder};
29
// Meta client implementation; this file uses its `MetaClient`, `MetaClientBuilder`
// and `LeaderProviderRef` items.
pub mod client;
// Error definitions; this file uses its `Result` type as the return type of
// `create_meta_client`.
pub mod error;
32
/// Options for the meta client.
///
/// `create_meta_client` consumes these options for every [`MetaClientType`]
/// (datanode, flownode and frontend), not only for datanodes.
///
/// Because of `#[serde(default)]`, any field missing from the deserialized input
/// falls back to the value produced by the `Default` implementation.
/// All `Duration` fields are (de)serialized through `humantime_serde`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MetaClientOptions {
    /// Metasrv addresses handed to the client's `start` / `start_with` call.
    pub metasrv_addrs: Vec<String>,
    /// Timeout set on the general channel configuration.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
    /// Timeout passed to the builder's `ddl_timeout`; only applied for frontend clients.
    #[serde(with = "humantime_serde")]
    pub ddl_timeout: Duration,
    /// Connect timeout set on the channel configuration.
    #[serde(with = "humantime_serde")]
    pub connect_timeout: Duration,
    /// Value forwarded to the channel configuration's `tcp_nodelay` setting.
    pub tcp_nodelay: bool,
    /// Not read in this file — presumably the maximum number of entries kept in the
    /// metadata cache; TODO confirm against the cache construction site.
    pub metadata_cache_max_capacity: u64,
    /// Not read in this file — presumably the time-to-live of metadata cache entries;
    /// TODO confirm.
    #[serde(with = "humantime_serde")]
    pub metadata_cache_ttl: Duration,
    /// Not read in this file — presumably the time-to-idle of metadata cache entries;
    /// TODO confirm.
    #[serde(with = "humantime_serde")]
    pub metadata_cache_tti: Duration,
}
51
52impl Default for MetaClientOptions {
53    fn default() -> Self {
54        Self {
55            metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
56            timeout: Duration::from_millis(3_000u64),
57            ddl_timeout: Duration::from_millis(10_000u64),
58            connect_timeout: Duration::from_millis(1_000u64),
59            tcp_nodelay: true,
60            metadata_cache_max_capacity: 100_000u64,
61            metadata_cache_ttl: Duration::from_secs(600u64),
62            metadata_cache_tti: Duration::from_secs(300u64),
63        }
64    }
65}
66
/// The kind of node a meta client is created for.
///
/// `create_meta_client` uses it to pick the matching `MetaClientBuilder` defaults.
#[derive(Debug)]
pub enum MetaClientType {
    /// A datanode; `member_id` is forwarded to
    /// `MetaClientBuilder::datanode_default_options`.
    Datanode { member_id: u64 },
    /// A flownode; `member_id` is forwarded to
    /// `MetaClientBuilder::flownode_default_options`.
    Flownode { member_id: u64 },
    /// A frontend; built from `MetaClientBuilder::frontend_default_options` and
    /// additionally given a DDL timeout, a DDL channel manager and, if one is
    /// registered in the plugins, a region follower client.
    Frontend,
}
73
/// Reference-counted ([`Arc`]) handle to a [`client::MetaClient`].
pub type MetaClientRef = Arc<client::MetaClient>;
75
76pub async fn create_meta_client(
77    client_type: MetaClientType,
78    meta_client_options: &MetaClientOptions,
79    plugins: Option<&Plugins>,
80    leader_provider: Option<LeaderProviderRef>,
81) -> error::Result<MetaClientRef> {
82    info!(
83        "Creating {:?} instance with Metasrv addrs {:?}",
84        client_type, meta_client_options.metasrv_addrs
85    );
86
87    let mut builder = match client_type {
88        MetaClientType::Datanode { member_id } => {
89            MetaClientBuilder::datanode_default_options(member_id)
90        }
91        MetaClientType::Flownode { member_id } => {
92            MetaClientBuilder::flownode_default_options(member_id)
93        }
94        MetaClientType::Frontend => MetaClientBuilder::frontend_default_options(),
95    };
96
97    let base_config = ChannelConfig::new()
98        .timeout(Some(meta_client_options.timeout))
99        .connect_timeout(meta_client_options.connect_timeout)
100        .tcp_nodelay(meta_client_options.tcp_nodelay);
101    let heartbeat_config = base_config
102        .clone()
103        .timeout(Some(HEARTBEAT_TIMEOUT))
104        .http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS)
105        .http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS);
106
107    if let MetaClientType::Frontend = client_type {
108        // Unset the timeout in the DDL channel manager,
109        // delegating timeout control to each individual request rather than the channel manager itself.
110        let ddl_config = base_config.clone().timeout(None);
111        builder = builder
112            .ddl_timeout(meta_client_options.ddl_timeout)
113            .ddl_channel_manager(ChannelManager::with_config(ddl_config, None));
114        if let Some(plugins) = plugins {
115            let region_follower = plugins.get::<RegionFollowerClientRef>();
116            if let Some(region_follower) = region_follower {
117                debug!("Region follower client found in plugins");
118                builder = builder.with_region_follower(region_follower);
119            }
120        }
121    }
122
123    builder = builder
124        .channel_manager(ChannelManager::with_config(base_config, None))
125        .heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config, None));
126
127    let mut meta_client = builder.build();
128
129    if let Some(leader_provider) = leader_provider {
130        meta_client
131            .start_with(leader_provider, &meta_client_options.metasrv_addrs)
132            .await?;
133    } else {
134        meta_client
135            .start(&meta_client_options.metasrv_addrs)
136            .await?;
137    }
138
139    meta_client.ask_leader().await?;
140
141    Ok(Arc::new(meta_client))
142}
143
// Compiled only for test builds; presumably holds mock helpers for this crate's
// tests (module body is not visible here).
#[cfg(test)]
mod mocks;