1use std::sync::Arc;
16
17use api::region::RegionResponse;
18use api::v1::flow::{FlowRequest, FlowResponse};
19use api::v1::region::{InsertRequests, RegionRequest};
20pub use common_base::AffectedRows;
21use common_query::request::QueryRequest;
22use common_recordbatch::SendableRecordBatchStream;
23use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
24use common_wal::config::kafka::MetasrvKafkaConfig;
25
26use crate::cache_invalidator::DummyCacheInvalidator;
27use crate::ddl::flow_meta::FlowMetadataAllocator;
28use crate::ddl::table_meta::TableMetadataAllocator;
29use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
30use crate::error::Result;
31use crate::key::flow::FlowMetadataManager;
32use crate::key::TableMetadataManager;
33use crate::kv_backend::memory::MemoryKvBackend;
34use crate::kv_backend::KvBackendRef;
35use crate::node_manager::{
36 Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
37};
38use crate::peer::{Peer, PeerLookupService};
39use crate::region_keeper::MemoryRegionKeeper;
40use crate::region_registry::LeaderRegionRegistry;
41use crate::sequence::SequenceBuilder;
42use crate::wal_options_allocator::topic_pool::KafkaTopicPool;
43use crate::wal_options_allocator::{build_kafka_topic_creator, WalOptionsAllocator};
44use crate::{DatanodeId, FlownodeId};
45
46#[async_trait::async_trait]
47pub trait MockDatanodeHandler: Sync + Send + Clone {
48 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse>;
49
50 async fn handle_query(
51 &self,
52 peer: &Peer,
53 request: QueryRequest,
54 ) -> Result<SendableRecordBatchStream>;
55}
56
57#[async_trait::async_trait]
58pub trait MockFlownodeHandler: Sync + Send + Clone {
59 async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
60 unimplemented!()
61 }
62
63 async fn handle_inserts(
64 &self,
65 _peer: &Peer,
66 _requests: InsertRequests,
67 ) -> Result<FlowResponse> {
68 unimplemented!()
69 }
70}
71
/// A mock node manager that wraps a single handler value of type `T`.
///
/// The handler is cloned into every mock node this manager hands out.
#[derive(Clone)]
pub struct MockDatanodeManager<T> {
    handler: T,
}

impl<T> MockDatanodeManager<T> {
    /// Creates a manager that owns `handler`.
    pub fn new(handler: T) -> Self {
        MockDatanodeManager { handler }
    }
}
83
/// A mock node manager that wraps a single handler value of type `T`.
///
/// The handler is cloned into every mock node this manager hands out.
#[derive(Clone)]
pub struct MockFlownodeManager<T> {
    handler: T,
}

impl<T> MockFlownodeManager<T> {
    /// Creates a manager that owns `handler`.
    pub fn new(handler: T) -> Self {
        MockFlownodeManager { handler }
    }
}
95
96#[derive(Clone)]
98struct MockNode<T> {
99 peer: Peer,
100 handler: T,
101}
102
103#[async_trait::async_trait]
104impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
105 async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
106 self.handler.handle(&self.peer, request).await
107 }
108
109 async fn handle_query(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
110 self.handler.handle_query(&self.peer, request).await
111 }
112}
113
114#[async_trait::async_trait]
115impl<T: MockDatanodeHandler + 'static> NodeManager for MockDatanodeManager<T> {
116 async fn datanode(&self, peer: &Peer) -> DatanodeRef {
117 Arc::new(MockNode {
118 peer: peer.clone(),
119 handler: self.handler.clone(),
120 })
121 }
122
123 async fn flownode(&self, _node: &Peer) -> FlownodeRef {
124 unimplemented!()
125 }
126}
127
128#[async_trait::async_trait]
129impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
130 async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
131 self.handler.handle(&self.peer, request).await
132 }
133
134 async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
135 self.handler.handle_inserts(&self.peer, requests).await
136 }
137}
138
139#[async_trait::async_trait]
140impl<T: MockFlownodeHandler + 'static> NodeManager for MockFlownodeManager<T> {
141 async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
142 unimplemented!()
143 }
144
145 async fn flownode(&self, peer: &Peer) -> FlownodeRef {
146 Arc::new(MockNode {
147 peer: peer.clone(),
148 handler: self.handler.clone(),
149 })
150 }
151}
152
153pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
155 let kv_backend = Arc::new(MemoryKvBackend::new());
156 new_ddl_context_with_kv_backend(node_manager, kv_backend)
157}
158
159pub fn new_ddl_context_with_kv_backend(
161 node_manager: NodeManagerRef,
162 kv_backend: KvBackendRef,
163) -> DdlContext {
164 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
165 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
166 Arc::new(
167 SequenceBuilder::new("test", kv_backend.clone())
168 .initial(1024)
169 .build(),
170 ),
171 Arc::new(WalOptionsAllocator::default()),
172 ));
173 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
174 let flow_metadata_allocator =
175 Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(Arc::new(
176 SequenceBuilder::new("flow-test", kv_backend)
177 .initial(1024)
178 .build(),
179 )));
180 DdlContext {
181 node_manager,
182 cache_invalidator: Arc::new(DummyCacheInvalidator),
183 memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
184 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
185 table_metadata_allocator,
186 table_metadata_manager,
187 flow_metadata_allocator,
188 flow_metadata_manager,
189 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
190 }
191}
192
193pub struct NoopPeerLookupService;
194
195#[async_trait::async_trait]
196impl PeerLookupService for NoopPeerLookupService {
197 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
198 Ok(Some(Peer::empty(id)))
199 }
200
201 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
202 Ok(Some(Peer::empty(id)))
203 }
204}
205
206pub async fn test_kafka_topic_pool(
208 broker_endpoints: Vec<String>,
209 num_topics: usize,
210 auto_create_topics: bool,
211 topic_name_prefix: Option<&str>,
212) -> KafkaTopicPool {
213 let mut config = MetasrvKafkaConfig {
214 connection: KafkaConnectionConfig {
215 broker_endpoints,
216 ..Default::default()
217 },
218 kafka_topic: KafkaTopicConfig {
219 num_topics,
220
221 ..Default::default()
222 },
223 auto_create_topics,
224 ..Default::default()
225 };
226 if let Some(prefix) = topic_name_prefix {
227 config.kafka_topic.topic_name_prefix = prefix.to_string();
228 }
229 let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
230 let topic_creator = build_kafka_topic_creator(&config.connection, &config.kafka_topic)
231 .await
232 .unwrap();
233
234 KafkaTopicPool::new(&config, kv_backend, topic_creator)
235}