use std::sync::Arc;
use std::time::Duration;
use client::RegionFollowerClientRef;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
use crate::client::MetaClientBuilder;
pub mod client;
pub mod error;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MetaClientOptions {
pub metasrv_addrs: Vec<String>,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(with = "humantime_serde")]
pub heartbeat_timeout: Duration,
#[serde(with = "humantime_serde")]
pub ddl_timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
pub metadata_cache_max_capacity: u64,
#[serde(with = "humantime_serde")]
pub metadata_cache_ttl: Duration,
#[serde(with = "humantime_serde")]
pub metadata_cache_tti: Duration,
}
impl Default for MetaClientOptions {
fn default() -> Self {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_millis(3_000u64),
heartbeat_timeout: Duration::from_millis(500u64),
ddl_timeout: Duration::from_millis(10_000u64),
connect_timeout: Duration::from_millis(1_000u64),
tcp_nodelay: true,
metadata_cache_max_capacity: 100_000u64,
metadata_cache_ttl: Duration::from_secs(600u64),
metadata_cache_tti: Duration::from_secs(300u64),
}
}
}
#[derive(Debug)]
pub enum MetaClientType {
Datanode { member_id: u64 },
Flownode { member_id: u64 },
Frontend,
}
pub type MetaClientRef = Arc<client::MetaClient>;
pub async fn create_meta_client(
client_type: MetaClientType,
meta_client_options: &MetaClientOptions,
plugins: Option<&Plugins>,
) -> error::Result<MetaClientRef> {
info!(
"Creating {:?} instance with Metasrv addrs {:?}",
client_type, meta_client_options.metasrv_addrs
);
let mut builder = match client_type {
MetaClientType::Datanode { member_id } => {
MetaClientBuilder::datanode_default_options(member_id)
}
MetaClientType::Flownode { member_id } => {
MetaClientBuilder::flownode_default_options(member_id)
}
MetaClientType::Frontend => MetaClientBuilder::frontend_default_options(),
};
let base_config = ChannelConfig::new()
.timeout(meta_client_options.timeout)
.connect_timeout(meta_client_options.connect_timeout)
.tcp_nodelay(meta_client_options.tcp_nodelay);
let heartbeat_config = base_config.clone();
if let MetaClientType::Frontend = client_type {
let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config));
if let Some(plugins) = plugins {
let region_follower = plugins.get::<RegionFollowerClientRef>();
if let Some(region_follower) = region_follower {
debug!("Region follower client found in plugins");
builder = builder.with_region_follower(region_follower);
}
}
}
builder = builder
.channel_manager(ChannelManager::with_config(base_config))
.heartbeat_channel_manager(ChannelManager::with_config(heartbeat_config));
let mut meta_client = builder.build();
meta_client
.start(&meta_client_options.metasrv_addrs)
.await?;
meta_client.ask_leader().await?;
Ok(Arc::new(meta_client))
}
#[cfg(test)]
mod mocks;