meta_srv/handler/
response_header_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, ResponseHeader, Role, PROTOCOL_VERSION};
16
17use crate::error::Result;
18use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
19use crate::metasrv::Context;
20
21pub struct ResponseHeaderHandler;
22
23#[async_trait::async_trait]
24impl HeartbeatHandler for ResponseHeaderHandler {
25    fn is_acceptable(&self, _: Role) -> bool {
26        true
27    }
28
29    async fn handle(
30        &self,
31        _req: &HeartbeatRequest,
32        _ctx: &mut Context,
33        acc: &mut HeartbeatAccumulator,
34    ) -> Result<HandleControl> {
35        let res_header = ResponseHeader {
36            protocol_version: PROTOCOL_VERSION,
37            ..Default::default()
38        };
39        acc.header = Some(res_header);
40        Ok(HandleControl::Continue)
41    }
42}
43
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::RequestHeader;
    use common_meta::cache_invalidator::DummyCacheInvalidator;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::LeaderRegionRegistry;
    use common_meta::sequence::SequenceBuilder;
    use common_telemetry::tracing_context::W3cTrace;

    use super::*;
    use crate::cluster::MetaPeerClientBuilder;
    use crate::handler::{Context, HeartbeatMailbox, Pushers};
    use crate::service::store::cached_kv::LeaderCachedKvBackend;

    /// Runs `ResponseHeaderHandler::handle` against an in-memory context and
    /// checks its observable effects: the handler accepts the request's role,
    /// asks the pipeline to continue, and stores a response header carrying
    /// `PROTOCOL_VERSION` in the accumulator.
    #[tokio::test]
    async fn test_handle_heartbeat_resp_header() {
        let in_memory = Arc::new(MemoryKvBackend::new());
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
            kv_backend.clone(),
        ));
        let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(in_memory.clone())
            .build()
            .map(Arc::new)
            // Safety: all required fields set at initialization
            .unwrap();
        let mut ctx = Context {
            server_addr: "127.0.0.1:0000".to_string(),
            in_memory,
            kv_backend: kv_backend.clone(),
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election: None,
            is_infancy: false,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            cache_invalidator: Arc::new(DummyCacheInvalidator),
            leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
        };

        let req = HeartbeatRequest {
            header: Some(RequestHeader::new(2, Role::Datanode, W3cTrace::new())),
            ..Default::default()
        };
        let mut acc = HeartbeatAccumulator::default();

        let response_handler = ResponseHeaderHandler {};
        assert!(response_handler.is_acceptable(Role::Datanode));

        let control = response_handler
            .handle(&req, &mut ctx, &mut acc)
            .await
            .unwrap();

        // The handler must not interrupt the handler chain.
        assert!(matches!(control, HandleControl::Continue));

        // The accumulator must now hold a header stamped with the current
        // protocol version; previously the test never inspected it.
        let protocol_version = acc.header.as_ref().map(|header| header.protocol_version);
        assert_eq!(Some(PROTOCOL_VERSION), protocol_version);
    }
}