common_meta/
test_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::region::RegionResponse;
18use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
19use api::v1::region::{InsertRequests, RegionRequest};
20pub use common_base::AffectedRows;
21use common_query::request::QueryRequest;
22use common_recordbatch::SendableRecordBatchStream;
23use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
24use common_wal::config::kafka::MetasrvKafkaConfig;
25
26use crate::cache_invalidator::DummyCacheInvalidator;
27use crate::ddl::flow_meta::FlowMetadataAllocator;
28use crate::ddl::table_meta::TableMetadataAllocator;
29use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
30use crate::error::Result;
31use crate::key::flow::FlowMetadataManager;
32use crate::key::TableMetadataManager;
33use crate::kv_backend::memory::MemoryKvBackend;
34use crate::kv_backend::KvBackendRef;
35use crate::node_manager::{
36    Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, NodeManagerRef,
37};
38use crate::peer::{Peer, PeerLookupService};
39use crate::region_keeper::MemoryRegionKeeper;
40use crate::region_registry::LeaderRegionRegistry;
41use crate::sequence::SequenceBuilder;
42use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
43use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
44use crate::{DatanodeId, FlownodeId};
45
46#[async_trait::async_trait]
47pub trait MockDatanodeHandler: Sync + Send + Clone {
48    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse>;
49
50    async fn handle_query(
51        &self,
52        peer: &Peer,
53        request: QueryRequest,
54    ) -> Result<SendableRecordBatchStream>;
55}
56
57#[async_trait::async_trait]
58pub trait MockFlownodeHandler: Sync + Send + Clone {
59    async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
60        unimplemented!()
61    }
62
63    async fn handle_inserts(
64        &self,
65        _peer: &Peer,
66        _requests: InsertRequests,
67    ) -> Result<FlowResponse> {
68        unimplemented!()
69    }
70
71    async fn handle_mark_window_dirty(
72        &self,
73        _peer: &Peer,
74        _req: DirtyWindowRequest,
75    ) -> Result<FlowResponse> {
76        unimplemented!()
77    }
78}
79
/// A mock node manager (in the `NodeManager` sense) for tests that only talk to datanodes.
///
/// Only the `datanode` lookup is functional; asking it for a flownode panics.
#[derive(Clone)]
pub struct MockDatanodeManager<T> {
    /// Handler cloned into every mocked datanode this manager hands out.
    handler: T,
}

impl<T> MockDatanodeManager<T> {
    /// Creates a manager whose mocked datanodes delegate to `handler`.
    pub fn new(handler: T) -> Self {
        MockDatanodeManager { handler }
    }
}
91
/// A mock node manager (in the `NodeManager` sense) for tests that only talk to flownodes.
///
/// Only the `flownode` lookup is functional; asking it for a datanode panics.
#[derive(Clone)]
pub struct MockFlownodeManager<T> {
    /// Handler cloned into every mocked flownode this manager hands out.
    handler: T,
}

