1use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use client::client_manager::NodeClients;
23use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::etcd::EtcdStore;
26use common_meta::kv_backend::memory::MemoryKvBackend;
27use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
28use hyper_util::rt::TokioIo;
29use tonic::codec::CompressionEncoding;
30use tower::service_fn;
31
32use crate::add_compressed_service;
33use crate::metasrv::builder::MetasrvBuilder;
34use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
35
36#[derive(Clone)]
37pub struct MockInfo {
38 pub server_addr: String,
39 pub channel_manager: ChannelManager,
40 pub metasrv: Arc<Metasrv>,
41 pub kv_backend: KvBackendRef,
42 pub in_memory: Option<ResettableKvBackendRef>,
43}
44
45pub async fn mock_with_memstore() -> MockInfo {
46 let kv_backend = Arc::new(MemoryKvBackend::new());
47 let in_memory = Arc::new(MemoryKvBackend::new());
48 mock(
49 MetasrvOptions {
50 server_addr: "127.0.0.1:3002".to_string(),
51 ..Default::default()
52 },
53 kv_backend,
54 None,
55 None,
56 Some(in_memory),
57 )
58 .await
59}
60
61pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
62 let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
63 mock(
64 MetasrvOptions {
65 server_addr: "127.0.0.1:3002".to_string(),
66 ..Default::default()
67 },
68 kv_backend,
69 None,
70 None,
71 None,
72 )
73 .await
74}
75
76pub async fn mock(
77 opts: MetasrvOptions,
78 kv_backend: KvBackendRef,
79 selector: Option<SelectorRef>,
80 datanode_clients: Option<Arc<NodeClients>>,
81 in_memory: Option<ResettableKvBackendRef>,
82) -> MockInfo {
83 let server_addr = opts.server_addr.clone();
84 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
85
86 table_metadata_manager.init().await.unwrap();
87
88 let builder = MetasrvBuilder::new()
89 .options(opts)
90 .kv_backend(kv_backend.clone());
91
92 let builder = match selector {
93 Some(s) => builder.selector(s),
94 None => builder,
95 };
96
97 let builder = match datanode_clients {
98 Some(clients) => builder.node_manager(clients),
99 None => builder,
100 };
101
102 let builder = match &in_memory {
103 Some(in_memory) => builder.in_memory(in_memory.clone()),
104 None => builder,
105 };
106
107 let metasrv = builder.build().await.unwrap();
108 metasrv.try_start().await.unwrap();
109
110 let (client, server) = tokio::io::duplex(1024);
111 let metasrv = Arc::new(metasrv);
112 let service = metasrv.clone();
113
114 let _handle = tokio::spawn(async move {
115 let mut router = tonic::transport::Server::builder();
116 let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
117 let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
118 let router =
119 add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
120 let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
121 router
122 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
123 .await
124 });
125
126 let config = ChannelConfig::new()
127 .timeout(Duration::from_secs(10))
128 .connect_timeout(Duration::from_secs(10))
129 .tcp_nodelay(true);
130 let channel_manager = ChannelManager::with_config(config);
131
132 let mut client = Some(client);
135 let res = channel_manager.reset_with_connector(
136 &server_addr,
137 service_fn(move |_| {
138 let client = client.take();
139
140 async move {
141 if let Some(client) = client {
142 Ok(TokioIo::new(client))
143 } else {
144 Err(std::io::Error::other("Client already taken"))
145 }
146 }
147 }),
148 );
149 let _ = res.unwrap();
150
151 MockInfo {
152 server_addr,
153 channel_manager,
154 metasrv,
155 kv_backend,
156 in_memory,
157 }
158}