1use std::sync::Arc;
16use std::time::Duration;
17
18use client::RegionFollowerClientRef;
19use common_base::Plugins;
20use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
21use common_telemetry::{debug, info};
22use serde::{Deserialize, Serialize};
23
24use crate::client::{LeaderProviderRef, MetaClientBuilder};
25
26pub mod client;
27pub mod error;
28
29#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
31#[serde(default)]
32pub struct MetaClientOptions {
33 pub metasrv_addrs: Vec<String>,
34 #[serde(with = "humantime_serde")]
35 pub timeout: Duration,
36 #[serde(with = "humantime_serde")]
37 pub heartbeat_timeout: Duration,
38 #[serde(with = "humantime_serde")]
39 pub ddl_timeout: Duration,
40 #[serde(with = "humantime_serde")]
41 pub connect_timeout: Duration,
42 pub tcp_nodelay: bool,
43 pub metadata_cache_max_capacity: u64,
44 #[serde(with = "humantime_serde")]
45 pub metadata_cache_ttl: Duration,
46 #[serde(with = "humantime_serde")]
47 pub metadata_cache_tti: Duration,
48}
49
50impl Default for MetaClientOptions {
51 fn default() -> Self {
52 Self {
53 metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
54 timeout: Duration::from_millis(3_000u64),
55 heartbeat_timeout: Duration::from_millis(500u64),
56 ddl_timeout: Duration::from_millis(10_000u64),
57 connect_timeout: Duration::from_millis(1_000u64),
58 tcp_nodelay: true,
59 metadata_cache_max_capacity: 100_000u64,
60 metadata_cache_ttl: Duration::from_secs(600u64),
61 metadata_cache_tti: Duration::from_secs(300u64),
62 }
63 }
64}
65
/// The role of the node that a meta client is created for.
///
/// `create_meta_client` uses the variant to pick the builder's default
/// options and to decide whether the frontend-only DDL channel is configured.
///
/// Besides `Debug`, the type derives `Clone`, `PartialEq` and `Eq` so callers
/// can duplicate and compare values. `Copy` is intentionally left out so that
/// future variants remain free to carry non-`Copy` data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetaClientType {
    /// A datanode, identified by its member id.
    Datanode { member_id: u64 },
    /// A flownode, identified by its member id.
    Flownode { member_id: u64 },
    /// A frontend; carries no member id.
    Frontend,
}
72
73pub type MetaClientRef = Arc<client::MetaClient>;
74
75pub async fn create_meta_client(
76 client_type: MetaClientType,
77 meta_client_options: &MetaClientOptions,
78 plugins: Option<&Plugins>,
79 leader_provider: Option<LeaderProviderRef>,
80) -> error::Result<MetaClientRef> {
81 info!(
82 "Creating {:?} instance with Metasrv addrs {:?}",
83 client_type, meta_client_options.metasrv_addrs
84 );
85
86 let mut builder = match client_type {
87 MetaClientType::Datanode { member_id } => {
88 MetaClientBuilder::datanode_default_options(member_id)
89 }
90 MetaClientType::Flownode { member_id } => {
91 MetaClientBuilder::flownode_default_options(member_id)
92 }
93 MetaClientType::Frontend => MetaClientBuilder::frontend_default_options(),
94 };
95
96 let base_config = ChannelConfig::new()
97 .timeout(meta_client_options.timeout)
98 .connect_timeout(meta_client_options.connect_timeout)
99 .tcp_nodelay(meta_client_options.tcp_nodelay);
100 let heartbeat_config = base_config.clone();
101
102 if let MetaClientType::Frontend = client_type {
103 let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
104 builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config));
105 if let Some(plugins) = plugins {
106 let region_follower = plugins.get::<RegionFollowerClientRef>();
107 if let Some(region_follower) = region_follower {
108 debug!("Region follower client found in plugins");
109 builder = builder.with_region_follower(region_follower);
110 }
111 }
112 }
113
114 builder = builder
115 .channel_manager(ChannelManager::with_config(base_config))
116 .heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config));
117
118 let mut meta_client = builder.build();
119
120 if let Some(leader_provider) = leader_provider {
121 meta_client
122 .start_with(leader_provider, &meta_client_options.metasrv_addrs)
123 .await?;
124 } else {
125 meta_client
126 .start(&meta_client_options.metasrv_addrs)
127 .await?;
128 }
129
130 meta_client.ask_leader().await?;
131
132 Ok(Arc::new(meta_client))
133}
134
135#[cfg(test)]
136mod mocks;