1use std::sync::Arc;
16
17use api::region::RegionResponse;
18use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
19use api::v1::region::{InsertRequests, RegionRequest};
20pub use common_base::AffectedRows;
21use common_query::request::QueryRequest;
22use common_recordbatch::SendableRecordBatchStream;
23use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
24use common_wal::config::kafka::MetasrvKafkaConfig;
25
26use crate::cache_invalidator::DummyCacheInvalidator;
27use crate::ddl::flow_meta::FlowMetadataAllocator;
28use crate::ddl::table_meta::TableMetadataAllocator;
29use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
30use crate::error::Result;
31use crate::key::flow::FlowMetadataManager;
32use crate::key::TableMetadataManager;
33use crate::kv_backend::memory::MemoryKvBackend;
34use crate::kv_backend::KvBackendRef;
35use crate::node_manager::{
36 Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
37};
38use crate::peer::{Peer, PeerLookupService};
39use crate::region_keeper::MemoryRegionKeeper;
40use crate::region_registry::LeaderRegionRegistry;
41use crate::sequence::SequenceBuilder;
42use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
43use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
44use crate::{DatanodeId, FlownodeId};
45
/// A mock handler for datanode-side requests, used to stub datanode behavior in tests.
///
/// Implementors receive the target [`Peer`] alongside each request, so a single
/// handler instance can emulate many datanodes (see `MockDatanodeManager`).
#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
    /// Handles a region request addressed to `peer`.
    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse>;

    /// Handles a query request addressed to `peer`, returning a stream of record batches.
    async fn handle_query(
        &self,
        peer: &Peer,
        request: QueryRequest,
    ) -> Result<SendableRecordBatchStream>;
}
56
/// A mock handler for flownode-side requests, used to stub flownode behavior in tests.
///
/// Every method has a default body that panics via `unimplemented!()`, so a test
/// only needs to override the calls it actually expects to receive.
#[async_trait::async_trait]
pub trait MockFlownodeHandler: Sync + Send + Clone {
    /// Handles a flow request addressed to `peer`. Default implementation panics.
    async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
        unimplemented!()
    }

    /// Handles insert requests addressed to `peer`. Default implementation panics.
    async fn handle_inserts(
        &self,
        _peer: &Peer,
        _requests: InsertRequests,
    ) -> Result<FlowResponse> {
        unimplemented!()
    }

    /// Handles a dirty-window notification addressed to `peer`. Default implementation panics.
    async fn handle_mark_window_dirty(
        &self,
        _peer: &Peer,
        _req: DirtyWindowRequest,
    ) -> Result<FlowResponse> {
        unimplemented!()
    }
}
79
/// A [`NodeManager`] whose datanodes all delegate to one shared mock handler.
///
/// Looking up a flownode through this manager panics; use `MockFlownodeManager`
/// for flownode-side tests.
#[derive(Clone)]
pub struct MockDatanodeManager<T> {
    // Cloned into every mock datanode handed out by this manager.
    handler: T,
}

impl<T> MockDatanodeManager<T> {
    /// Wraps `handler` in a new manager.
    pub fn new(handler: T) -> Self {
        MockDatanodeManager { handler }
    }
}
91
/// A [`NodeManager`] whose flownodes all delegate to one shared mock handler.
///
/// Looking up a datanode through this manager panics; use `MockDatanodeManager`
/// for datanode-side tests.
#[derive(Clone)]
pub struct MockFlownodeManager<T> {
    // Cloned into every mock flownode handed out by this manager.
    handler: T,
}

