1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::config_server::ConfigServer;
20use api::v1::meta::heartbeat_server::HeartbeatServer;
21use api::v1::meta::procedure_service_server::ProcedureServiceServer;
22use api::v1::meta::store_server::StoreServer;
23use client::client_manager::NodeClients;
24use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
25use common_meta::key::TableMetadataManager;
26use common_meta::kv_backend::etcd::EtcdStore;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
29use hyper_util::rt::TokioIo;
30use servers::grpc::GrpcOptions;
31use tonic::codec::CompressionEncoding;
32use tower::service_fn;
33
34use crate::add_compressed_service;
35use crate::metasrv::builder::MetasrvBuilder;
36use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
37
38#[derive(Clone)]
39pub struct MockInfo {
40 pub server_addr: String,
41 pub channel_manager: ChannelManager,
42 pub metasrv: Arc<Metasrv>,
43 pub kv_backend: KvBackendRef,
44 pub in_memory: Option<ResettableKvBackendRef>,
45}
46
47pub async fn mock_with_memstore() -> MockInfo {
48 let kv_backend = Arc::new(MemoryKvBackend::new());
49 let in_memory = Arc::new(MemoryKvBackend::new());
50 mock(
51 MetasrvOptions {
52 grpc: GrpcOptions {
53 server_addr: "127.0.0.1:3002".to_string(),
54 ..Default::default()
55 },
56 ..Default::default()
57 },
58 kv_backend,
59 None,
60 None,
61 Some(in_memory),
62 )
63 .await
64}
65
66pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
67 let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
68 mock(
69 MetasrvOptions {
70 grpc: GrpcOptions {
71 server_addr: "127.0.0.1:3002".to_string(),
72 ..Default::default()
73 },
74 ..Default::default()
75 },
76 kv_backend,
77 None,
78 None,
79 None,
80 )
81 .await
82}
83
84pub async fn mock(
85 opts: MetasrvOptions,
86 kv_backend: KvBackendRef,
87 selector: Option<SelectorRef>,
88 datanode_clients: Option<Arc<NodeClients>>,
89 in_memory: Option<ResettableKvBackendRef>,
90) -> MockInfo {
91 let server_addr = opts.grpc.server_addr.clone();
92 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
93
94 table_metadata_manager.init().await.unwrap();
95
96 let builder = MetasrvBuilder::new()
97 .options(opts)
98 .kv_backend(kv_backend.clone());
99
100 let builder = match selector {
101 Some(s) => builder.selector(s),
102 None => builder,
103 };
104
105 let builder = match datanode_clients {
106 Some(clients) => builder.node_manager(clients),
107 None => builder,
108 };
109
110 let builder = match &in_memory {
111 Some(in_memory) => builder.in_memory(in_memory.clone()),
112 None => builder,
113 };
114
115 let metasrv = builder.build().await.unwrap();
116 metasrv.try_start().await.unwrap();
117
118 let (client, server) = tokio::io::duplex(1024);
119 let metasrv = Arc::new(metasrv);
120 let service = metasrv.clone();
121
122 let _handle = tokio::spawn(async move {
123 let mut router = tonic::transport::Server::builder();
124 let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
125 let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
126 let router =
127 add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
128 let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
129 let router = add_compressed_service!(router, ConfigServer::from_arc(service.clone()));
130 router
131 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
132 .await
133 });
134
135 let config = ChannelConfig::new()
136 .timeout(Some(Duration::from_secs(60)))
138 .connect_timeout(Duration::from_secs(10))
139 .tcp_nodelay(true);
140 let channel_manager = ChannelManager::with_config(config, None);
141
142 let mut client = Some(client);
145 let res = channel_manager.reset_with_connector(
146 &server_addr,
147 service_fn(move |_| {
148 let client = client.take();
149
150 async move {
151 if let Some(client) = client {
152 Ok(TokioIo::new(client))
153 } else {
154 Err(std::io::Error::other("Client already taken"))
155 }
156 }
157 }),
158 );
159 let _ = res.unwrap();
160
161 MockInfo {
162 server_addr,
163 channel_manager,
164 metasrv,
165 kv_backend,
166 in_memory,
167 }
168}