1#[cfg(any(test, feature = "mock"))]
16pub mod mock {
17 use std::io::Error;
18 use std::sync::Arc;
19
20 use api::v1::region::region_server::RegionServer;
21 use api::v1::region::{region_request, RegionResponse};
22 use api::v1::{ResponseHeader, Status as PbStatus};
23 use async_trait::async_trait;
24 use client::Client;
25 use common_grpc::channel_manager::ChannelManager;
26 use common_meta::peer::Peer;
27 use common_runtime::runtime::BuilderBuild;
28 use common_runtime::{Builder as RuntimeBuilder, Runtime};
29 use hyper_util::rt::TokioIo;
30 use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
31 use tokio::sync::mpsc;
32 use tonic::codec::CompressionEncoding;
33 use tonic::transport::Server;
34 use tower::service_fn;
35
36 #[derive(Clone)]
38 pub struct EchoRegionServer {
39 runtime: Runtime,
40 received_requests: mpsc::Sender<region_request::Body>,
41 }
42
43 impl EchoRegionServer {
44 pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
45 let (tx, rx) = mpsc::channel(10);
46 (
47 Self {
48 runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
49 received_requests: tx,
50 },
51 rx,
52 )
53 }
54
55 pub fn new_client(&self, datanode: &Peer) -> Client {
56 let (client, server) = tokio::io::duplex(1024);
57
58 let handler =
59 RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
60
61 tokio::spawn(async move {
62 Server::builder()
63 .add_service(
64 RegionServer::new(handler)
65 .accept_compressed(CompressionEncoding::Gzip)
66 .accept_compressed(CompressionEncoding::Zstd)
67 .send_compressed(CompressionEncoding::Gzip)
68 .send_compressed(CompressionEncoding::Zstd),
69 )
70 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
71 .await
72 });
73
74 let channel_manager = ChannelManager::new();
75 let mut client = Some(client);
76 channel_manager
77 .reset_with_connector(
78 datanode.addr.clone(),
79 service_fn(move |_| {
80 let client = client.take().unwrap();
81 async move { Ok::<_, Error>(TokioIo::new(client)) }
82 }),
83 )
84 .unwrap();
85 Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
86 }
87 }
88
89 #[async_trait]
90 impl RegionServerHandler for EchoRegionServer {
91 async fn handle(
92 &self,
93 request: region_request::Body,
94 ) -> servers::error::Result<RegionResponse> {
95 self.received_requests.send(request).await.unwrap();
96
97 Ok(RegionResponse {
98 header: Some(ResponseHeader {
99 status: Some(PbStatus {
100 status_code: 0,
101 err_msg: String::default(),
102 }),
103 }),
104 affected_rows: 0,
105 extensions: Default::default(),
106 })
107 }
108 }
109}
110
111#[cfg(test)]
112pub mod test_data {
113 use std::sync::Arc;
114
115 use chrono::DateTime;
116 use common_catalog::consts::MITO2_ENGINE;
117 use common_meta::ddl::flow_meta::FlowMetadataAllocator;
118 use common_meta::ddl::table_meta::TableMetadataAllocator;
119 use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
120 use common_meta::key::flow::FlowMetadataManager;
121 use common_meta::key::TableMetadataManager;
122 use common_meta::kv_backend::memory::MemoryKvBackend;
123 use common_meta::node_manager::NodeManagerRef;
124 use common_meta::peer::Peer;
125 use common_meta::region_keeper::MemoryRegionKeeper;
126 use common_meta::region_registry::LeaderRegionRegistry;
127 use common_meta::rpc::router::RegionRoute;
128 use common_meta::sequence::SequenceBuilder;
129 use common_meta::wal_options_allocator::WalOptionsAllocator;
130 use datatypes::prelude::ConcreteDataType;
131 use datatypes::schema::{ColumnSchema, RawSchema};
132 use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
133 use table::requests::TableOptions;
134
135 use crate::cache_invalidator::MetasrvCacheInvalidator;
136 use crate::handler::{HeartbeatMailbox, Pushers};
137 use crate::metasrv::MetasrvInfo;
138 use crate::test_util::new_region_route;
139
140 pub fn new_region_routes() -> Vec<RegionRoute> {
141 let peers = vec![
142 Peer::new(1, "127.0.0.1:4001"),
143 Peer::new(2, "127.0.0.1:4002"),
144 Peer::new(3, "127.0.0.1:4003"),
145 ];
146 vec![
147 new_region_route(1, &peers, 3),
148 new_region_route(2, &peers, 2),
149 new_region_route(3, &peers, 1),
150 ]
151 }
152
153 pub fn new_table_info() -> RawTableInfo {
154 RawTableInfo {
155 ident: TableIdent {
156 table_id: 42,
157 version: 1,
158 },
159 name: "my_table".to_string(),
160 desc: Some("blabla".to_string()),
161 catalog_name: "my_catalog".to_string(),
162 schema_name: "my_schema".to_string(),
163 meta: RawTableMeta {
164 schema: RawSchema {
165 column_schemas: vec![
166 ColumnSchema::new(
167 "ts".to_string(),
168 ConcreteDataType::timestamp_millisecond_datatype(),
169 false,
170 ),
171 ColumnSchema::new(
172 "my_tag1".to_string(),
173 ConcreteDataType::string_datatype(),
174 true,
175 ),
176 ColumnSchema::new(
177 "my_tag2".to_string(),
178 ConcreteDataType::string_datatype(),
179 true,
180 ),
181 ColumnSchema::new(
182 "my_field_column".to_string(),
183 ConcreteDataType::int32_datatype(),
184 true,
185 ),
186 ],
187 timestamp_index: Some(0),
188 version: 0,
189 },
190 primary_key_indices: vec![1, 2],
191 value_indices: vec![2],
192 engine: MITO2_ENGINE.to_string(),
193 next_column_id: 3,
194 region_numbers: vec![1, 2, 3],
195 options: TableOptions::default(),
196 created_on: DateTime::default(),
197 partition_key_indices: vec![],
198 },
199 table_type: TableType::Base,
200 }
201 }
202
203 pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
204 let kv_backend = Arc::new(MemoryKvBackend::new());
205
206 let mailbox_sequence =
207 SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
208 let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
209 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
210 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
211 Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
212 Arc::new(WalOptionsAllocator::default()),
213 ));
214 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
215 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
216 Arc::new(SequenceBuilder::new("test", kv_backend).build()),
217 ));
218 DdlContext {
219 node_manager,
220 cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
221 mailbox,
222 MetasrvInfo {
223 server_addr: "127.0.0.1:4321".to_string(),
224 },
225 )),
226 table_metadata_manager,
227 table_metadata_allocator,
228 flow_metadata_manager,
229 flow_metadata_allocator,
230 memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
231 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
232 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
233 }
234 }
235}