meta_srv/handler/
response_header_handler.rs1use api::v1::meta::{HeartbeatRequest, ResponseHeader, Role, PROTOCOL_VERSION};
16
17use crate::error::Result;
18use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
19use crate::metasrv::Context;
20
21pub struct ResponseHeaderHandler;
22
23#[async_trait::async_trait]
24impl HeartbeatHandler for ResponseHeaderHandler {
25 fn is_acceptable(&self, _: Role) -> bool {
26 true
27 }
28
29 async fn handle(
30 &self,
31 _req: &HeartbeatRequest,
32 _ctx: &mut Context,
33 acc: &mut HeartbeatAccumulator,
34 ) -> Result<HandleControl> {
35 let res_header = ResponseHeader {
36 protocol_version: PROTOCOL_VERSION,
37 ..Default::default()
38 };
39 acc.header = Some(res_header);
40 Ok(HandleControl::Continue)
41 }
42}
43
44#[cfg(test)]
45mod tests {
46 use std::sync::Arc;
47
48 use api::v1::meta::RequestHeader;
49 use common_meta::cache_invalidator::DummyCacheInvalidator;
50 use common_meta::key::TableMetadataManager;
51 use common_meta::kv_backend::memory::MemoryKvBackend;
52 use common_meta::region_registry::LeaderRegionRegistry;
53 use common_meta::sequence::SequenceBuilder;
54 use common_telemetry::tracing_context::W3cTrace;
55
56 use super::*;
57 use crate::cluster::MetaPeerClientBuilder;
58 use crate::handler::{Context, HeartbeatMailbox, Pushers};
59 use crate::service::store::cached_kv::LeaderCachedKvBackend;
60
61 #[tokio::test]
62 async fn test_handle_heartbeat_resp_header() {
63 let in_memory = Arc::new(MemoryKvBackend::new());
64 let kv_backend = Arc::new(MemoryKvBackend::new());
65 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
66 kv_backend.clone(),
67 ));
68 let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
69 let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
70 let meta_peer_client = MetaPeerClientBuilder::default()
71 .election(None)
72 .in_memory(in_memory.clone())
73 .build()
74 .map(Arc::new)
75 .unwrap();
77 let mut ctx = Context {
78 server_addr: "127.0.0.1:0000".to_string(),
79 in_memory,
80 kv_backend: kv_backend.clone(),
81 leader_cached_kv_backend,
82 meta_peer_client,
83 mailbox,
84 election: None,
85 is_infancy: false,
86 table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
87 cache_invalidator: Arc::new(DummyCacheInvalidator),
88 leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
89 };
90
91 let req = HeartbeatRequest {
92 header: Some(RequestHeader::new(2, Role::Datanode, W3cTrace::new())),
93 ..Default::default()
94 };
95 let mut acc = HeartbeatAccumulator::default();
96
97 let response_handler = ResponseHeaderHandler {};
98 response_handler
99 .handle(&req, &mut ctx, &mut acc)
100 .await
101 .unwrap();
102 }
103}