meta_srv/handler/
collect_leader_region_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{HeartbeatRequest, Role};
16use common_meta::region_registry::{LeaderRegion, LeaderRegionManifestInfo};
17use store_api::region_engine::RegionRole;
18
19use crate::error::Result;
20use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
21use crate::metasrv::Context;
22
/// Heartbeat handler that copies the leader regions reported in a datanode's
/// heartbeat stat into the context's leader region registry.
///
/// The handler is stateless; all data it works on comes from the
/// `HeartbeatAccumulator` and `Context` passed to `handle`.
#[derive(Debug, Clone, Copy, Default)]
pub struct CollectLeaderRegionHandler;
24
25#[async_trait::async_trait]
26impl HeartbeatHandler for CollectLeaderRegionHandler {
27    fn is_acceptable(&self, role: Role) -> bool {
28        role == Role::Datanode
29    }
30
31    async fn handle(
32        &self,
33        _req: &HeartbeatRequest,
34        ctx: &mut Context,
35        acc: &mut HeartbeatAccumulator,
36    ) -> Result<HandleControl> {
37        let Some(current_stat) = acc.stat.as_ref() else {
38            return Ok(HandleControl::Continue);
39        };
40
41        let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
42        for stat in current_stat.region_stats.iter() {
43            if stat.role != RegionRole::Leader {
44                continue;
45            }
46
47            let manifest = LeaderRegionManifestInfo::from_region_stat(stat);
48            let value = LeaderRegion {
49                datanode_id: current_stat.id,
50                manifest,
51            };
52            key_values.push((stat.id, value));
53        }
54        ctx.leader_region_registry.batch_put(key_values);
55
56        Ok(HandleControl::Continue)
57    }
58}
59
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::cache_invalidator::DummyCacheInvalidator;
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
    use common_meta::sequence::SequenceBuilder;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::cluster::MetaPeerClientBuilder;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::service::store::cached_kv::LeaderCachedKvBackend;

    /// Builds a [`Context`] backed by in-memory key-value stores, with no
    /// election and a fresh, empty leader region registry.
    fn mock_ctx() -> Context {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let in_memory = Arc::new(MemoryKvBackend::new());

        let sequence = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), sequence);

        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
            kv_backend.clone(),
        ));

        let meta_peer_client = Arc::new(
            MetaPeerClientBuilder::default()
                .election(None)
                .in_memory(in_memory.clone())
                .build()
                // Safety: all required fields set at initialization
                .unwrap(),
        );

        Context {
            server_addr: "127.0.0.1:0000".to_string(),
            in_memory,
            kv_backend: kv_backend.clone(),
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election: None,
            is_infancy: false,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
            cache_invalidator: Arc::new(DummyCacheInvalidator),
            leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
        }
    }

    /// Creates a "mito" [`RegionStat`] for `region_id` with the given manifest
    /// version and role; every counter and size field is zero.
    fn new_region_stat(region_id: RegionId, version: u64, region_role: RegionRole) -> RegionStat {
        RegionStat {
            id: region_id,
            role: region_role,
            engine: "mito".to_string(),
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: version,
                flushed_entry_id: 0,
            },
            rcus: 0,
            wcus: 0,
            approximate_bytes: 0,
            num_rows: 0,
            memtable_size: 0,
            manifest_size: 0,
            sst_size: 0,
            index_size: 0,
            data_topic_latest_entry_id: 0,
            metadata_topic_latest_entry_id: 0,
        }
    }

    /// Stat of datanode 1 reporting exactly one leader region at `version`.
    fn single_leader_stat(region_id: RegionId, version: u64) -> Stat {
        Stat {
            id: 1,
            region_stats: vec![new_region_stat(region_id, version, RegionRole::Leader)],
            timestamp_millis: 0,
            addr: "127.0.0.1:0000".to_string(),
            region_num: 1,
            node_epoch: 0,
            ..Default::default()
        }
    }

    /// Feeds the accumulator through the handler with a default request and
    /// checks that the handler asks the pipeline to continue.
    async fn run_heartbeat(
        handler: &CollectLeaderRegionHandler,
        ctx: &mut Context,
        acc: &mut HeartbeatAccumulator,
    ) {
        let outcome = handler
            .handle(&HeartbeatRequest::default(), ctx, acc)
            .await
            .unwrap();
        assert_eq!(outcome, HandleControl::Continue);
    }

    /// Looks up `queried` in the registry and asserts that exactly one entry
    /// comes back: `leader_id`, owned by datanode 1, at `expected_version`.
    fn expect_single_leader(
        ctx: &Context,
        queried: Vec<RegionId>,
        leader_id: RegionId,
        expected_version: u64,
    ) {
        let found = ctx.leader_region_registry.batch_get(queried.into_iter());
        assert_eq!(found.len(), 1);

        let expected = LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: expected_version,
                flushed_entry_id: 0,
                topic_latest_entry_id: 0,
            },
        };
        assert_eq!(found.get(&leader_id), Some(&expected));
    }

    #[tokio::test]
    async fn test_handle_collect_leader_region() {
        let leader_id = RegionId::new(1, 1);
        let follower_id = RegionId::new(1, 2);

        let handler = CollectLeaderRegionHandler;
        let mut ctx = mock_ctx();

        // First heartbeat: one leader and one follower region.
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: 1,
                region_stats: vec![
                    new_region_stat(leader_id, 1, RegionRole::Leader),
                    new_region_stat(follower_id, 2, RegionRole::Follower),
                ],
                addr: "127.0.0.1:0000".to_string(),
                region_num: 2,
                ..Default::default()
            }),
            ..Default::default()
        };
        run_heartbeat(&handler, &mut ctx, &mut acc).await;
        // Only the leader region is present in the registry.
        expect_single_leader(&ctx, vec![leader_id, follower_id], leader_id, 1);

        // New heartbeat with new manifest version
        acc.stat = Some(single_leader_stat(leader_id, 2));
        run_heartbeat(&handler, &mut ctx, &mut acc).await;
        expect_single_leader(&ctx, vec![leader_id], leader_id, 2);

        // New heartbeat with old manifest version
        acc.stat = Some(single_leader_stat(leader_id, 1));
        run_heartbeat(&handler, &mut ctx, &mut acc).await;
        // The manifest version is not updated
        expect_single_leader(&ctx, vec![leader_id], leader_id, 2);
    }
}