1use std::sync::Arc;
16use std::time::Duration;
17
18use client::RegionFollowerClientRef;
19use common_base::Plugins;
20use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
21use common_telemetry::{debug, info};
22use serde::{Deserialize, Serialize};
23
24use crate::client::MetaClientBuilder;
25
26pub mod client;
27pub mod error;
28
29#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
31#[serde(default)]
32pub struct MetaClientOptions {
33 pub metasrv_addrs: Vec<String>,
34 #[serde(with = "humantime_serde")]
35 pub timeout: Duration,
36 #[serde(with = "humantime_serde")]
37 pub heartbeat_timeout: Duration,
38 #[serde(with = "humantime_serde")]
39 pub ddl_timeout: Duration,
40 #[serde(with = "humantime_serde")]
41 pub connect_timeout: Duration,
42 pub tcp_nodelay: bool,
43 pub metadata_cache_max_capacity: u64,
44 #[serde(with = "humantime_serde")]
45 pub metadata_cache_ttl: Duration,
46 #[serde(with = "humantime_serde")]
47 pub metadata_cache_tti: Duration,
48}
49
50impl Default for MetaClientOptions {
51 fn default() -> Self {
52 Self {
53 metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
54 timeout: Duration::from_millis(3_000u64),
55 heartbeat_timeout: Duration::from_millis(500u64),
56 ddl_timeout: Duration::from_millis(10_000u64),
57 connect_timeout: Duration::from_millis(1_000u64),
58 tcp_nodelay: true,
59 metadata_cache_max_capacity: 100_000u64,
60 metadata_cache_ttl: Duration::from_secs(600u64),
61 metadata_cache_tti: Duration::from_secs(300u64),
62 }
63 }
64}
65
66#[derive(Debug)]
67pub enum MetaClientType {
68 Datanode { member_id: u64 },
69 Flownode { member_id: u64 },
70 Frontend,
71}
72
73pub type MetaClientRef = Arc<client::MetaClient>;
74
75pub async fn create_meta_client(
76 client_type: MetaClientType,
77 meta_client_options: &MetaClientOptions,
78 plugins: Option<&Plugins>,
79) -> error::Result<MetaClientRef> {
80 info!(
81 "Creating {:?} instance with Metasrv addrs {:?}",
82 client_type, meta_client_options.metasrv_addrs
83 );
84
85 let mut builder = match client_type {
86 MetaClientType::Datanode { member_id } => {
87 MetaClientBuilder::datanode_default_options(member_id)
88 }
89 MetaClientType::Flownode { member_id } => {
90 MetaClientBuilder::flownode_default_options(member_id)
91 }
92 MetaClientType::Frontend => MetaClientBuilder::frontend_default_options(),
93 };
94
95 let base_config = ChannelConfig::new()
96 .timeout(meta_client_options.timeout)
97 .connect_timeout(meta_client_options.connect_timeout)
98 .tcp_nodelay(meta_client_options.tcp_nodelay);
99 let heartbeat_config = base_config.clone();
100
101 if let MetaClientType::Frontend = client_type {
102 let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
103 builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config));
104 if let Some(plugins) = plugins {
105 let region_follower = plugins.get::<RegionFollowerClientRef>();
106 if let Some(region_follower) = region_follower {
107 debug!("Region follower client found in plugins");
108 builder = builder.with_region_follower(region_follower);
109 }
110 }
111 }
112
113 builder = builder
114 .channel_manager(ChannelManager::with_config(base_config))
115 .heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config));
116
117 let mut meta_client = builder.build();
118
119 meta_client
120 .start(&meta_client_options.metasrv_addrs)
121 .await?;
122
123 meta_client.ask_leader().await?;
124
125 Ok(Arc::new(meta_client))
126}
127
128#[cfg(test)]
129mod mocks;