common_meta/rpc/
router.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18    Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19    TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::storage::{RegionId, RegionNumber};
27use strum::AsRefStr;
28use table::table_name::TableName;
29
30use crate::error::{self, Result};
31use crate::key::RegionDistribution;
32use crate::peer::Peer;
33use crate::DatanodeId;
34
35/// Returns the distribution of regions to datanodes.
36///
37/// The distribution is a map of datanode id to a list of region ids.
38/// The list of region ids is sorted in ascending order.
39pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
40    let mut regions_id_map = RegionDistribution::new();
41    for route in region_routes.iter() {
42        if let Some(peer) = route.leader_peer.as_ref() {
43            let region_id = route.region.id.region_number();
44            regions_id_map.entry(peer.id).or_default().push(region_id);
45        }
46        for peer in route.follower_peers.iter() {
47            let region_id = route.region.id.region_number();
48            regions_id_map.entry(peer.id).or_default().push(region_id);
49        }
50    }
51    for (_, regions) in regions_id_map.iter_mut() {
52        // id asc
53        regions.sort()
54    }
55    regions_id_map
56}
57
/// The routing information of a table: the table metadata plus the route of
/// each of its regions.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
    /// The table this route describes.
    pub table: Table,
    /// The route (leader and followers) of each region of the table.
    pub region_routes: Vec<RegionRoute>,
    /// Leader peer per region number (`None` when a region has no leader).
    /// Populated from `region_routes` when built via [`TableRoute::new`].
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
64
65/// Returns the leader peers of the table.
66pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
67    region_routes
68        .iter()
69        .flat_map(|x| &x.leader_peer)
70        .cloned()
71        .collect()
72}
73
74/// Returns the followers of the table.
75pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
76    region_routes
77        .iter()
78        .flat_map(|x| &x.follower_peers)
79        .cloned()
80        .collect()
81}
82
83/// Returns the operating leader regions with corresponding [DatanodeId].
84pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
85    region_routes
86        .iter()
87        .filter_map(|route| {
88            route
89                .leader_peer
90                .as_ref()
91                .map(|leader| (route.region.id, leader.id))
92        })
93        .collect::<Vec<_>>()
94}
95
96/// Returns the HashMap<[RegionNumber], &[Peer]>;
97///
98/// If the region doesn't have a leader peer, the [Region] will be omitted.
99pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
100    region_routes
101        .iter()
102        .filter_map(|x| {
103            x.leader_peer
104                .as_ref()
105                .map(|leader| (x.region.id.region_number(), leader))
106        })
107        .collect::<HashMap<_, _>>()
108}
109
110pub fn find_region_leader(
111    region_routes: &[RegionRoute],
112    region_number: RegionNumber,
113) -> Option<Peer> {
114    region_routes
115        .iter()
116        .find(|x| x.region.id.region_number() == region_number)
117        .and_then(|r| r.leader_peer.as_ref())
118        .cloned()
119}
120
121/// Returns the region numbers of the leader regions on the target datanode.
122pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
123    region_routes
124        .iter()
125        .filter_map(|x| {
126            if let Some(peer) = &x.leader_peer {
127                if peer == datanode {
128                    return Some(x.region.id.region_number());
129                }
130            }
131            None
132        })
133        .collect()
134}
135
136/// Returns the region numbers of the follower regions on the target datanode.
137pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
138    region_routes
139        .iter()
140        .filter_map(|x| {
141            if x.follower_peers.contains(datanode) {
142                return Some(x.region.id.region_number());
143            }
144            None
145        })
146        .collect()
147}
148
149impl TableRoute {
150    pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
151        let region_leaders = region_routes
152            .iter()
153            .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
154            .collect::<HashMap<_, _>>();
155        Self {
156            table,
157            region_routes,
158            region_leaders,
159        }
160    }
161
162    pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
163        let table = table_route
164            .table
165            .context(error::RouteInfoCorruptedSnafu {
166                err_msg: "'table' is empty in table route",
167            })?
168            .try_into()?;
169
170        let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
171        for region_route in table_route.region_routes.into_iter() {
172            let region = region_route
173                .region
174                .context(error::RouteInfoCorruptedSnafu {
175                    err_msg: "'region' is empty in region route",
176                })?
177                .into();
178
179            let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
180
181            let follower_peers = region_route
182                .follower_peer_indexes
183                .into_iter()
184                .filter_map(|x| peers.get(x as usize).cloned())
185                .collect::<Vec<_>>();
186
187            region_routes.push(RegionRoute {
188                region,
189                leader_peer,
190                follower_peers,
191                leader_state: None,
192                leader_down_since: None,
193            });
194        }
195
196        Ok(Self::new(table, region_routes))
197    }
198}
199
/// Table metadata carried inside a [`TableRoute`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
    /// The table id.
    pub id: u64,
    /// The name of the table.
    pub table_name: TableName,
    /// The table schema as raw bytes.
    ///
    /// Serialized as a string (see `as_utf8` / `from_utf8`). If the bytes are
    /// not valid UTF-8, a placeholder string is written instead, so the
    /// original bytes are not recoverable from the serialized form.
    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
    pub table_schema: Vec<u8>,
}
207
208impl TryFrom<PbTable> for Table {
209    type Error = error::Error;
210
211    fn try_from(t: PbTable) -> Result<Self> {
212        let table_name = t
213            .table_name
214            .context(error::RouteInfoCorruptedSnafu {
215                err_msg: "table name required",
216            })?
217            .into();
218        Ok(Self {
219            id: t.id,
220            table_name,
221            table_schema: t.table_schema,
222        })
223    }
224}
225
226impl From<Table> for PbTable {
227    fn from(table: Table) -> Self {
228        PbTable {
229            id: table.id,
230            table_name: Some(table.table_name.into()),
231            table_schema: table.table_schema,
232        }
233    }
234}
235
/// The route of a single region: the region plus the peers hosting its leader
/// and followers.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
    /// The region being routed.
    pub region: Region,
    /// The peer hosting the region leader, if any.
    #[builder(setter(into, strip_option))]
    pub leader_peer: Option<Peer>,
    /// The peers hosting the region followers.
    #[builder(setter(into), default)]
    pub follower_peers: Vec<Peer>,
    /// The state of the leader; `None` by default.
    ///
    /// Deserialization also accepts the legacy field name `leader_status`; the
    /// field is omitted from serialized output when `None`.
    #[builder(setter(into, strip_option), default)]
    #[serde(
        default,
        alias = "leader_status",
        skip_serializing_if = "Option::is_none"
    )]
    pub leader_state: Option<LeaderState>,
    /// The time (as returned by `current_time_millis`) at which the leader
    /// entered the `Downgrading` state.
    ///
    /// When built with [`RegionRouteBuilder`] without setting this field, it
    /// defaults to the current time if `leader_state` is `Downgrading`, and to
    /// `None` otherwise.
    #[serde(default)]
    #[builder(default = "self.default_leader_down_since()")]
    pub leader_down_since: Option<i64>,
}
256
257impl RegionRouteBuilder {
258    fn default_leader_down_since(&self) -> Option<i64> {
259        match self.leader_state {
260            Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
261            _ => None,
262        }
263    }
264}
265
/// The State of the [`Region`] Leader.
///
/// Through `AsRefStr` (strum, `serialize_all = "UPPERCASE"`), a variant's
/// string form is its upper-cased name, e.g. `"DOWNGRADING"`.
/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum LeaderState {
    /// The leader [`Region`] is being downgraded, in the following cases:
    ///
    /// - The [`Region`] may be unavailable (e.g., Crashed, Network disconnected).
    /// - The [`Region`] was planned to migrate to another [`Peer`].
    ///
    /// Deserialization also accepts the legacy name `Downgraded`.
    #[serde(alias = "Downgraded")]
    Downgrading,
}
278
279impl RegionRoute {
280    /// Returns true if the Leader [`Region`] is downgraded.
281    ///
282    /// The following cases in which the [`Region`] will be downgraded.
283    ///
284    /// - The [`Region`] is unavailable(e.g., Crashed, Network disconnected).
285    /// - The [`Region`] was planned to migrate to another [`Peer`].
286    ///
287    pub fn is_leader_downgrading(&self) -> bool {
288        matches!(self.leader_state, Some(LeaderState::Downgrading))
289    }
290
291    /// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
292    ///
293    /// We should downgrade a [`Region`] before deactivating it:
294    ///
295    /// - During the [`Region`] Failover Procedure.
296    /// - Migrating a [`Region`].
297    ///
298    /// **Notes:** Meta Server will renewing a special lease(`Downgrading`) for the downgrading [`Region`].
299    ///
300    /// A downgrading region will reject any write requests, and only allow memetable to be flushed to object storage
301    ///
302    pub fn downgrade_leader(&mut self) {
303        self.leader_down_since = Some(current_time_millis());
304        self.leader_state = Some(LeaderState::Downgrading)
305    }
306
307    /// Returns how long since the leader is in `Downgraded` state.
308    pub fn leader_down_millis(&self) -> Option<i64> {
309        self.leader_down_since
310            .map(|start| current_time_millis() - start)
311    }
312
313    /// Sets the leader state.
314    ///
315    /// Returns true if updated.
316    pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
317        let updated = self.leader_state != state;
318
319        match (state, updated) {
320            (Some(LeaderState::Downgrading), true) => {
321                self.leader_down_since = Some(current_time_millis());
322            }
323            (Some(LeaderState::Downgrading), false) => {
324                // Do nothing if leader is still in `Downgraded` state.
325            }
326            _ => {
327                self.leader_down_since = None;
328            }
329        }
330
331        self.leader_state = state;
332        updated
333    }
334}
335
/// A newtype over a list of [`RegionRoute`]s that provides leader lookups.
pub struct RegionRoutes(pub Vec<RegionRoute>);
337
338impl RegionRoutes {
339    pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
340        convert_to_region_leader_map(&self.0)
341    }
342
343    pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
344        self.region_leader_map().get(&region_number).copied()
345    }
346}
347
/// Metadata of a region.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Region {
    /// The region id.
    pub id: RegionId,
    /// The region name.
    pub name: String,
    /// The partition definition of the region, if any.
    pub partition: Option<Partition>,
    /// Additional string attributes, kept ordered by key.
    pub attrs: BTreeMap<String, String>,
}
355
356impl Region {
357    #[cfg(any(test, feature = "testing"))]
358    pub fn new_test(id: RegionId) -> Self {
359        Self {
360            id,
361            ..Default::default()
362        }
363    }
364}
365
366impl From<PbRegion> for Region {
367    fn from(r: PbRegion) -> Self {
368        Self {
369            id: r.id.into(),
370            name: r.name,
371            partition: r.partition.map(Into::into),
372            attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
373        }
374    }
375}
376
377impl From<Region> for PbRegion {
378    fn from(region: Region) -> Self {
379        Self {
380            id: region.id.into(),
381            name: region.name,
382            partition: region.partition.map(Into::into),
383            attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
384        }
385    }
386}
387
/// The partition definition of a region.
///
/// Both lists hold raw bytes and are serialized as lists of strings (see
/// `as_utf8_vec` / `from_utf8_vec`); invalid UTF-8 is replaced lossily when
/// serializing.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Partition {
    /// The column list, one raw byte string per entry.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub column_list: Vec<Vec<u8>>,
    /// The value list, one raw byte string per entry.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub value_list: Vec<Vec<u8>>,
}
395
396fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
397    serializer.serialize_str(
398        String::from_utf8(val.to_vec())
399            .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
400            .as_str(),
401    )
402}
403
404pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
405where
406    D: Deserializer<'de>,
407{
408    let s = String::deserialize(deserializer)?;
409
410    Ok(s.into_bytes())
411}
412
413fn as_utf8_vec<S: Serializer>(
414    val: &[Vec<u8>],
415    serializer: S,
416) -> std::result::Result<S::Ok, S::Error> {
417    let mut seq = serializer.serialize_seq(Some(val.len()))?;
418    for v in val {
419        seq.serialize_element(&String::from_utf8_lossy(v))?;
420    }
421    seq.end()
422}
423
424pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
425where
426    D: Deserializer<'de>,
427{
428    let values = Vec::<String>::deserialize(deserializer)?;
429
430    let values = values
431        .into_iter()
432        .map(|value| value.into_bytes())
433        .collect::<Vec<_>>();
434    Ok(values)
435}
436
437impl From<Partition> for PbPartition {
438    fn from(p: Partition) -> Self {
439        Self {
440            column_list: p.column_list,
441            value_list: p.value_list,
442        }
443    }
444}
445
446impl From<PbPartition> for Partition {
447    fn from(p: PbPartition) -> Self {
448        Self {
449            column_list: p.column_list,
450            value_list: p.value_list,
451        }
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the route for region `2` ("r2"), led by peer `1` with followers
    /// `2` and `3`, in the given leader state and with no down-since time.
    fn new_region_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: Region {
                id: 2.into(),
                name: "r2".to_string(),
                partition: None,
                attrs: BTreeMap::new(),
            },
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = new_region_route(None);

        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();

        assert!(region_route.is_leader_downgrading());
        assert!(region_route.leader_down_since.is_some());
    }

    #[test]
    fn test_set_leader_state() {
        let mut region_route = new_region_route(None);

        // Entering `Downgrading` records when the leader went down.
        assert!(region_route.set_leader_state(Some(LeaderState::Downgrading)));
        let down_since = region_route.leader_down_since;
        assert!(down_since.is_some());

        // Re-applying the same state is not an update and keeps the timestamp.
        assert!(!region_route.set_leader_state(Some(LeaderState::Downgrading)));
        assert_eq!(region_route.leader_down_since, down_since);

        // Clearing the state also clears the timestamp.
        assert!(region_route.set_leader_state(None));
        assert!(!region_route.is_leader_downgrading());
        assert_eq!(region_route.leader_down_since, None);
    }

    #[test]
    fn test_region_route_decode() {
        let region_route = new_region_route(None);

        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;

        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, region_route);
    }

    #[test]
    fn test_region_route_compatibility() {
        let expected = new_region_route(Some(LeaderState::Downgrading));
        let prefix = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]"#;

        // Both the legacy field name (`leader_status`) and the legacy variant
        // name (`Downgraded`) must keep decoding.
        for (key, value) in [
            ("leader_state", "Downgraded"),
            ("leader_status", "Downgraded"),
            ("leader_state", "Downgrading"),
            ("leader_status", "Downgrading"),
        ] {
            let input = format!(
                r#"{},"{}":"{}","leader_down_since":null}}"#,
                prefix, key, value
            );
            let decoded: RegionRoute = serde_json::from_str(&input).unwrap();
            assert_eq!(decoded, expected, "input: {}", input);
        }
    }

    #[test]
    fn test_de_serialize_partition() {
        let p = Partition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let output = serde_json::to_string(&p).unwrap();
        let got: Partition = serde_json::from_str(&output).unwrap();

        assert_eq!(got, p);
    }

    #[test]
    fn test_region_distribution() {
        let region_routes = vec![
            RegionRoute {
                region: Region {
                    id: RegionId::new(1, 1),
                    name: "r1".to_string(),
                    partition: None,
                    attrs: BTreeMap::new(),
                },
                leader_peer: Some(Peer::new(1, "a1")),
                follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
            RegionRoute {
                region: Region {
                    id: RegionId::new(1, 2),
                    name: "r2".to_string(),
                    partition: None,
                    attrs: BTreeMap::new(),
                },
                leader_peer: Some(Peer::new(2, "a2")),
                follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
        ];

        let distribution = region_distribution(&region_routes);
        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], vec![1, 2]);
        assert_eq!(distribution[&2], vec![1, 2]);
        assert_eq!(distribution[&3], vec![1, 2]);
    }

    #[test]
    fn test_region_routes_find_region_leader() {
        let routes = RegionRoutes(vec![
            new_region_route(None),
            RegionRoute {
                region: Region::new_test(RegionId::new(1, 3)),
                leader_peer: None,
                follower_peers: vec![Peer::new(2, "a2")],
                leader_state: None,
                leader_down_since: None,
            },
        ]);

        assert_eq!(routes.find_region_leader(2), Some(&Peer::new(1, "a1")));
        // A region without a leader and an unknown region both yield `None`.
        assert_eq!(routes.find_region_leader(3), None);
        assert_eq!(routes.find_region_leader(4), None);
        assert_eq!(routes.region_leader_map().len(), 1);
    }
}