meta_srv/procedure/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(any(test, feature = "mock"))]
16pub mod mock {
17    use std::io::Error;
18    use std::sync::Arc;
19
20    use api::v1::region::region_server::RegionServer;
21    use api::v1::region::{region_request, RegionResponse};
22    use api::v1::{ResponseHeader, Status as PbStatus};
23    use async_trait::async_trait;
24    use client::Client;
25    use common_grpc::channel_manager::ChannelManager;
26    use common_meta::peer::Peer;
27    use common_runtime::runtime::BuilderBuild;
28    use common_runtime::{Builder as RuntimeBuilder, Runtime};
29    use hyper_util::rt::TokioIo;
30    use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
31    use tokio::sync::mpsc;
32    use tonic::codec::CompressionEncoding;
33    use tonic::transport::Server;
34    use tower::service_fn;
35
36    /// An mock implementation of region server that simply echoes the request.
37    #[derive(Clone)]
38    pub struct EchoRegionServer {
39        runtime: Runtime,
40        received_requests: mpsc::Sender<region_request::Body>,
41    }
42
43    impl EchoRegionServer {
44        pub fn new() -> (Self, mpsc::Receiver<region_request::Body>) {
45            let (tx, rx) = mpsc::channel(10);
46            (
47                Self {
48                    runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
49                    received_requests: tx,
50                },
51                rx,
52            )
53        }
54
55        pub fn new_client(&self, datanode: &Peer) -> Client {
56            let (client, server) = tokio::io::duplex(1024);
57
58            let handler =
59                RegionServerRequestHandler::new(Arc::new(self.clone()), self.runtime.clone());
60
61            tokio::spawn(async move {
62                Server::builder()
63                    .add_service(
64                        RegionServer::new(handler)
65                            .accept_compressed(CompressionEncoding::Gzip)
66                            .accept_compressed(CompressionEncoding::Zstd)
67                            .send_compressed(CompressionEncoding::Gzip)
68                            .send_compressed(CompressionEncoding::Zstd),
69                    )
70                    .serve_with_incoming(futures::stream::iter(vec![Ok::<_, Error>(server)]))
71                    .await
72            });
73
74            let channel_manager = ChannelManager::new();
75            let mut client = Some(client);
76            channel_manager
77                .reset_with_connector(
78                    datanode.addr.clone(),
79                    service_fn(move |_| {
80                        let client = client.take().unwrap();
81                        async move { Ok::<_, Error>(TokioIo::new(client)) }
82                    }),
83                )
84                .unwrap();
85            Client::with_manager_and_urls(channel_manager, vec![datanode.addr.clone()])
86        }
87    }
88
89    #[async_trait]
90    impl RegionServerHandler for EchoRegionServer {
91        async fn handle(
92            &self,
93            request: region_request::Body,
94        ) -> servers::error::Result<RegionResponse> {
95            self.received_requests.send(request).await.unwrap();
96
97            Ok(RegionResponse {
98                header: Some(ResponseHeader {
99                    status: Some(PbStatus {
100                        status_code: 0,
101                        err_msg: String::default(),
102                    }),
103                }),
104                affected_rows: 0,
105                extensions: Default::default(),
106                metadata: Vec::new(),
107            })
108        }
109    }
110}
111
112#[cfg(test)]
113pub mod test_data {
114    use std::sync::Arc;
115
116    use chrono::DateTime;
117    use common_catalog::consts::MITO2_ENGINE;
118    use common_meta::ddl::flow_meta::FlowMetadataAllocator;
119    use common_meta::ddl::table_meta::TableMetadataAllocator;
120    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
121    use common_meta::key::flow::FlowMetadataManager;
122    use common_meta::key::TableMetadataManager;
123    use common_meta::kv_backend::memory::MemoryKvBackend;
124    use common_meta::node_manager::NodeManagerRef;
125    use common_meta::peer::Peer;
126    use common_meta::region_keeper::MemoryRegionKeeper;
127    use common_meta::region_registry::LeaderRegionRegistry;
128    use common_meta::rpc::router::RegionRoute;
129    use common_meta::sequence::SequenceBuilder;
130    use common_meta::wal_options_allocator::WalOptionsAllocator;
131    use datatypes::prelude::ConcreteDataType;
132    use datatypes::schema::{ColumnSchema, RawSchema};
133    use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
134    use table::requests::TableOptions;
135
136    use crate::cache_invalidator::MetasrvCacheInvalidator;
137    use crate::handler::{HeartbeatMailbox, Pushers};
138    use crate::metasrv::MetasrvInfo;
139    use crate::test_util::new_region_route;
140
141    pub fn new_region_routes() -> Vec<RegionRoute> {
142        let peers = vec![
143            Peer::new(1, "127.0.0.1:4001"),
144            Peer::new(2, "127.0.0.1:4002"),
145            Peer::new(3, "127.0.0.1:4003"),
146        ];
147        vec![
148            new_region_route(1, &peers, 3),
149            new_region_route(2, &peers, 2),
150            new_region_route(3, &peers, 1),
151        ]
152    }
153
154    pub fn new_table_info() -> RawTableInfo {
155        RawTableInfo {
156            ident: TableIdent {
157                table_id: 42,
158                version: 1,
159            },
160            name: "my_table".to_string(),
161            desc: Some("blabla".to_string()),
162            catalog_name: "my_catalog".to_string(),
163            schema_name: "my_schema".to_string(),
164            meta: RawTableMeta {
165                schema: RawSchema {
166                    column_schemas: vec![
167                        ColumnSchema::new(
168                            "ts".to_string(),
169                            ConcreteDataType::timestamp_millisecond_datatype(),
170                            false,
171                        ),
172                        ColumnSchema::new(
173                            "my_tag1".to_string(),
174                            ConcreteDataType::string_datatype(),
175                            true,
176                        ),
177                        ColumnSchema::new(
178                            "my_tag2".to_string(),
179                            ConcreteDataType::string_datatype(),
180                            true,
181                        ),
182                        ColumnSchema::new(
183                            "my_field_column".to_string(),
184                            ConcreteDataType::int32_datatype(),
185                            true,
186                        ),
187                    ],
188                    timestamp_index: Some(0),
189                    version: 0,
190                },
191                primary_key_indices: vec![1, 2],
192                value_indices: vec![2],
193                engine: MITO2_ENGINE.to_string(),
194                next_column_id: 3,
195                region_numbers: vec![1, 2, 3],
196                options: TableOptions::default(),
197                created_on: DateTime::default(),
198                partition_key_indices: vec![],
199            },
200            table_type: TableType::Base,
201        }
202    }
203
204    pub(crate) fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
205        let kv_backend = Arc::new(MemoryKvBackend::new());
206
207        let mailbox_sequence =
208            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
209        let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
210        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
211        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
212            Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
213            Arc::new(WalOptionsAllocator::default()),
214        ));
215        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
216        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
217            Arc::new(SequenceBuilder::new("test", kv_backend).build()),
218        ));
219        DdlContext {
220            node_manager,
221            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
222                mailbox,
223                MetasrvInfo {
224                    server_addr: "127.0.0.1:4321".to_string(),
225                },
226            )),
227            table_metadata_manager,
228            table_metadata_allocator,
229            flow_metadata_manager,
230            flow_metadata_allocator,
231            memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
232            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
233            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
234        }
235    }
236}