1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use client::client_manager::NodeClients;
23use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::etcd::EtcdStore;
26use common_meta::kv_backend::memory::MemoryKvBackend;
27use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
28use hyper_util::rt::TokioIo;
29use servers::grpc::GrpcOptions;
30use tonic::codec::CompressionEncoding;
31use tower::service_fn;
32
33use crate::add_compressed_service;
34use crate::metasrv::builder::MetasrvBuilder;
35use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
36
37#[derive(Clone)]
38pub struct MockInfo {
39 pub server_addr: String,
40 pub channel_manager: ChannelManager,
41 pub metasrv: Arc<Metasrv>,
42 pub kv_backend: KvBackendRef,
43 pub in_memory: Option<ResettableKvBackendRef>,
44}
45
46pub async fn mock_with_memstore() -> MockInfo {
47 let kv_backend = Arc::new(MemoryKvBackend::new());
48 let in_memory = Arc::new(MemoryKvBackend::new());
49 mock(
50 MetasrvOptions {
51 grpc: GrpcOptions {
52 server_addr: "127.0.0.1:3002".to_string(),
53 ..Default::default()
54 },
55 ..Default::default()
56 },
57 kv_backend,
58 None,
59 None,
60 Some(in_memory),
61 )
62 .await
63}
64
65pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
66 let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
67 mock(
68 MetasrvOptions {
69 grpc: GrpcOptions {
70 server_addr: "127.0.0.1:3002".to_string(),
71 ..Default::default()
72 },
73 ..Default::default()
74 },
75 kv_backend,
76 None,
77 None,
78 None,
79 )
80 .await
81}
82
83pub async fn mock(
84 opts: MetasrvOptions,
85 kv_backend: KvBackendRef,
86 selector: Option<SelectorRef>,
87 datanode_clients: Option<Arc<NodeClients>>,
88 in_memory: Option<ResettableKvBackendRef>,
89) -> MockInfo {
90 let server_addr = opts.grpc.server_addr.clone();
91 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
92
93 table_metadata_manager.init().await.unwrap();
94
95 let builder = MetasrvBuilder::new()
96 .options(opts)
97 .kv_backend(kv_backend.clone());
98
99 let builder = match selector {
100 Some(s) => builder.selector(s),
101 None => builder,
102 };
103
104 let builder = match datanode_clients {
105 Some(clients) => builder.node_manager(clients),
106 None => builder,
107 };
108
109 let builder = match &in_memory {
110 Some(in_memory) => builder.in_memory(in_memory.clone()),
111 None => builder,
112 };
113
114 let metasrv = builder.build().await.unwrap();
115 metasrv.try_start().await.unwrap();
116
117 let (client, server) = tokio::io::duplex(1024);
118 let metasrv = Arc::new(metasrv);
119 let service = metasrv.clone();
120
121 let _handle = tokio::spawn(async move {
122 let mut router = tonic::transport::Server::builder();
123 let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
124 let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
125 let router =
126 add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
127 let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
128 router
129 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
130 .await
131 });
132
133 let config = ChannelConfig::new()
134 .timeout(Duration::from_secs(10))
135 .connect_timeout(Duration::from_secs(10))
136 .tcp_nodelay(true);
137 let channel_manager = ChannelManager::with_config(config);
138
139 let mut client = Some(client);
142 let res = channel_manager.reset_with_connector(
143 &server_addr,
144 service_fn(move |_| {
145 let client = client.take();
146
147 async move {
148 if let Some(client) = client {
149 Ok(TokioIo::new(client))
150 } else {
151 Err(std::io::Error::other("Client already taken"))
152 }
153 }
154 }),
155 );
156 let _ = res.unwrap();
157
158 MockInfo {
159 server_addr,
160 channel_manager,
161 metasrv,
162 kv_backend,
163 in_memory,
164 }
165}