impl<T> MockFlownodeManager<T> {
    /// Wraps `handler` in a new manager.
    pub fn new(handler: T) -> Self {
        MockFlownodeManager { handler }
    }
}
103
/// A mock node pairing a target peer with a request handler.
///
/// Acts as the [`Datanode`] or [`Flownode`] mock depending on which handler
/// trait `T` implements (see the impls below).
#[derive(Clone)]
struct MockNode<T> {
    // The peer this node impersonates; forwarded to the handler with each request.
    peer: Peer,
    handler: T,
}
110
111#[async_trait::async_trait]
112impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
113 async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
114 self.handler.handle(&self.peer, request).await
115 }
116
117 async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
118 self.handler.handle_query(&self.peer, request).await
119 }
120}
121
122#[async_trait::async_trait]
123impl<T: MockDatanodeHandler + 'static> NodeManager for MockDatanodeManager<T> {
124 async fn datanode(&self, peer: &Peer) -> DatanodeRef {
125 Arc::new(MockNode {
126 peer: peer.clone(),
127 handler: self.handler.clone(),
128 })
129 }
130
131 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
132 unimplemented!()
133 }
134}
135
136#[async_trait::async_trait]
137impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
138 async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
139 self.handler.handle(&self.peer, request).await
140 }
141
142 async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
143 self.handler.handle_inserts(&self.peer, requests).await
144 }
145
146 async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
147 self.handler.handle_mark_window_dirty(&self.peer, req).await
148 }
149}
150
151#[async_trait::async_trait]
152impl<T: MockFlownodeHandler + 'static> NodeManager for MockFlownodeManager<T> {
153 async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
154 unimplemented!()
155 }
156
157 async fn flownode(&self, peer: &Peer) -> FlownodeRef {
158 Arc::new(MockNode {
159 peer: peer.clone(),
160 handler: self.handler.clone(),
161 })
162 }
163}
164
165pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
167 let kv_backend = Arc::new(MemoryKvBackend::new());
168 new_ddl_context_with_kv_backend(node_manager, kv_backend)
169}
170
171pub fn new_ddl_context_with_kv_backend(
173 node_manager: NodeManagerRef,
174 kv_backend: KvBackendRef,
175) -> DdlContext {
176 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
177 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
178 Arc::new(
179 SequenceBuilder::new("test", kv_backend.clone())
180 .initial(1024)
181 .build(),
182 ),
183 Arc::new(WalOptionsAllocator::default()),
184 ));
185 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
186 let flow_metadata_allocator =
187 Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
188 SequenceBuilder::new("flow-test", kv_backend)
189 .initial(1024)
190 .build(),
191 )));
192 DdlContext {
193 node_manager,
194 cache_invalidator: Arc::new(DummyCacheInvalidator),
195 memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
196 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
197 table_metadata_allocator,
198 table_metadata_manager,
199 flow_metadata_allocator,
200 flow_metadata_manager,
201 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
202 }
203}
204
/// A [`PeerLookupService`] that resolves every id to an empty [`Peer`] carrying that id.
pub struct NoopPeerLookupService;
206
207#[async_trait::async_trait]
208impl PeerLookupService for NoopPeerLookupService {
209 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
210 Ok(Some(Peer::empty(id)))
211 }
212
213 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
214 Ok(Some(Peer::empty(id)))
215 }
216}
217
218pub async fn test_kafka_topic_pool(
220 broker_endpoints: Vec<String>,
221 num_topics: usize,
222 auto_create_topics: bool,
223 topic_name_prefix: Option<&str>,
224) -> KafkaTopicPool {
225 let mut config = MetasrvKafkaConfig {
226 connection: KafkaConnectionConfig {
227 broker_endpoints,
228 ..Default::default()
229 },
230 kafka_topic: KafkaTopicConfig {
231 num_topics,
232
233 ..Default::default()
234 },
235 auto_create_topics,
236 ..Default::default()
237 };
238 if let Some(prefix) = topic_name_prefix {
239 config.kafka_topic.topic_name_prefix = prefix.to_string();
240 }
241 let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
242 let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
243 .await
244 .unwrap();
245
246 KafkaTopicPool::new(&config, kv_backend, topic_creator)
247}
248
/// Returns early from the enclosing test when `GT_POSTGRES_ENDPOINTS` is unset,
/// logging a warning so the skip is visible in test output.
#[macro_export]
macro_rules! maybe_skip_postgres_integration_test {
    () => {
        match std::env::var("GT_POSTGRES_ENDPOINTS") {
            Ok(_) => {}
            Err(_) => {
                common_telemetry::warn!("The endpoints is empty, skipping the test");
                return;
            }
        }
    };
}
264
/// Returns early from the enclosing test when `GT_MYSQL_ENDPOINTS` is unset,
/// logging a warning so the skip is visible in test output.
#[macro_export]
macro_rules! maybe_skip_mysql_integration_test {
    () => {
        match std::env::var("GT_MYSQL_ENDPOINTS") {
            Ok(_) => {}
            Err(_) => {
                common_telemetry::warn!("The endpoints is empty, skipping the test");
                return;
            }
        }
    };
}