common_meta/
test_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use api::region::RegionResponse;
19use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
20use api::v1::region::{InsertRequests, RegionRequest};
21pub use common_base::AffectedRows;
22use common_query::request::QueryRequest;
23use common_recordbatch::SendableRecordBatchStream;
24use common_wal::config::kafka::MetasrvKafkaConfig;
25use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
26
27use crate::cache_invalidator::DummyCacheInvalidator;
28use crate::ddl::flow_meta::FlowMetadataAllocator;
29use crate::ddl::table_meta::TableMetadataAllocator;
30use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
31use crate::error::Result;
32use crate::key::TableMetadataManager;
33use crate::key::flow::FlowMetadataManager;
34use crate::kv_backend::KvBackendRef;
35use crate::kv_backend::memory::MemoryKvBackend;
36use crate::node_manager::{
37    Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, NodeManagerRef,
38};
39use crate::peer::{Peer, PeerResolver};
40use crate::region_keeper::MemoryRegionKeeper;
41use crate::region_registry::LeaderRegionRegistry;
42use crate::sequence::SequenceBuilder;
43use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
44use crate::wal_options_allocator::{WalOptionsAllocator, build_kafka_topic_creator};
45use crate::{DatanodeId, FlownodeId};
46
/// Handler implemented by tests to script how a mocked datanode responds;
/// plugged into [MockDatanodeManager] and invoked through [MockNode].
#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
    /// Handles a [RegionRequest] addressed to the datanode `peer`.
    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse>;

    /// Handles a [QueryRequest] addressed to the datanode `peer`, returning a
    /// stream of record batches.
    async fn handle_query(
        &self,
        peer: &Peer,
        request: QueryRequest,
    ) -> Result<SendableRecordBatchStream>;
}
57
/// Handler implemented by tests to script how a mocked flownode responds;
/// plugged into [MockFlownodeManager] and invoked through [MockNode].
///
/// Every method defaults to `unimplemented!()`, so a test only needs to
/// override the methods it actually exercises.
#[async_trait::async_trait]
pub trait MockFlownodeHandler: Sync + Send + Clone {
    /// Handles a [FlowRequest] addressed to the flownode `_peer`.
    async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
        unimplemented!()
    }

    /// Handles row inserts addressed to the flownode `_peer`.
    async fn handle_inserts(
        &self,
        _peer: &Peer,
        _requests: InsertRequests,
    ) -> Result<FlowResponse> {
        unimplemented!()
    }

    /// Handles dirty-window marking requests addressed to the flownode `_peer`.
    async fn handle_mark_window_dirty(
        &self,
        _peer: &Peer,
        _req: DirtyWindowRequests,
    ) -> Result<FlowResponse> {
        unimplemented!()
    }
}
80
/// A mock [NodeManager] that only supports the `datanode` method; the
/// `flownode` side is unimplemented.
#[derive(Clone)]
pub struct MockDatanodeManager<T> {
    /// Handler cloned into every [MockNode] this manager hands out.
    handler: T,
}

impl<T> MockDatanodeManager<T> {
    /// Creates a manager whose datanodes all answer via `handler`.
    pub fn new(handler: T) -> Self {
        MockDatanodeManager { handler }
    }
}
92
/// A mock [NodeManager] that only supports the `flownode` method; the
/// `datanode` side is unimplemented.
#[derive(Clone)]
pub struct MockFlownodeManager<T> {
    /// Handler cloned into every [MockNode] this manager hands out.
    handler: T,
}

