meta_srv/procedure/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(any(test, feature = "mock"))]
16pub mod mock {
17    use std::io::Error;
18    use std::sync::Arc;
19
20    use api::v1::region::region_server::RegionServer;
21    use api::v1::region::{region_request, RegionResponse};
22    use api::v1::{ResponseHeader, Status as PbStatus};
23    use async_trait::async_trait;
24    use client::Client;
25    use common_grpc::channel_manager::ChannelManager;
26    use common_meta::peer::Peer;
27    use common_runtime::runtime::BuilderBuild;
28    use common_runtime::{Builder as RuntimeBuilder, Runtime};
29    use hyper_util::rt::TokioIo;
30    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
31    use tokio::sync::mpsc;
32    use tonic::codec::CompressionEncoding;
33    use tonic::transport::Server;
34    use tower::service_fn;
35
36    /// An mock implementation of region server that simply echoes the request.
37    #[derive(Clone)]
38    pub struct EchoRegionServer {
39        runtime: Runtime,
40        received_requests: mpsc::Sender<region_request::Body>,
41    }
42
43    impl EchoRegionServer {
44        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
45            let (tx, rx) = mpsc::channel(10);
46            (
47                Self {
48                    runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
49                    received_requests: tx,
50                },
51                rx,
52            )
53        }
54
55        pub fn new_client(&self, datanode: &Peer) -> Client {
56            let (client, server) = tokio::io::duplex(1024);
57
58            let handler =
59                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
60
61            tokio::spawn(async move {
62                Server::builder()
63                    .add_service(
64                        RegionServer::new(handler)
65                            .accept_compressed(CompressionEncoding::Gzip)
66                            .accept_compressed(CompressionEncoding::Zstd)
67                            .send_compressed(CompressionEncoding::Gzip)
68                            .send_compressed(CompressionEncoding::Zstd),
69                    )
70                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
71                    .await
72            });
73
74            let channel_manager = ChannelManager::new();
75            let mut client = Some(client);
76            channel_manager
77                .reset_with_connector(
78                    datanode.addr.clone(),
79                    service_fn(move |_| {
80                        let client = client.take().unwrap();
81                        async move { Ok::<_, Error>(TokioIo::new(client)) }
82                    }),
83                )
84                .unwrap();
85            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
86        }
87    }
88
89    #[async_trait]
90    impl RegionServerHandler for EchoRegionServer {
91        async fn handle(
92            &self,
93            request: region_request::Body,
94        ) -> servers::error::Result<RegionResponse> {
95            self.received_requests.send(request).await.unwrap();
96
97            Ok(RegionResponse {
98                header: Some(ResponseHeader {
99                    status: Some(PbStatus {
100                        status_code: 0,
101                        err_msg: String::default(),
102                    }),
103                }),
104                affected_rows: 0,
105                extensions: Default::default(),
106            })
107        }
108    }
109}
110
111#[cfg(test)]
112pub mod test_data {
113    use std::sync::Arc;
114
115    use chrono::DateTime;
116    use common_catalog::consts::MITO2_ENGINE;
117    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
118    use common_meta::ddl::table_meta::TableMetadataAllocator;
119    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
120    use common_meta::key::flow::FlowMetadataManager;
121    use common_meta::key::TableMetadataManager;
122    use common_meta::kv_backend::memory::MemoryKvBackend;
123    use common_meta::node_manager::NodeManagerRef;
124    use common_meta::peer::Peer;
125    use common_meta::region_keeper::MemoryRegionKeeper;
126    use common_meta::region_registry::LeaderRegionRegistry;
127    use common_meta::rpc::router::RegionRoute;
128    use common_meta::sequence::SequenceBuilder;
129    use common_meta::wal_options_allocator::WalOptionsAllocator;
130    use datatypes::prelude::ConcreteDataType;
131    use datatypes::schema::{ColumnSchema, RawSchema};
132    use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
133    use table::requests::TableOptions;
134
135    use crate::cache_invalidator::MetasrvCacheInvalidator;
136    use crate::handler::{HeartbeatMailbox, Pushers};
137    use crate::metasrv::MetasrvInfo;
138    use crate::test_util::new_region_route;
139
140    pub fn new_region_routes() -> Vec<RegionRoute> {
141        let peers = vec![
142            Peer::new(1, "127.0.0.1:4001"),
143            Peer::new(2, "127.0.0.1:4002"),
144            Peer::new(3, "127.0.0.1:4003"),
145        ];
146        vec![
147            new_region_route(1, &peers, 3),
148            new_region_route(2, &peers, 2),
149            new_region_route(3, &peers, 1),
150        ]
151    }
152
153    pub fn new_table_info() -> RawTableInfo {
154        RawTableInfo {
155            ident: TableIdent {
156                table_id: 42,
157                version: 1,
158            },
159            name: "my_table".to_string(),
160            desc: Some("blabla".to_string()),
161            catalog_name: "my_catalog".to_string(),
162            schema_name: "my_schema".to_string(),
163            meta: RawTableMeta {
164                schema: RawSchema {
165                    column_schemas: vec![
166                        ColumnSchema::new(
167                            "ts".to_string(),
168                            ConcreteDataType::timestamp_millisecond_datatype(),
169                            false,
170                        ),
171                        ColumnSchema::new(
172                            "my_tag1".to_string(),
173                            ConcreteDataType::string_datatype(),
174                            true,
175                        ),
176                        ColumnSchema::new(
177                            "my_tag2".to_string(),
178                            ConcreteDataType::string_datatype(),
179                            true,
180                        ),
181                        ColumnSchema::new(
182                            "my_field_column".to_string(),
183                            ConcreteDataType::int32_datatype(),
184                            true,
185                        ),
186                    ],
187                    timestamp_index: Some(0),
188                    version: 0,
189                },
190                primary_key_indices: vec![1, 2],
191                value_indices: vec![2],
192                engine: MITO2_ENGINE.to_string(),
193                next_column_id: 3,
194                region_numbers: vec![1, 2, 3],
195                options: TableOptions::default(),
196                created_on: DateTime::default(),
197                partition_key_indices: vec![],
198            },
199            table_type: TableType::Base,
200        }
201    }
202
203    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
204        let kv_backend = Arc::new(MemoryKvBackend::new());
205
206        let mailbox_sequence =
207            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
208        let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
209        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
210        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
211            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
212            Arc::new(WalOptionsAllocator::default()),
213        ));
214        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
215        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
216            Arc::new(SequenceBuilder::new("test", kv_backend).build()),
217        ));
218        DdlContext {
219            node_manager,
220            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
221                mailbox,
222                MetasrvInfo {
223                    server_addr: "127.0.0.1:4321".to_string(),
224                },
225            )),
226            table_metadata_manager,
227            table_metadata_allocator,
228            flow_metadata_manager,
229            flow_metadata_allocator,
230            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
231            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
232            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
233        }
234    }
235}