impl<T> MockFlownodeManager<T> {
    /// Creates a manager whose mocked flownodes delegate to `handler`.
    pub fn new(handler: T) -> Self {
        MockFlownodeManager { handler }
    }
}
103
104/// A mock struct implements [Datanode].
105#[derive(Clone)]
106struct MockNode<T> {
107    peer: Peer,
108    handler: T,
109}
110
111#[async_trait::async_trait]
112impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
113    async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
114        self.handler.handle(&self.peer, request).await
115    }
116
117    async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
118        self.handler.handle_query(&self.peer, request).await
119    }
120}
121
122#[async_trait::async_trait]
123impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T> {
124    async fn datanode(&self, peer: &Peer) -> DatanodeRef {
125        Arc::new(MockNode {
126            peer: peer.clone(),
127            handler: self.handler.clone(),
128        })
129    }
130}
131
132#[async_trait::async_trait]
133impl<T: 'static + Send + Sync> FlownodeManager for MockDatanodeManager<T> {
134    async fn flownode(&self, _peer: &Peer) -> FlownodeRef {
135        unimplemented!()
136    }
137}
138
139#[async_trait::async_trait]
140impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
141    async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
142        self.handler.handle(&self.peer, request).await
143    }
144
145    async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
146        self.handler.handle_inserts(&self.peer, requests).await
147    }
148
149    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
150        self.handler.handle_mark_window_dirty(&self.peer, req).await
151    }
152}
153
154#[async_trait::async_trait]
155impl<T: MockFlownodeHandler + 'static> FlownodeManager for MockFlownodeManager<T> {
156    async fn flownode(&self, peer: &Peer) -> FlownodeRef {
157        Arc::new(MockNode {
158            peer: peer.clone(),
159            handler: self.handler.clone(),
160        })
161    }
162}
163
164#[async_trait::async_trait]
165impl<T: 'static + Send + Sync> DatanodeManager for MockFlownodeManager<T> {
166    async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
167        unimplemented!()
168    }
169}
170
171/// Returns a test purpose [DdlContext].
172pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
173    let kv_backend = Arc::new(MemoryKvBackend::new());
174    new_ddl_context_with_kv_backend(node_manager, kv_backend)
175}
176
177/// Returns a test purpose [DdlContext] with a specified [KvBackendRef].
178pub fn new_ddl_context_with_kv_backend(
179    node_manager: NodeManagerRef,
180    kv_backend: KvBackendRef,
181) -> DdlContext {
182    let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
183    let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
184        Arc::new(
185            SequenceBuilder::new("test", kv_backend.clone())
186                .initial(1024)
187                .build(),
188        ),
189        Arc::new(WalOptionsAllocator::default()),
190    ));
191    let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
192    let flow_metadata_allocator =
193        Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
194            SequenceBuilder::new("flow-test", kv_backend)
195                .initial(1024)
196                .build(),
197        )));
198    DdlContext {
199        node_manager,
200        cache_invalidator: Arc::new(DummyCacheInvalidator),
201        memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
202        leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
203        table_metadata_allocator,
204        table_metadata_manager,
205        flow_metadata_allocator,
206        flow_metadata_manager,
207        region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
208    }
209}
210
211pub struct NoopPeerLookupService;
212
213#[async_trait::async_trait]
214impl PeerLookupService for NoopPeerLookupService {
215    async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
216        Ok(Some(Peer::empty(id)))
217    }
218
219    async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
220        Ok(Some(Peer::empty(id)))
221    }
222
223    async fn active_frontends(&self) -> Result<Vec<Peer>> {
224        Ok(vec![])
225    }
226}
227
228/// Create a kafka topic pool for testing.
229pub async fn test_kafka_topic_pool(
230    broker_endpoints: Vec<String>,
231    num_topics: usize,
232    auto_create_topics: bool,
233    topic_name_prefix: Option<&str>,
234) -> KafkaTopicPool {
235    let mut config = MetasrvKafkaConfig {
236        connection: KafkaConnectionConfig {
237            broker_endpoints,
238            ..Default::default()
239        },
240        kafka_topic: KafkaTopicConfig {
241            num_topics,
242
243            ..Default::default()
244        },
245        auto_create_topics,
246        ..Default::default()
247    };
248    if let Some(prefix) = topic_name_prefix {
249        config.kafka_topic.topic_name_prefix = prefix.to_string();
250    }
251    let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
252    let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
253        .await
254        .unwrap();
255
256    KafkaTopicPool::new(&config, kv_backend, topic_creator)
257}
258
/// Returns early from the enclosing function, after logging a warning, when the
/// environment variable `GT_POSTGRES_ENDPOINTS` cannot be read.
///
/// The enclosing function must return `()`, since the macro expands to a bare `return;`.
///
/// The format of the environment variable is:
/// ```text
/// GT_POSTGRES_ENDPOINTS=localhost:9092,localhost:9093
/// ```
// NOTE(review): the ports in the example above look like Kafka broker ports —
// confirm the endpoint format that the Postgres tests actually expect.
#[macro_export]
macro_rules! maybe_skip_postgres_integration_test {
    () => {
        // Absolute path so a caller-local `std` module cannot shadow the lookup.
        if ::std::env::var("GT_POSTGRES_ENDPOINTS").is_err() {
            common_telemetry::warn!("The endpoints is empty, skipping the test");
            return;
        }
    };
}
274
/// Returns early from the enclosing function, after logging a warning, when the
/// environment variable `GT_MYSQL_ENDPOINTS` cannot be read.
///
/// The enclosing function must return `()`, since the macro expands to a bare `return;`.
///
/// The format of the environment variable is:
/// ```text
/// GT_MYSQL_ENDPOINTS=localhost:9092,localhost:9093
/// ```
// NOTE(review): the ports in the example above look like Kafka broker ports —
// confirm the endpoint format that the MySQL tests actually expect.
#[macro_export]
macro_rules! maybe_skip_mysql_integration_test {
    () => {
        // Absolute path so a caller-local `std` module cannot shadow the lookup.
        if ::std::env::var("GT_MYSQL_ENDPOINTS").is_err() {
            common_telemetry::warn!("The endpoints is empty, skipping the test");
            return;
        }
    };
}
290
/// Returns early from the enclosing function, after logging a warning, when the
/// environment variable `GT_POSTGRES15_ENDPOINTS` cannot be read.
///
/// The enclosing function must return `()`, since the macro expands to a bare `return;`.
///
/// The format of the environment variable is:
/// ```text
/// GT_POSTGRES15_ENDPOINTS=postgres://user:password@127.0.0.1:5433/postgres
/// ```
#[macro_export]
macro_rules! maybe_skip_postgres15_integration_test {
    () => {
        // Absolute path so a caller-local `std` module cannot shadow the lookup.
        if ::std::env::var("GT_POSTGRES15_ENDPOINTS").is_err() {
            common_telemetry::warn!("The PG15 endpoints is empty, skipping the test");
            return;
        }
    };
}