impl<T> MockFlownodeManager<T> {
    /// Creates a manager whose flownodes all answer via `handler`.
    pub fn new(handler: T) -> Self {
        MockFlownodeManager { handler }
    }
}
104
/// A mock node that implements [Datanode] and/or [Flownode] (depending on the
/// handler type `T`) by delegating every request to `handler`.
#[derive(Clone)]
struct MockNode<T> {
    /// Peer identity forwarded to the handler alongside each request.
    peer: Peer,
    /// Test-provided handler that produces the responses.
    handler: T,
}
111
112#[async_trait::async_trait]
113impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
114    async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
115        self.handler.handle(&self.peer, request).await
116    }
117
118    async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
119        self.handler.handle_query(&self.peer, request).await
120    }
121}
122
123#[async_trait::async_trait]
124impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T> {
125    async fn datanode(&self, peer: &Peer) -> DatanodeRef {
126        Arc::new(MockNode {
127            peer: peer.clone(),
128            handler: self.handler.clone(),
129        })
130    }
131}
132
#[async_trait::async_trait]
impl<T: 'static + Send + Sync> FlownodeManager for MockDatanodeManager<T> {
    /// Always panics: [MockDatanodeManager] mocks only the datanode side.
    async fn flownode(&self, _peer: &Peer) -> FlownodeRef {
        unimplemented!()
    }
}
139
140#[async_trait::async_trait]
141impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
142    async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
143        self.handler.handle(&self.peer, request).await
144    }
145
146    async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
147        self.handler.handle_inserts(&self.peer, requests).await
148    }
149
150    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> Result<FlowResponse> {
151        self.handler.handle_mark_window_dirty(&self.peer, req).await
152    }
153}
154
155#[async_trait::async_trait]
156impl<T: MockFlownodeHandler + 'static> FlownodeManager for MockFlownodeManager<T> {
157    async fn flownode(&self, peer: &Peer) -> FlownodeRef {
158        Arc::new(MockNode {
159            peer: peer.clone(),
160            handler: self.handler.clone(),
161        })
162    }
163}
164
#[async_trait::async_trait]
impl<T: 'static + Send + Sync> DatanodeManager for MockFlownodeManager<T> {
    /// Always panics: [MockFlownodeManager] mocks only the flownode side.
    async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
        unimplemented!()
    }
}
171
172/// Returns a test purpose [DdlContext].
173pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
174    let kv_backend = Arc::new(MemoryKvBackend::new());
175    new_ddl_context_with_kv_backend(node_manager, kv_backend)
176}
177
178/// Returns a test purpose [DdlContext] with a specified [KvBackendRef].
179pub fn new_ddl_context_with_kv_backend(
180    node_manager: NodeManagerRef,
181    kv_backend: KvBackendRef,
182) -> DdlContext {
183    let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
184    let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
185        Arc::new(
186            SequenceBuilder::new("test", kv_backend.clone())
187                .initial(1024)
188                .build(),
189        ),
190        Arc::new(WalOptionsAllocator::default()),
191    ));
192    let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
193    let flow_metadata_allocator =
194        Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
195            SequenceBuilder::new("flow-test", kv_backend)
196                .initial(1024)
197                .build(),
198        )));
199    DdlContext {
200        node_manager,
201        cache_invalidator: Arc::new(DummyCacheInvalidator),
202        memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
203        leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
204        table_metadata_allocator,
205        table_metadata_manager,
206        flow_metadata_allocator,
207        flow_metadata_manager,
208        region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
209    }
210}
211
212pub struct NoopPeerResolver;
213
214#[async_trait::async_trait]
215impl PeerResolver for NoopPeerResolver {
216    async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
217        Ok(Some(Peer::empty(id)))
218    }
219
220    async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
221        Ok(Some(Peer::empty(id)))
222    }
223}
224
225/// Create a kafka topic pool for testing.
226pub async fn test_kafka_topic_pool(
227    broker_endpoints: Vec<String>,
228    num_topics: usize,
229    auto_create_topics: bool,
230    topic_name_prefix: Option<&str>,
231) -> KafkaTopicPool {
232    let mut config = MetasrvKafkaConfig {
233        connection: KafkaConnectionConfig {
234            broker_endpoints,
235            ..Default::default()
236        },
237        kafka_topic: KafkaTopicConfig {
238            num_topics,
239
240            ..Default::default()
241        },
242        auto_create_topics,
243        ..Default::default()
244    };
245    if let Some(prefix) = topic_name_prefix {
246        config.kafka_topic.topic_name_prefix = prefix.to_string();
247    }
248    let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
249    let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
250        .await
251        .unwrap();
252
253    KafkaTopicPool::new(&config, kv_backend, topic_creator)
254}
255
#[macro_export]
/// Skips the enclosing test (via an early `return`) if the environment
/// variable `GT_POSTGRES_ENDPOINTS` is not set.
///
/// The format of the environment variable is a comma-separated endpoint list,
/// e.g. (5432 is the conventional PostgreSQL port — confirm against CI setup):
/// ```text
/// GT_POSTGRES_ENDPOINTS=localhost:5432
/// ```
macro_rules! maybe_skip_postgres_integration_test {
    () => {
        if std::env::var("GT_POSTGRES_ENDPOINTS").is_err() {
            common_telemetry::warn!("The endpoints is empty, skipping the test");
            return;
        }
    };
}
271
#[macro_export]
/// Skips the enclosing test (via an early `return`) if the environment
/// variable `GT_MYSQL_ENDPOINTS` is not set.
///
/// The format of the environment variable is a comma-separated endpoint list,
/// e.g. (3306 is the conventional MySQL port — confirm against CI setup):
/// ```text
/// GT_MYSQL_ENDPOINTS=localhost:3306
/// ```
macro_rules! maybe_skip_mysql_integration_test {
    () => {
        if std::env::var("GT_MYSQL_ENDPOINTS").is_err() {
            common_telemetry::warn!("The endpoints is empty, skipping the test");
            return;
        }
    };
}
287
#[macro_export]
/// Skips the enclosing test (via an early `return`) if the environment
/// variable `GT_POSTGRES15_ENDPOINTS` is not set.
///
/// The format of the environment variable is a PostgreSQL connection string:
/// ```text
/// GT_POSTGRES15_ENDPOINTS=postgres://user:password@127.0.0.1:5433/postgres
/// ```
macro_rules! maybe_skip_postgres15_integration_test {
    () => {
        if std::env::var("GT_POSTGRES15_ENDPOINTS").is_err() {
            common_telemetry::warn!("The PG15 endpoints is empty, skipping the test");
            return;
        }
    };
}
303
#[macro_export]
/// Skips the enclosing test (via an early `return`) if the environment
/// variable `GT_ETCD_TLS_ENDPOINTS` is not set.
///
/// The format of the environment variable is a comma-separated endpoint list,
/// e.g. (2379 is the conventional etcd client port — confirm against CI setup):
/// ```text
/// GT_ETCD_TLS_ENDPOINTS=localhost:2379
/// ```
macro_rules! maybe_skip_etcd_tls_integration_test {
    () => {
        if std::env::var("GT_ETCD_TLS_ENDPOINTS").is_err() {
            common_telemetry::warn!("The etcd with tls endpoints is empty, skipping the test");
            return;
        }
    };
}
319
320/// Returns the directory of the etcd TLS certs.
321pub fn etcd_certs_dir() -> PathBuf {
322    let project_path = env!("CARGO_MANIFEST_DIR");
323    let project_path = PathBuf::from(project_path);
324    let base = project_path.ancestors().nth(3).unwrap();
325    base.join("tests-integration")
326        .join("fixtures")
327        .join("etcd-tls-certs")
328}
329
330/// Returns the directory of the test certs.
331pub fn test_certs_dir() -> PathBuf {
332    let project_path = env!("CARGO_MANIFEST_DIR");
333    let project_path = PathBuf::from(project_path);
334    let base = project_path.ancestors().nth(3).unwrap();
335    base.join("tests-integration")
336        .join("fixtures")
337        .join("certs")
338}