1#[cfg(any(test, feature = "mock"))]
16pub mod mock {
17 use std::io::Error;
18 use std::sync::Arc;
19
20 use api::v1::region::region_server::RegionServer;
21 use api::v1::region::{region_request, RegionResponse};
22 use api::v1::{ResponseHeader, Status as PbStatus};
23 use async_trait::async_trait;
24 use client::Client;
25 use common_grpc::channel_manager::ChannelManager;
26 use common_meta::peer::Peer;
27 use common_runtime::runtime::BuilderBuild;
28 use common_runtime::{Builder as RuntimeBuilder, Runtime};
29 use hyper_util::rt::TokioIo;
30 use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
31 use tokio::sync::mpsc;
32 use tonic::codec::CompressionEncoding;
33 use tonic::transport::Server;
34 use tower::service_fn;
35
36 #[derive(Clone)]
38 pub struct EchoRegionServer {
39 runtime: Runtime,
40 received_requests: mpsc::Sender<region_request::Body>,
41 }
42
43 impl EchoRegionServer {
44 pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
45 let (tx, rx) = mpsc::channel(10);
46 (
47 Self {
48 runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
49 received_requests: tx,
50 },
51 rx,
52 )
53 }
54
55 pub fn new_client(&self, datanode: &Peer) -> Client {
56 let (client, server) = tokio::io::duplex(1024);
57
58 let handler =
59 RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
60
61 tokio::spawn(async move {
62 Server::builder()
63 .add_service(
64 RegionServer::new(handler)
65 .accept_compressed(CompressionEncoding::Gzip)
66 .accept_compressed(CompressionEncoding::Zstd)
67 .send_compressed(CompressionEncoding::Gzip)
68 .send_compressed(CompressionEncoding::Zstd),
69 )
70 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
71 .await
72 });
73
74 let channel_manager = ChannelManager::new();
75 let mut client = Some(client);
76 channel_manager
77 .reset_with_connector(
78 datanode.addr.clone(),
79 service_fn(move |_| {
80 let client = client.take().unwrap();
81 async move { Ok::<_, Error>(TokioIo::new(client)) }
82 }),
83 )
84 .unwrap();
85 Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
86 }
87 }
88
89 #[async_trait]
90 impl RegionServerHandler for EchoRegionServer {
91 async fn handle(
92 &self,
93 request: region_request::Body,
94 ) -> servers::error::Result<RegionResponse> {
95 self.received_requests.send(request).await.unwrap();
96
97 Ok(RegionResponse {
98 header: Some(ResponseHeader {
99 status: Some(PbStatus {
100 status_code: 0,
101 err_msg: String::default(),
102 }),
103 }),
104 affected_rows: 0,
105 extensions: Default::default(),
106 metadata: Vec::new(),
107 })
108 }
109 }
110}
111
112#[cfg(test)]
113pub mod test_data {
114 use std::sync::Arc;
115
116 use chrono::DateTime;
117 use common_catalog::consts::MITO2_ENGINE;
118 use common_meta::ddl::flow_meta::FlowMetadataAllocator;
119 use common_meta::ddl::table_meta::TableMetadataAllocator;
120 use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
121 use common_meta::key::flow::FlowMetadataManager;
122 use common_meta::key::TableMetadataManager;
123 use common_meta::kv_backend::memory::MemoryKvBackend;
124 use common_meta::node_manager::NodeManagerRef;
125 use common_meta::peer::Peer;
126 use common_meta::region_keeper::MemoryRegionKeeper;
127 use common_meta::region_registry::LeaderRegionRegistry;
128 use common_meta::rpc::router::RegionRoute;
129 use common_meta::sequence::SequenceBuilder;
130 use common_meta::wal_options_allocator::WalOptionsAllocator;
131 use datatypes::prelude::ConcreteDataType;
132 use datatypes::schema::{ColumnSchema, RawSchema};
133 use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
134 use table::requests::TableOptions;
135
136 use crate::cache_invalidator::MetasrvCacheInvalidator;
137 use crate::handler::{HeartbeatMailbox, Pushers};
138 use crate::metasrv::MetasrvInfo;
139 use crate::test_util::new_region_route;
140
141 pub fn new_region_routes() -> Vec<RegionRoute> {
142 let peers = vec![
143 Peer::new(1, "127.0.0.1:4001"),
144 Peer::new(2, "127.0.0.1:4002"),
145 Peer::new(3, "127.0.0.1:4003"),
146 ];
147 vec![
148 new_region_route(1, &peers, 3),
149 new_region_route(2, &peers, 2),
150 new_region_route(3, &peers, 1),
151 ]
152 }
153
154 pub fn new_table_info() -> RawTableInfo {
155 RawTableInfo {
156 ident: TableIdent {
157 table_id: 42,
158 version: 1,
159 },
160 name: "my_table".to_string(),
161 desc: Some("blabla".to_string()),
162 catalog_name: "my_catalog".to_string(),
163 schema_name: "my_schema".to_string(),
164 meta: RawTableMeta {
165 schema: RawSchema {
166 column_schemas: vec![
167 ColumnSchema::new(
168 "ts".to_string(),
169 ConcreteDataType::timestamp_millisecond_datatype(),
170 false,
171 ),
172 ColumnSchema::new(
173 "my_tag1".to_string(),
174 ConcreteDataType::string_datatype(),
175 true,
176 ),
177 ColumnSchema::new(
178 "my_tag2".to_string(),
179 ConcreteDataType::string_datatype(),
180 true,
181 ),
182 ColumnSchema::new(
183 "my_field_column".to_string(),
184 ConcreteDataType::int32_datatype(),
185 true,
186 ),
187 ],
188 timestamp_index: Some(0),
189 version: 0,
190 },
191 primary_key_indices: vec![1, 2],
192 value_indices: vec![2],
193 engine: MITO2_ENGINE.to_string(),
194 next_column_id: 3,
195 region_numbers: vec![1, 2, 3],
196 options: TableOptions::default(),
197 created_on: DateTime::default(),
198 partition_key_indices: vec![],
199 },
200 table_type: TableType::Base,
201 }
202 }
203
204 pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
205 let kv_backend = Arc::new(MemoryKvBackend::new());
206
207 let mailbox_sequence =
208 SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
209 let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
210 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
211 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
212 Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
213 Arc::new(WalOptionsAllocator::default()),
214 ));
215 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
216 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
217 Arc::new(SequenceBuilder::new("test", kv_backend).build()),
218 ));
219 DdlContext {
220 node_manager,
221 cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
222 mailbox,
223 MetasrvInfo {
224 server_addr: "127.0.0.1:4321".to_string(),
225 },
226 )),
227 table_metadata_manager,
228 table_metadata_allocator,
229 flow_metadata_manager,
230 flow_metadata_allocator,
231 memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
232 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
233 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
234 }
235 }
236}