meta_srv/
mocks.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use client::client_manager::NodeClients;
23use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::etcd::EtcdStore;
26use common_meta::kv_backend::memory::MemoryKvBackend;
27use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
28use hyper_util::rt::TokioIo;
29use tonic::codec::CompressionEncoding;
30use tower::service_fn;
31
32use crate::add_compressed_service;
33use crate::metasrv::builder::MetasrvBuilder;
34use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
35
36#[derive(Clone)]
37pub struct MockInfo {
38    pub server_addr: String,
39    pub channel_manager: ChannelManager,
40    pub metasrv: Arc<Metasrv>,
41    pub kv_backend: KvBackendRef,
42    pub in_memory: Option<ResettableKvBackendRef>,
43}
44
45pub async fn mock_with_memstore() -> MockInfo {
46    let kv_backend = Arc::new(MemoryKvBackend::new());
47    let in_memory = Arc::new(MemoryKvBackend::new());
48    mock(
49        MetasrvOptions {
50            server_addr: "127.0.0.1:3002".to_string(),
51            ..Default::default()
52        },
53        kv_backend,
54        None,
55        None,
56        Some(in_memory),
57    )
58    .await
59}
60
61pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
62    let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
63    mock(
64        MetasrvOptions {
65            server_addr: "127.0.0.1:3002".to_string(),
66            ..Default::default()
67        },
68        kv_backend,
69        None,
70        None,
71        None,
72    )
73    .await
74}
75
76pub async fn mock(
77    opts: MetasrvOptions,
78    kv_backend: KvBackendRef,
79    selector: Option<SelectorRef>,
80    datanode_clients: Option<Arc<NodeClients>>,
81    in_memory: Option<ResettableKvBackendRef>,
82) -> MockInfo {
83    let server_addr = opts.server_addr.clone();
84    let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
85
86    table_metadata_manager.init().await.unwrap();
87
88    let builder = MetasrvBuilder::new()
89        .options(opts)
90        .kv_backend(kv_backend.clone());
91
92    let builder = match selector {
93        Some(s) => builder.selector(s),
94        None => builder,
95    };
96
97    let builder = match datanode_clients {
98        Some(clients) => builder.node_manager(clients),
99        None => builder,
100    };
101
102    let builder = match &in_memory {
103        Some(in_memory) => builder.in_memory(in_memory.clone()),
104        None => builder,
105    };
106
107    let metasrv = builder.build().await.unwrap();
108    metasrv.try_start().await.unwrap();
109
110    let (client, server) = tokio::io::duplex(1024);
111    let metasrv = Arc::new(metasrv);
112    let service = metasrv.clone();
113
114    let _handle = tokio::spawn(async move {
115        let mut router = tonic::transport::Server::builder();
116        let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
117        let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
118        let router =
119            add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
120        let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
121        router
122            .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
123            .await
124    });
125
126    let config = ChannelConfig::new()
127        .timeout(Duration::from_secs(10))
128        .connect_timeout(Duration::from_secs(10))
129        .tcp_nodelay(true);
130    let channel_manager = ChannelManager::with_config(config);
131
132    // Move client to an option so we can _move_ the inner value
133    // on the first attempt to connect. All other attempts will fail.
134    let mut client = Some(client);
135    let res = channel_manager.reset_with_connector(
136        &server_addr,
137        service_fn(move |_| {
138            let client = client.take();
139
140            async move {
141                if let Some(client) = client {
142                    Ok(TokioIo::new(client))
143                } else {
144                    Err(std::io::Error::other("Client already taken"))
145                }
146            }
147        }),
148    );
149    let _ = res.unwrap();
150
151    MockInfo {
152        server_addr,
153        channel_manager,
154        metasrv,
155        kv_backend,
156        in_memory,
157    }
158}