1use std::path::PathBuf;
16use std::sync::Arc;
17
18use api::region::RegionResponse;
19use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
20use api::v1::region::{InsertRequests, RegionRequest};
21pub use common_base::AffectedRows;
22use common_query::request::QueryRequest;
23use common_recordbatch::SendableRecordBatchStream;
24use common_wal::config::kafka::MetasrvKafkaConfig;
25use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
26
27use crate::cache_invalidator::DummyCacheInvalidator;
28use crate::ddl::flow_meta::FlowMetadataAllocator;
29use crate::ddl::table_meta::TableMetadataAllocator;
30use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
31use crate::error::Result;
32use crate::key::TableMetadataManager;
33use crate::key::flow::FlowMetadataManager;
34use crate::kv_backend::KvBackendRef;
35use crate::kv_backend::memory::MemoryKvBackend;
36use crate::node_manager::{
37 Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, NodeManagerRef,
38};
39use crate::peer::{Peer, PeerResolver};
40use crate::region_keeper::MemoryRegionKeeper;
41use crate::region_registry::LeaderRegionRegistry;
42use crate::sequence::SequenceBuilder;
43use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
44use crate::wal_options_allocator::{WalOptionsAllocator, build_kafka_topic_creator};
45use crate::{DatanodeId, FlownodeId};
46
47#[async_trait::async_trait]
48pub trait MockDatanodeHandler: Sync + Send + Clone {
49 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse>;
50
51 async fn handle_query(
52 &self,
53 peer: &Peer,
54 request: QueryRequest,
55 ) -> Result<SendableRecordBatchStream>;
56}
57
58#[async_trait::async_trait]
59pub trait MockFlownodeHandler: Sync + Send + Clone {
60 async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
61 unimplemented!()
62 }
63
64 async fn handle_inserts(
65 &self,
66 _peer: &Peer,
67 _requests: InsertRequests,
68 ) -> Result<FlowResponse> {
69 unimplemented!()
70 }
71
72 async fn handle_mark_window_dirty(
73 &self,
74 _peer: &Peer,
75 _req: DirtyWindowRequests,
76 ) -> Result<FlowResponse> {
77 unimplemented!()
78 }
79}
80
81#[derive(Clone)]
83pub struct MockDatanodeManager<T> {
84 handler: T,
85}
86
87impl<T> MockDatanodeManager<T> {
88 pub fn new(handler: T) -> Self {
89 Self { handler }
90 }
91}
92
93#[derive(Clone)]
95pub struct MockFlownodeManager<T> {
96 handler: T,
97}
98
99impl<T> MockFlownodeManager<T> {
100 pub fn new(handler: T) -> Self {
101 Self { handler }
102 }
103}
104
105#[derive(Clone)]
107struct MockNode<T> {
108 peer: Peer,
109 handler: T,
110}
111
112#[async_trait::async_trait]
113impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
114 async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
115 self.handler.handle(&self.peer, request).await
116 }
117
118 async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
119 self.handler.handle_query(&self.peer, request).await
120 }
121}
122
123#[async_trait::async_trait]
124impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T> {
125 async fn datanode(&self, peer: &Peer) -> DatanodeRef {
126 Arc::new(MockNode {
127 peer: peer.clone(),
128 handler: self.handler.clone(),
129 })
130 }
131}
132
133#[async_trait::async_trait]
134impl<T: 'static + Send + Sync> FlownodeManager for MockDatanodeManager<T> {
135 async fn flownode(&self, _peer: &Peer) -> FlownodeRef {
136 unimplemented!()
137 }
138}
139
140#[async_trait::async_trait]
141impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
142 async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
143 self.handler.handle(&self.peer, request).await
144 }
145
146 async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
147 self.handler.handle_inserts(&self.peer, requests).await
148 }
149
150 async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> Result<FlowResponse> {
151 self.handler.handle_mark_window_dirty(&self.peer, req).await
152 }
153}
154
155#[async_trait::async_trait]
156impl<T: MockFlownodeHandler + 'static> FlownodeManager for MockFlownodeManager<T> {
157 async fn flownode(&self, peer: &Peer) -> FlownodeRef {
158 Arc::new(MockNode {
159 peer: peer.clone(),
160 handler: self.handler.clone(),
161 })
162 }
163}
164
165#[async_trait::async_trait]
166impl<T: 'static + Send + Sync> DatanodeManager for MockFlownodeManager<T> {
167 async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
168 unimplemented!()
169 }
170}
171
172pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
174 let kv_backend = Arc::new(MemoryKvBackend::new());
175 new_ddl_context_with_kv_backend(node_manager, kv_backend)
176}
177
178pub fn new_ddl_context_with_kv_backend(
180 node_manager: NodeManagerRef,
181 kv_backend: KvBackendRef,
182) -> DdlContext {
183 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
184 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
185 Arc::new(
186 SequenceBuilder::new("test", kv_backend.clone())
187 .initial(1024)
188 .build(),
189 ),
190 Arc::new(WalOptionsAllocator::default()),
191 ));
192 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
193 let flow_metadata_allocator =
194 Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
195 SequenceBuilder::new("flow-test", kv_backend)
196 .initial(1024)
197 .build(),
198 )));
199 DdlContext {
200 node_manager,
201 cache_invalidator: Arc::new(DummyCacheInvalidator),
202 memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
203 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
204 table_metadata_allocator,
205 table_metadata_manager,
206 flow_metadata_allocator,
207 flow_metadata_manager,
208 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
209 }
210}
211
212pub struct NoopPeerResolver;
213
214#[async_trait::async_trait]
215impl PeerResolver for NoopPeerResolver {
216 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
217 Ok(Some(Peer::empty(id)))
218 }
219
220 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
221 Ok(Some(Peer::empty(id)))
222 }
223}
224
225pub async fn test_kafka_topic_pool(
227 broker_endpoints: Vec<String>,
228 num_topics: usize,
229 auto_create_topics: bool,
230 topic_name_prefix: Option<&str>,
231) -> KafkaTopicPool {
232 let mut config = MetasrvKafkaConfig {
233 connection: KafkaConnectionConfig {
234 broker_endpoints,
235 ..Default::default()
236 },
237 kafka_topic: KafkaTopicConfig {
238 num_topics,
239
240 ..Default::default()
241 },
242 auto_create_topics,
243 ..Default::default()
244 };
245 if let Some(prefix) = topic_name_prefix {
246 config.kafka_topic.topic_name_prefix = prefix.to_string();
247 }
248 let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
249 let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
250 .await
251 .unwrap();
252
253 KafkaTopicPool::new(&config, kv_backend, topic_creator)
254}
255
256#[macro_export]
257macro_rules! maybe_skip_postgres_integration_test {
264 () => {
265 if std::env::var("GT_POSTGRES_ENDPOINTS").is_err() {
266 common_telemetry::warn!("The endpoints is empty, skipping the test");
267 return;
268 }
269 };
270}
271
272#[macro_export]
273macro_rules! maybe_skip_mysql_integration_test {
280 () => {
281 if std::env::var("GT_MYSQL_ENDPOINTS").is_err() {
282 common_telemetry::warn!("The endpoints is empty, skipping the test");
283 return;
284 }
285 };
286}
287
288#[macro_export]
289macro_rules! maybe_skip_postgres15_integration_test {
296 () => {
297 if std::env::var("GT_POSTGRES15_ENDPOINTS").is_err() {
298 common_telemetry::warn!("The PG15 endpoints is empty, skipping the test");
299 return;
300 }
301 };
302}
303
304#[macro_export]
305macro_rules! maybe_skip_etcd_tls_integration_test {
312 () => {
313 if std::env::var("GT_ETCD_TLS_ENDPOINTS").is_err() {
314 common_telemetry::warn!("The etcd with tls endpoints is empty, skipping the test");
315 return;
316 }
317 };
318}
319
320pub fn etcd_certs_dir() -> PathBuf {
322 let project_path = env!("CARGO_MANIFEST_DIR");
323 let project_path = PathBuf::from(project_path);
324 let base = project_path.ancestors().nth(3).unwrap();
325 base.join("tests-integration")
326 .join("fixtures")
327 .join("etcd-tls-certs")
328}
329
330pub fn test_certs_dir() -> PathBuf {
332 let project_path = env!("CARGO_MANIFEST_DIR");
333 let project_path = PathBuf::from(project_path);
334 let base = project_path.ancestors().nth(3).unwrap();
335 base.join("tests-integration")
336 .join("fixtures")
337 .join("certs")
338}