meta_srv/
mocks.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use api::v1::meta::cluster_server::ClusterServer;
19use api::v1::meta::config_server::ConfigServer;
20use api::v1::meta::heartbeat_server::HeartbeatServer;
21use api::v1::meta::procedure_service_server::ProcedureServiceServer;
22use api::v1::meta::store_server::StoreServer;
23use client::client_manager::NodeClients;
24use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
25use common_meta::key::TableMetadataManager;
26use common_meta::kv_backend::etcd::EtcdStore;
27use common_meta::kv_backend::memory::MemoryKvBackend;
28use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
29use hyper_util::rt::TokioIo;
30use servers::grpc::GrpcOptions;
31use tonic::codec::CompressionEncoding;
32use tower::service_fn;
33
34use crate::add_compressed_service;
35use crate::metasrv::builder::MetasrvBuilder;
36use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
37
38#[derive(Clone)]
39pub struct MockInfo {
40    pub server_addr: String,
41    pub channel_manager: ChannelManager,
42    pub metasrv: Arc<Metasrv>,
43    pub kv_backend: KvBackendRef,
44    pub in_memory: Option<ResettableKvBackendRef>,
45}
46
47pub async fn mock_with_memstore() -> MockInfo {
48    let kv_backend = Arc::new(MemoryKvBackend::new());
49    let in_memory = Arc::new(MemoryKvBackend::new());
50    mock(
51        MetasrvOptions {
52            grpc: GrpcOptions {
53                server_addr: "127.0.0.1:3002".to_string(),
54                ..Default::default()
55            },
56            ..Default::default()
57        },
58        kv_backend,
59        None,
60        None,
61        Some(in_memory),
62    )
63    .await
64}
65
66pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
67    let kv_backend = EtcdStore::with_endpoints([addr], 128).await.unwrap();
68    mock(
69        MetasrvOptions {
70            grpc: GrpcOptions {
71                server_addr: "127.0.0.1:3002".to_string(),
72                ..Default::default()
73            },
74            ..Default::default()
75        },
76        kv_backend,
77        None,
78        None,
79        None,
80    )
81    .await
82}
83
84pub async fn mock(
85    opts: MetasrvOptions,
86    kv_backend: KvBackendRef,
87    selector: Option<SelectorRef>,
88    datanode_clients: Option<Arc<NodeClients>>,
89    in_memory: Option<ResettableKvBackendRef>,
90) -> MockInfo {
91    let server_addr = opts.grpc.server_addr.clone();
92    let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
93
94    table_metadata_manager.init().await.unwrap();
95
96    let builder = MetasrvBuilder::new()
97        .options(opts)
98        .kv_backend(kv_backend.clone());
99
100    let builder = match selector {
101        Some(s) => builder.selector(s),
102        None => builder,
103    };
104
105    let builder = match datanode_clients {
106        Some(clients) => builder.node_manager(clients),
107        None => builder,
108    };
109
110    let builder = match &in_memory {
111        Some(in_memory) => builder.in_memory(in_memory.clone()),
112        None => builder,
113    };
114
115    let metasrv = builder.build().await.unwrap();
116    metasrv.try_start().await.unwrap();
117
118    let (client, server) = tokio::io::duplex(1024);
119    let metasrv = Arc::new(metasrv);
120    let service = metasrv.clone();
121
122    let _handle = tokio::spawn(async move {
123        let mut router = tonic::transport::Server::builder();
124        let router = add_compressed_service!(router, HeartbeatServer::from_arc(service.clone()));
125        let router = add_compressed_service!(router, StoreServer::from_arc(service.clone()));
126        let router =
127            add_compressed_service!(router, ProcedureServiceServer::from_arc(service.clone()));
128        let router = add_compressed_service!(router, ClusterServer::from_arc(service.clone()));
129        let router = add_compressed_service!(router, ConfigServer::from_arc(service.clone()));
130        router
131            .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
132            .await
133    });
134
135    let config = ChannelConfig::new()
136        // Use an long timeout to prevent test failures due to slow operations (e.g., when testing with S3).
137        .timeout(Some(Duration::from_secs(60)))
138        .connect_timeout(Duration::from_secs(10))
139        .tcp_nodelay(true);
140    let channel_manager = ChannelManager::with_config(config, None);
141
142    // Move client to an option so we can _move_ the inner value
143    // on the first attempt to connect. All other attempts will fail.
144    let mut client = Some(client);
145    let res = channel_manager.reset_with_connector(
146        &server_addr,
147        service_fn(move |_| {
148            let client = client.take();
149
150            async move {
151                if let Some(client) = client {
152                    Ok(TokioIo::new(client))
153                } else {
154                    Err(std::io::Error::other("Client already taken"))
155                }
156            }
157        }),
158    );
159    let _ = res.unwrap();
160
161    MockInfo {
162        server_addr,
163        channel_manager,
164        metasrv,
165        kv_backend,
166        in_memory,
167    }
168}