meta_srv/
mocks.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::heartbeat_server::HeartbeatServer;
20use api::v1::meta::procedure_service_server::ProcedureServiceServer;
21use api::v1::meta::store_server::StoreServer;
22use client::client_manager::NodeClients;
23use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
24use common_meta::key::TableMetadataManager;
25use common_meta::kv_backend::etcd::EtcdStore;
26use common_meta::kv_backend::memory::MemoryKvBackend;
27use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
28use hyper_util::rt::TokioIo;
29use servers::grpc::GrpcOptions;
30use tonic::codec::CompressionEncoding;
31use tower::service_fn;
32
33use crate::add_compressed_service;
34use crate::metasrv::builder::MetasrvBuilder;
35use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
36
37#[derive(Clone)]
38pub struct MockInfo {
39    pub server_addr: String,
40    pub channel_manager: ChannelManager,
41    pub metasrv: Arc<Metasrv>,
42    pub kv_backend: KvBackendRef,
43    pub in_memory: Option<ResettableKvBackendRef>,
44}
45
46pub async fn mock_with_memstore() -> MockInfo {
47    let kv_backend = Arc::new(MemoryKvBackend::new());
48    let in_memory = Arc::new(MemoryKvBackend::new());
49    mock(
50        MetasrvOptions {
51            grpc: GrpcOptions {
52                server_addr: "127.0.0.1:3002".to_string(),
53                ..Default::default()
54            },
55            ..Default::default()
56        },
57        kv_backend,
58        None,
59        None,
60        Some(in_memory),
61    )
62    .await
63}
64
65pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
66    let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
67    mock(
68        MetasrvOptions {
69            grpc: GrpcOptions {
70                server_addr: "127.0.0.1:3002".to_string(),
71                ..Default::default()
72            },
73            ..Default::default()
74        },
75        kv_backend,
76        None,
77        None,
78        None,
79    )
80    .await
81}
82
83pub async fn mock(
84    opts: MetasrvOptions,
85    kv_backend: KvBackendRef,
86    selector: Option<SelectorRef>,
87    datanode_clients: Option<Arc<NodeClients>>,
88    in_memory: Option<ResettableKvBackendRef>,
89) -> MockInfo {
90    let server_addr = opts.grpc.server_addr.clone();
91    let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
92
93    table_metadata_manager.init().await.unwrap();
94
95    let builder = MetasrvBuilder::new()
96        .options(opts)
97        .kv_backend(kv_backend.clone());
98
99    let builder = match selector {
100        Some(s) => builder.selector(s),
101        None => builder,
102    };
103
104    let builder = match datanode_clients {
105        Some(clients) => builder.node_manager(clients),
106        None => builder,
107    };
108
109    let builder = match &in_memory {
110        Some(in_memory) => builder.in_memory(in_memory.clone()),
111        None => builder,
112    };
113
114    let metasrv = builder.build().await.unwrap();
115    metasrv.try_start().await.unwrap();
116
117    let (client, server) = tokio::io::duplex(1024);
118    let metasrv = Arc::new(metasrv);
119    let service = metasrv.clone();
120
121    let _handle = tokio::spawn(async move {
122        let mut router = tonic::transport::Server::builder();
123        let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
124        let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
125        let router =
126            add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
127        let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
128        router
129            .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
130            .await
131    });
132
133    let config = ChannelConfig::new()
134        .timeout(Duration::from_secs(10))
135        .connect_timeout(Duration::from_secs(10))
136        .tcp_nodelay(true);
137    let channel_manager = ChannelManager::with_config(config);
138
139    // Move client to an option so we can _move_ the inner value
140    // on the first attempt to connect. All other attempts will fail.
141    let mut client = Some(client);
142    let res = channel_manager.reset_with_connector(
143        &server_addr,
144        service_fn(move |_| {
145            let client = client.take();
146
147            async move {
148                if let Some(client) = client {
149                    Ok(TokioIo::new(client))
150                } else {
151                    Err(std::io::Error::other("Client already taken"))
152                }
153            }
154        }),
155    );
156    let _ = res.unwrap();
157
158    MockInfo {
159        server_addr,
160        channel_manager,
161        metasrv,
162        kv_backend,
163        in_memory,
164    }
165}