common_meta/rpc/
router.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18    Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19    TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::storage::{RegionId, RegionNumber};
27use strum::AsRefStr;
28use table::table_name::TableName;
29
30use crate::error::{self, Result};
31use crate::key::RegionDistribution;
32use crate::peer::Peer;
33use crate::DatanodeId;
34
35/// Returns the distribution of regions to datanodes.
36///
37/// The distribution is a map of datanode id to a list of region ids.
38/// The list of region ids is sorted in ascending order.
39pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
40    let mut regions_id_map = RegionDistribution::new();
41    for route in region_routes.iter() {
42        if let Some(peer) = route.leader_peer.as_ref() {
43            let region_number = route.region.id.region_number();
44            regions_id_map
45                .entry(peer.id)
46                .or_default()
47                .add_leader_region(region_number);
48        }
49        for peer in route.follower_peers.iter() {
50            let region_number = route.region.id.region_number();
51            regions_id_map
52                .entry(peer.id)
53                .or_default()
54                .add_follower_region(region_number);
55        }
56    }
57    for (_, region_role_set) in regions_id_map.iter_mut() {
58        // Sort the regions in ascending order.
59        region_role_set.sort()
60    }
61    regions_id_map
62}
63
/// Routing information of a table: the table itself together with the routes of its regions.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
    /// The table the routes belong to.
    pub table: Table,
    /// The routes of the table's regions.
    pub region_routes: Vec<RegionRoute>,
    /// Leader peer (if any) of each region, keyed by region number.
    /// Populated from `region_routes` by [`TableRoute::new`].
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
70
71/// Returns the leader peers of the table.
72pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
73    region_routes
74        .iter()
75        .flat_map(|x| &x.leader_peer)
76        .cloned()
77        .collect()
78}
79
80/// Returns the followers of the table.
81pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
82    region_routes
83        .iter()
84        .flat_map(|x| &x.follower_peers)
85        .cloned()
86        .collect()
87}
88
89/// Returns the operating leader regions with corresponding [DatanodeId].
90pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
91    region_routes
92        .iter()
93        .filter_map(|route| {
94            route
95                .leader_peer
96                .as_ref()
97                .map(|leader| (route.region.id, leader.id))
98        })
99        .collect::<Vec<_>>()
100}
101
102/// Returns the HashMap<[RegionNumber], &[Peer]>;
103///
104/// If the region doesn't have a leader peer, the [Region] will be omitted.
105pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
106    region_routes
107        .iter()
108        .filter_map(|x| {
109            x.leader_peer
110                .as_ref()
111                .map(|leader| (x.region.id.region_number(), leader))
112        })
113        .collect::<HashMap<_, _>>()
114}
115
116pub fn find_region_leader(
117    region_routes: &[RegionRoute],
118    region_number: RegionNumber,
119) -> Option<Peer> {
120    region_routes
121        .iter()
122        .find(|x| x.region.id.region_number() == region_number)
123        .and_then(|r| r.leader_peer.as_ref())
124        .cloned()
125}
126
127/// Returns the region numbers of the leader regions on the target datanode.
128pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
129    region_routes
130        .iter()
131        .filter_map(|x| {
132            if let Some(peer) = &x.leader_peer {
133                if peer == datanode {
134                    return Some(x.region.id.region_number());
135                }
136            }
137            None
138        })
139        .collect()
140}
141
142/// Returns the region numbers of the follower regions on the target datanode.
143pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
144    region_routes
145        .iter()
146        .filter_map(|x| {
147            if x.follower_peers.contains(datanode) {
148                return Some(x.region.id.region_number());
149            }
150            None
151        })
152        .collect()
153}
154
155impl TableRoute {
156    pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
157        let region_leaders = region_routes
158            .iter()
159            .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
160            .collect::<HashMap<_, _>>();
161        Self {
162            table,
163            region_routes,
164            region_leaders,
165        }
166    }
167
168    pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
169        let table = table_route
170            .table
171            .context(error::RouteInfoCorruptedSnafu {
172                err_msg: "'table' is empty in table route",
173            })?
174            .try_into()?;
175
176        let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
177        for region_route in table_route.region_routes.into_iter() {
178            let region = region_route
179                .region
180                .context(error::RouteInfoCorruptedSnafu {
181                    err_msg: "'region' is empty in region route",
182                })?
183                .into();
184
185            let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
186
187            let follower_peers = region_route
188                .follower_peer_indexes
189                .into_iter()
190                .filter_map(|x| peers.get(x as usize).cloned())
191                .collect::<Vec<_>>();
192
193            region_routes.push(RegionRoute {
194                region,
195                leader_peer,
196                follower_peers,
197                leader_state: None,
198                leader_down_since: None,
199            });
200        }
201
202        Ok(Self::new(table, region_routes))
203    }
204}
205
/// Table description carried by a [`TableRoute`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
    /// Numeric id of the table.
    pub id: u64,
    /// Name of the table.
    pub table_name: TableName,
    /// Raw schema bytes. Serde writes them as a string; bytes that are not valid
    /// UTF-8 are written as the placeholder `<unknown-not-UTF8>` and therefore do not
    /// survive a serialize/deserialize round trip.
    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
    pub table_schema: Vec<u8>,
}
213
214impl TryFrom<PbTable> for Table {
215    type Error = error::Error;
216
217    fn try_from(t: PbTable) -> Result<Self> {
218        let table_name = t
219            .table_name
220            .context(error::RouteInfoCorruptedSnafu {
221                err_msg: "table name required",
222            })?
223            .into();
224        Ok(Self {
225            id: t.id,
226            table_name,
227            table_schema: t.table_schema,
228        })
229    }
230}
231
232impl From<Table> for PbTable {
233    fn from(table: Table) -> Self {
234        PbTable {
235            id: table.id,
236            table_name: Some(table.table_name.into()),
237            table_schema: table.table_schema,
238        }
239    }
240}
241
/// The route of a single region: where its leader and followers live, plus the
/// state of the leader.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
    /// The region this route describes.
    pub region: Region,
    /// The peer hosting the leader of the region, if there is one.
    #[builder(setter(into, strip_option))]
    pub leader_peer: Option<Peer>,
    /// The peers hosting followers of the region. Empty by default in the builder.
    #[builder(setter(into), default)]
    pub follower_peers: Vec<Peer>,
    /// State of the leader; `None` by default.
    ///
    /// Omitted from the serialized form when `None`, and also accepted under the
    /// legacy key `leader_status` when deserializing.
    #[builder(setter(into, strip_option), default)]
    #[serde(
        default,
        alias = "leader_status",
        skip_serializing_if = "Option::is_none"
    )]
    pub leader_state: Option<LeaderState>,
    /// The time (as given by `current_time_millis`) at which the leader entered the
    /// `Downgrading` state. `None` when no such time is recorded.
    #[serde(default)]
    #[builder(default = "self.default_leader_down_since()")]
    pub leader_down_since: Option<i64>,
}
262
263impl RegionRouteBuilder {
264    fn default_leader_down_since(&self) -> Option<i64> {
265        match self.leader_state {
266            Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
267            _ => None,
268        }
269    }
270}
271
/// The state of the leader of a [`Region`].
///
/// The `AsRefStr` representation is upper-cased via `strum(serialize_all = "UPPERCASE")`.
/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum LeaderState {
    /// The leader is being downgraded. A [`Region`] is downgraded in the following cases:
    ///
    /// - The [`Region`] may be unavailable (e.g., crashed, network disconnected).
    /// - The [`Region`] was planned to migrate to another [`Peer`].
    ///
    /// The legacy name `Downgraded` is still accepted when deserializing.
    #[serde(alias = "Downgraded")]
    Downgrading,
}
284
285impl RegionRoute {
286    /// Returns true if the Leader [`Region`] is downgraded.
287    ///
288    /// The following cases in which the [`Region`] will be downgraded.
289    ///
290    /// - The [`Region`] is unavailable(e.g., Crashed, Network disconnected).
291    /// - The [`Region`] was planned to migrate to another [`Peer`].
292    ///
293    pub fn is_leader_downgrading(&self) -> bool {
294        matches!(self.leader_state, Some(LeaderState::Downgrading))
295    }
296
297    /// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
298    ///
299    /// We should downgrade a [`Region`] before deactivating it:
300    ///
301    /// - During the [`Region`] Failover Procedure.
302    /// - Migrating a [`Region`].
303    ///
304    /// **Notes:** Meta Server will renewing a special lease(`Downgrading`) for the downgrading [`Region`].
305    ///
306    /// A downgrading region will reject any write requests, and only allow memetable to be flushed to object storage
307    ///
308    pub fn downgrade_leader(&mut self) {
309        self.leader_down_since = Some(current_time_millis());
310        self.leader_state = Some(LeaderState::Downgrading)
311    }
312
313    /// Returns how long since the leader is in `Downgraded` state.
314    pub fn leader_down_millis(&self) -> Option<i64> {
315        self.leader_down_since
316            .map(|start| current_time_millis() - start)
317    }
318
319    /// Sets the leader state.
320    ///
321    /// Returns true if updated.
322    pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
323        let updated = self.leader_state != state;
324
325        match (state, updated) {
326            (Some(LeaderState::Downgrading), true) => {
327                self.leader_down_since = Some(current_time_millis());
328            }
329            (Some(LeaderState::Downgrading), false) => {
330                // Do nothing if leader is still in `Downgraded` state.
331            }
332            _ => {
333                self.leader_down_since = None;
334            }
335        }
336
337        self.leader_state = state;
338        updated
339    }
340}
341
342pub struct RegionRoutes(pub Vec<RegionRoute>);
343
344impl RegionRoutes {
345    pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
346        convert_to_region_leader_map(&self.0)
347    }
348
349    pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
350        self.region_leader_map().get(&region_number).copied()
351    }
352}
353
/// Description of a region as stored in a [`RegionRoute`].
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Region {
    /// Id of the region.
    pub id: RegionId,
    /// Name of the region.
    pub name: String,
    /// Partition of the region, if any.
    pub partition: Option<Partition>,
    /// Free-form string attributes, kept in key order.
    pub attrs: BTreeMap<String, String>,
}
361
362impl Region {
363    #[cfg(any(test, feature = "testing"))]
364    pub fn new_test(id: RegionId) -> Self {
365        Self {
366            id,
367            ..Default::default()
368        }
369    }
370}
371
372impl From<PbRegion> for Region {
373    fn from(r: PbRegion) -> Self {
374        Self {
375            id: r.id.into(),
376            name: r.name,
377            partition: r.partition.map(Into::into),
378            attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
379        }
380    }
381}
382
383impl From<Region> for PbRegion {
384    fn from(region: Region) -> Self {
385        Self {
386            id: region.id.into(),
387            name: region.name,
388            partition: region.partition.map(Into::into),
389            attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
390        }
391    }
392}
393
/// Partition information of a [`Region`], held as raw byte strings.
///
/// Serde writes both lists as sequences of strings, converting invalid UTF-8 lossily,
/// so only valid UTF-8 content survives a serialize/deserialize round trip unchanged.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Partition {
    /// The column entries of the partition, one byte string each.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub column_list: Vec<Vec<u8>>,
    /// The value entries of the partition, one byte string each.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub value_list: Vec<Vec<u8>>,
}
401
402fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
403    serializer.serialize_str(
404        String::from_utf8(val.to_vec())
405            .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
406            .as_str(),
407    )
408}
409
410pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
411where
412    D: Deserializer<'de>,
413{
414    let s = String::deserialize(deserializer)?;
415
416    Ok(s.into_bytes())
417}
418
419fn as_utf8_vec<S: Serializer>(
420    val: &[Vec<u8>],
421    serializer: S,
422) -> std::result::Result<S::Ok, S::Error> {
423    let mut seq = serializer.serialize_seq(Some(val.len()))?;
424    for v in val {
425        seq.serialize_element(&String::from_utf8_lossy(v))?;
426    }
427    seq.end()
428}
429
430pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
431where
432    D: Deserializer<'de>,
433{
434    let values = Vec::<String>::deserialize(deserializer)?;
435
436    let values = values
437        .into_iter()
438        .map(|value| value.into_bytes())
439        .collect::<Vec<_>>();
440    Ok(values)
441}
442
443impl From<Partition> for PbPartition {
444    fn from(p: Partition) -> Self {
445        Self {
446            column_list: p.column_list,
447            value_list: p.value_list,
448        }
449    }
450}
451
452impl From<PbPartition> for Partition {
453    fn from(p: PbPartition) -> Self {
454        Self {
455            column_list: p.column_list,
456            value_list: p.value_list,
457        }
458    }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key::RegionRoleSet;

    /// Builds a region without partition or attributes.
    fn plain_region(id: RegionId, name: &str) -> Region {
        Region {
            id,
            name: name.to_string(),
            partition: None,
            attrs: BTreeMap::new(),
        }
    }

    /// The route of region `r2` shared by several tests: leader on peer 1, followers on
    /// peers 2 and 3, the given leader state and no recorded downgrade time.
    fn r2_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: plain_region(2.into(), "r2"),
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = r2_route(None);
        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();
        assert!(region_route.is_leader_downgrading());
    }

    #[test]
    fn test_region_route_decode() {
        let expected = r2_route(None);

        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;
        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_region_route_compatibility() {
        // Both the current and the legacy spelling of the key and of the variant must
        // decode to the same route.
        let expected = r2_route(Some(LeaderState::Downgrading));
        let inputs = [
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#,
        ];

        for input in inputs.iter() {
            let decoded: RegionRoute = serde_json::from_str(input).unwrap();
            assert_eq!(decoded, expected);
        }
    }

    #[test]
    fn test_de_serialize_partition() {
        let partition = Partition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let encoded = serde_json::to_string(&partition).unwrap();
        let decoded: Partition = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded, partition);
    }

    #[test]
    fn test_region_distribution() {
        let first = RegionRoute {
            region: plain_region(RegionId::new(1, 1), "r1"),
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state: None,
            leader_down_since: None,
        };
        let second = RegionRoute {
            region: plain_region(RegionId::new(1, 2), "r2"),
            leader_peer: Some(Peer::new(2, "a2")),
            follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
            leader_state: None,
            leader_down_since: None,
        };
        let region_routes = vec![first, second];

        let distribution = region_distribution(&region_routes);

        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
        assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
        assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
    }
}