1use std::sync::Arc;
16use std::time::Duration;
17
18use client::RegionFollowerClientRef;
19use common_base::Plugins;
20use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
21use common_meta::distributed_time_constants::{
22 HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS, HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS,
23 HEARTBEAT_TIMEOUT,
24};
25use common_telemetry::{debug, info};
26use serde::{Deserialize, Serialize};
27
28use crate::client::{LeaderProviderRef, MetaClientBuilder};
29
30pub mod client;
31pub mod error;
32
33#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
35#[serde(default)]
36pub struct MetaClientOptions {
37 pub metasrv_addrs: Vec<String>,
38 #[serde(with = "humantime_serde")]
39 pub timeout: Duration,
40 #[serde(with = "humantime_serde")]
41 pub ddl_timeout: Duration,
42 #[serde(with = "humantime_serde")]
43 pub connect_timeout: Duration,
44 pub tcp_nodelay: bool,
45 pub metadata_cache_max_capacity: u64,
46 #[serde(with = "humantime_serde")]
47 pub metadata_cache_ttl: Duration,
48 #[serde(with = "humantime_serde")]
49 pub metadata_cache_tti: Duration,
50}
51
52impl Default for MetaClientOptions {
53 fn default() -> Self {
54 Self {
55 metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
56 timeout: Duration::from_millis(3_000u64),
57 ddl_timeout: Duration::from_millis(10_000u64),
58 connect_timeout: Duration::from_millis(1_000u64),
59 tcp_nodelay: true,
60 metadata_cache_max_capacity: 100_000u64,
61 metadata_cache_ttl: Duration::from_secs(600u64),
62 metadata_cache_tti: Duration::from_secs(300u64),
63 }
64 }
65}
66
67#[derive(Debug)]
68pub enum MetaClientType {
69 Datanode { member_id: u64 },
70 Flownode { member_id: u64 },
71 Frontend,
72}
73
74pub type MetaClientRef = Arc<client::MetaClient>;
75
76pub async fn create_meta_client(
77 client_type: MetaClientType,
78 meta_client_options: &MetaClientOptions,
79 plugins: Option<&Plugins>,
80 leader_provider: Option<LeaderProviderRef>,
81) -> error::Result<MetaClientRef> {
82 info!(
83 "Creating {:?} instance with Metasrv addrs {:?}",
84 client_type, meta_client_options.metasrv_addrs
85 );
86
87 let mut builder = match client_type {
88 MetaClientType::Datanode { member_id } => {
89 MetaClientBuilder::datanode_default_options(member_id)
90 }
91 MetaClientType::Flownode { member_id } => {
92 MetaClientBuilder::flownode_default_options(member_id)
93 }
94 MetaClientType::Frontend => MetaClientBuilder::frontend_default_options(),
95 };
96
97 let base_config = ChannelConfig::new()
98 .timeout(meta_client_options.timeout)
99 .connect_timeout(meta_client_options.connect_timeout)
100 .tcp_nodelay(meta_client_options.tcp_nodelay);
101 let heartbeat_config = base_config
102 .clone()
103 .timeout(HEARTBEAT_TIMEOUT)
104 .http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS)
105 .http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS);
106
107 if let MetaClientType::Frontend = client_type {
108 let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
109 builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config, None));
110 if let Some(plugins) = plugins {
111 let region_follower = plugins.get::<RegionFollowerClientRef>();
112 if let Some(region_follower) = region_follower {
113 debug!("Region follower client found in plugins");
114 builder = builder.with_region_follower(region_follower);
115 }
116 }
117 }
118
119 builder = builder
120 .channel_manager(ChannelManager::with_config(base_config, None))
121 .heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config, None));
122
123 let mut meta_client = builder.build();
124
125 if let Some(leader_provider) = leader_provider {
126 meta_client
127 .start_with(leader_provider, &meta_client_options.metasrv_addrs)
128 .await?;
129 } else {
130 meta_client
131 .start(&meta_client_options.metasrv_addrs)
132 .await?;
133 }
134
135 meta_client.ask_leader().await?;
136
137 Ok(Arc::new(meta_client))
138}
139
140#[cfg(test)]
141mod mocks;