Skip to main content

common_meta/rpc/
router.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18    Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19    TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::region_engine::RegionRole;
27use store_api::storage::{RegionId, RegionNumber};
28use strum::AsRefStr;
29use table::table_name::TableName;
30
31use crate::DatanodeId;
32use crate::error::{self, Result};
33use crate::key::RegionDistribution;
34use crate::peer::Peer;
35
36/// Returns the distribution of regions to datanodes.
37///
38/// The distribution is a map of datanode id to a list of region ids.
39/// The list of region ids is sorted in ascending order.
40pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
41    let mut regions_id_map = RegionDistribution::new();
42    for route in region_routes.iter() {
43        if let Some(peer) = route.leader_peer.as_ref() {
44            let region_number = route.region.id.region_number();
45            regions_id_map
46                .entry(peer.id)
47                .or_default()
48                .add_leader_region(region_number);
49        }
50        for peer in route.follower_peers.iter() {
51            let region_number = route.region.id.region_number();
52            regions_id_map
53                .entry(peer.id)
54                .or_default()
55                .add_follower_region(region_number);
56        }
57    }
58    for (_, region_role_set) in regions_id_map.iter_mut() {
59        // Sort the regions in ascending order.
60        region_role_set.sort()
61    }
62    regions_id_map
63}
64
/// Route information of a table: the table itself plus the routes of its regions.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
    /// The table this route belongs to.
    pub table: Table,
    /// The routes of the table's regions.
    pub region_routes: Vec<RegionRoute>,
    /// Leader peer of each region keyed by region number (`None` when the region
    /// has no leader). Built from `region_routes` in [`TableRoute::new`].
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
71
72/// Returns the leader peers of the table.
73pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
74    region_routes
75        .iter()
76        .flat_map(|x| &x.leader_peer)
77        .cloned()
78        .collect()
79}
80
81/// Returns the followers of the table.
82pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
83    region_routes
84        .iter()
85        .flat_map(|x| &x.follower_peers)
86        .cloned()
87        .collect()
88}
89
90/// Returns the operating leader regions with corresponding [DatanodeId].
91pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
92    region_routes
93        .iter()
94        .filter_map(|route| {
95            route
96                .leader_peer
97                .as_ref()
98                .map(|leader| (route.region.id, leader.id))
99        })
100        .collect::<Vec<_>>()
101}
102
103/// Returns the operating leader regions with corresponding [DatanodeId] and [RegionRole].
104pub fn operating_leader_region_roles(
105    region_routes: &[RegionRoute],
106) -> Vec<(RegionId, DatanodeId, RegionRole)> {
107    region_routes
108        .iter()
109        .filter_map(|route| {
110            let role = route.leader_region_role()?;
111            let leader = route.leader_peer.as_ref()?;
112            Some((route.region.id, leader.id, role))
113        })
114        .collect()
115}
116
117/// Returns the HashMap<[RegionNumber], &[Peer]>;
118///
119/// If the region doesn't have a leader peer, the [Region] will be omitted.
120pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
121    region_routes
122        .iter()
123        .filter_map(|x| {
124            x.leader_peer
125                .as_ref()
126                .map(|leader| (x.region.id.region_number(), leader))
127        })
128        .collect::<HashMap<_, _>>()
129}
130
131pub fn find_region_leader(
132    region_routes: &[RegionRoute],
133    region_number: RegionNumber,
134) -> Option<Peer> {
135    region_routes
136        .iter()
137        .find(|x| x.region.id.region_number() == region_number)
138        .and_then(|r| r.leader_peer.as_ref())
139        .cloned()
140}
141
142/// Returns the region numbers of the leader regions on the target datanode.
143pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
144    region_routes
145        .iter()
146        .filter_map(|x| {
147            if let Some(peer) = &x.leader_peer
148                && peer == datanode
149            {
150                return Some(x.region.id.region_number());
151            }
152            None
153        })
154        .collect()
155}
156
157/// Returns the region numbers of the follower regions on the target datanode.
158pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
159    region_routes
160        .iter()
161        .filter_map(|x| {
162            if x.follower_peers.contains(datanode) {
163                return Some(x.region.id.region_number());
164            }
165            None
166        })
167        .collect()
168}
169
170impl TableRoute {
171    pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
172        let region_leaders = region_routes
173            .iter()
174            .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
175            .collect::<HashMap<_, _>>();
176        Self {
177            table,
178            region_routes,
179            region_leaders,
180        }
181    }
182
183    pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
184        let table = table_route
185            .table
186            .context(error::RouteInfoCorruptedSnafu {
187                err_msg: "'table' is empty in table route",
188            })?
189            .try_into()?;
190
191        let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
192        for region_route in table_route.region_routes.into_iter() {
193            let region = region_route
194                .region
195                .context(error::RouteInfoCorruptedSnafu {
196                    err_msg: "'region' is empty in region route",
197                })?
198                .into();
199
200            let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
201
202            let follower_peers = region_route
203                .follower_peer_indexes
204                .into_iter()
205                .filter_map(|x| peers.get(x as usize).cloned())
206                .collect::<Vec<_>>();
207
208            region_routes.push(RegionRoute {
209                region,
210                leader_peer,
211                follower_peers,
212                leader_state: None,
213                leader_down_since: None,
214                write_route_policy: None,
215            });
216        }
217
218        Ok(Self::new(table, region_routes))
219    }
220}
221
/// Table metadata carried by a [`TableRoute`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
    /// The id of the table.
    pub id: u64,
    /// The name of the table.
    pub table_name: TableName,
    /// Raw table schema bytes. Serialized as a UTF-8 string (a placeholder is
    /// written when the bytes are not valid UTF-8) and deserialized from a string.
    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
    pub table_schema: Vec<u8>,
}
229
230impl TryFrom<PbTable> for Table {
231    type Error = error::Error;
232
233    fn try_from(t: PbTable) -> Result<Self> {
234        let table_name = t
235            .table_name
236            .context(error::RouteInfoCorruptedSnafu {
237                err_msg: "table name required",
238            })?
239            .into();
240        Ok(Self {
241            id: t.id,
242            table_name,
243            table_schema: t.table_schema,
244        })
245    }
246}
247
248impl From<Table> for PbTable {
249    fn from(table: Table) -> Self {
250        PbTable {
251            id: table.id,
252            table_name: Some(table.table_name.into()),
253            table_schema: table.table_schema,
254        }
255    }
256}
257
/// Route of a single region: where its leader and followers live, plus the
/// leader state and the write routing policy.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
    /// The region this route describes.
    pub region: Region,
    /// The peer hosting the leader of the region, if any.
    #[builder(setter(into, strip_option))]
    pub leader_peer: Option<Peer>,
    /// The peers hosting followers of the region.
    #[builder(setter(into), default)]
    pub follower_peers: Vec<Peer>,
    /// The state of the leader; `None` by default.
    ///
    /// Omitted from the serialized form when `None`; the legacy field name
    /// `leader_status` is accepted when deserializing.
    #[builder(setter(into, strip_option), default)]
    #[serde(
        default,
        alias = "leader_status",
        skip_serializing_if = "Option::is_none"
    )]
    pub leader_state: Option<LeaderState>,
    /// The start time (as returned by `current_time_millis`) since when the
    /// leader has been in the `Downgrading` state.
    #[serde(default)]
    #[builder(default = "self.default_leader_down_since()")]
    pub leader_down_since: Option<i64>,
    /// Special write routing behavior for this region.
    ///
    /// Omitted from the serialized form when `None`.
    #[builder(setter(into, strip_option), default)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub write_route_policy: Option<WriteRoutePolicy>,
}
282
283impl RegionRouteBuilder {
284    fn default_leader_down_since(&self) -> Option<i64> {
285        match self.leader_state {
286            Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
287            _ => None,
288        }
289    }
290}
291
/// The state of the [`Region`] leader.
///
/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum LeaderState {
    /// The [`Region`] is being downgraded, which happens in the following cases:
    ///
    /// - The [`Region`] may be unavailable (e.g., crashed, network disconnected).
    /// - The [`Region`] was planned to migrate to another [`Peer`].
    ///
    /// The legacy name `Downgraded` is still accepted when deserializing.
    #[serde(alias = "Downgraded")]
    Downgrading,
    /// The [`Region`] is in staging mode.
    ///
    /// Disables checkpoint and compaction while maintaining write capability.
    /// Data ingested during this period is not visible to the user (hence staging).
    Staging,
}
309
/// The write route policy for the region.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
pub enum WriteRoutePolicy {
    /// The default policy.
    Normal,
    /// Ignores all writes for this region.
    ///
    /// This policy is typically used during region merge operations, such as repartitioning.
    /// For example, when merging Region A and Region B into just Region B,
    /// writes to Region A are ignored, while Region B accepts all writes originating from both regions.
    IgnoreAllWrites,
}
322
323impl RegionRoute {
324    /// Returns true if the region should ignore all writes.
325    pub fn is_ignore_all_writes(&self) -> bool {
326        matches!(
327            self.write_route_policy,
328            Some(WriteRoutePolicy::IgnoreAllWrites)
329        )
330    }
331
332    /// Marks this region as ignore-all for writes.
333    pub fn set_ignore_all_writes(&mut self) {
334        self.write_route_policy = Some(WriteRoutePolicy::IgnoreAllWrites);
335    }
336
337    /// Clears ignore-all write policy and falls back to normal routing behavior.
338    pub fn clear_ignore_all_writes(&mut self) {
339        if self.write_route_policy == Some(WriteRoutePolicy::IgnoreAllWrites) {
340            self.write_route_policy = None;
341        }
342    }
343
344    /// Returns true if the Leader [`Region`] is downgraded.
345    ///
346    /// The following cases in which the [`Region`] will be downgraded.
347    ///
348    /// - The [`Region`] is unavailable(e.g., Crashed, Network disconnected).
349    /// - The [`Region`] was planned to migrate to another [`Peer`].
350    ///
351    pub fn is_leader_downgrading(&self) -> bool {
352        matches!(self.leader_state, Some(LeaderState::Downgrading))
353    }
354
355    /// Returns true if the Leader [`Region`] is in staging mode.
356    pub fn is_leader_staging(&self) -> bool {
357        matches!(self.leader_state, Some(LeaderState::Staging))
358    }
359
360    /// Returns the role of the leader region.
361    pub fn leader_region_role(&self) -> Option<RegionRole> {
362        self.leader_peer.as_ref().map(|_| {
363            if self.is_leader_staging() {
364                RegionRole::StagingLeader
365            } else if self.is_leader_downgrading() {
366                RegionRole::DowngradingLeader
367            } else {
368                RegionRole::Leader
369            }
370        })
371    }
372
373    /// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
374    ///
375    /// We should downgrade a [`Region`] before deactivating it:
376    ///
377    /// - During the [`Region`] Failover Procedure.
378    /// - Migrating a [`Region`].
379    ///
380    /// **Notes:** Meta Server will renewing a special lease(`Downgrading`) for the downgrading [`Region`].
381    ///
382    /// A downgrading region will reject any write requests, and only allow memetable to be flushed to object storage
383    ///
384    pub fn downgrade_leader(&mut self) {
385        self.leader_down_since = Some(current_time_millis());
386        self.leader_state = Some(LeaderState::Downgrading)
387    }
388
389    /// Sets the Leader [`Region`] to staging mode.
390    pub fn set_leader_staging(&mut self) {
391        self.leader_state = Some(LeaderState::Staging);
392        // Reset leader_down_since as it's specific to downgrading
393        self.leader_down_since = None;
394    }
395
396    /// Clears the leader staging state, returning to normal leader mode.
397    pub fn clear_leader_staging(&mut self) {
398        if self.leader_state == Some(LeaderState::Staging) {
399            self.leader_state = None;
400            self.leader_down_since = None;
401        }
402    }
403
404    /// Returns how long since the leader is in `Downgraded` state.
405    pub fn leader_down_millis(&self) -> Option<i64> {
406        self.leader_down_since
407            .map(|start| current_time_millis() - start)
408    }
409
410    /// Sets the leader state.
411    ///
412    /// Returns true if updated.
413    pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
414        let updated = self.leader_state != state;
415
416        match (state, updated) {
417            (Some(LeaderState::Downgrading), true) => {
418                self.leader_down_since = Some(current_time_millis());
419            }
420            (Some(LeaderState::Downgrading), false) => {
421                // Do nothing if leader is still in `Downgraded` state.
422            }
423            _ => {
424                self.leader_down_since = None;
425            }
426        }
427
428        self.leader_state = state;
429        updated
430    }
431}
432
433pub struct RegionRoutes(pub Vec<RegionRoute>);
434
435impl RegionRoutes {
436    pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
437        convert_to_region_leader_map(&self.0)
438    }
439
440    pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
441        self.region_leader_map().get(&region_number).copied()
442    }
443}
444
/// A region of a table.
///
/// `Deserialize` is implemented by hand (through `RegionDe`) so that routes
/// serialized with the legacy `partition` field can still be read.
#[derive(Debug, Clone, Default, PartialEq, Serialize)]
pub struct Region {
    /// The id of the region.
    pub id: RegionId,
    /// The name of the region.
    pub name: String,
    /// Key-value attributes of the region.
    pub attrs: BTreeMap<String, String>,
    /// The normalized partition expression of the region.
    pub partition_expr: String,
}
453
/// Deserialization helper for [`Region`].
///
/// Accepts both the current layout (`partition_expr`) and the legacy layout
/// (`partition`); every field except `id` and `name` may be absent.
#[derive(Debug, Deserialize)]
struct RegionDe {
    id: RegionId,
    name: String,
    #[serde(default)]
    attrs: BTreeMap<String, String>,
    // Legacy field, only consulted when `partition_expr` is empty.
    #[serde(default)]
    partition: Option<LegacyPartition>,
    #[serde(default)]
    partition_expr: String,
}
465
466impl<'de> Deserialize<'de> for Region {
467    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
468    where
469        D: Deserializer<'de>,
470    {
471        let de = RegionDe::deserialize(deserializer)?;
472        // Compatibility path for legacy serialized routes: prefer the normalized
473        // `partition_expr` field and only fall back to legacy `partition.value_list`.
474        let partition_expr = if de.partition_expr.is_empty() {
475            if let Some(LegacyPartition { value_list, .. }) = &de.partition {
476                value_list
477                    .first()
478                    .map(|expr| String::from_utf8_lossy(expr).to_string())
479                    .unwrap_or_default()
480            } else {
481                String::new()
482            }
483        } else {
484            de.partition_expr
485        };
486
487        Ok(Self {
488            id: de.id,
489            name: de.name,
490            attrs: de.attrs,
491            partition_expr,
492        })
493    }
494}
495
496impl Region {
497    #[cfg(any(test, feature = "testing"))]
498    pub fn new_test(id: RegionId) -> Self {
499        Self {
500            id,
501            ..Default::default()
502        }
503    }
504
505    /// Gets the partition expression of the region in compatible mode.
506    pub fn partition_expr(&self) -> String {
507        self.partition_expr.clone()
508    }
509}
510
511/// Gets the partition expression of the `PbRegion` in compatible mode.
512#[allow(deprecated)]
513pub fn pb_region_partition_expr(r: &PbRegion) -> String {
514    if let Some(partition) = &r.partition {
515        if !partition.expression.is_empty() {
516            partition.expression.clone()
517        } else if !partition.value_list.is_empty() {
518            String::from_utf8_lossy(&partition.value_list[0]).to_string()
519        } else {
520            "".to_string()
521        }
522    } else {
523        "".to_string()
524    }
525}
526
527impl From<PbRegion> for Region {
528    fn from(r: PbRegion) -> Self {
529        let partition_expr = pb_region_partition_expr(&r);
530        Self {
531            id: r.id.into(),
532            name: r.name,
533            partition_expr,
534            attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
535        }
536    }
537}
538
539impl From<Region> for PbRegion {
540    fn from(region: Region) -> Self {
541        let partition_expr = region.partition_expr();
542        Self {
543            id: region.id.into(),
544            name: region.name,
545            partition: Some(PbPartition {
546                expression: partition_expr,
547                ..Default::default()
548            }),
549            attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
550        }
551    }
552}
553
/// Legacy partition representation, kept so that old serialized routes can
/// still be decoded (see the `Deserialize` impl of [`Region`]).
///
/// Both lists are serialized as sequences of strings; bytes that are not valid
/// UTF-8 are converted lossily on serialization.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct LegacyPartition {
    /// The partition column list, as raw bytes.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub column_list: Vec<Vec<u8>>,
    /// The partition value list, as raw bytes.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub value_list: Vec<Vec<u8>>,
}
561
562fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
563    serializer.serialize_str(
564        String::from_utf8(val.to_vec())
565            .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
566            .as_str(),
567    )
568}
569
570pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
571where
572    D: Deserializer<'de>,
573{
574    let s = String::deserialize(deserializer)?;
575
576    Ok(s.into_bytes())
577}
578
579fn as_utf8_vec<S: Serializer>(
580    val: &[Vec<u8>],
581    serializer: S,
582) -> std::result::Result<S::Ok, S::Error> {
583    let mut seq = serializer.serialize_seq(Some(val.len()))?;
584    for v in val {
585        seq.serialize_element(&String::from_utf8_lossy(v))?;
586    }
587    seq.end()
588}
589
590pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
591where
592    D: Deserializer<'de>,
593{
594    let values = Vec::<String>::deserialize(deserializer)?;
595
596    let values = values
597        .into_iter()
598        .map(|value| value.into_bytes())
599        .collect::<Vec<_>>();
600    Ok(values)
601}
602
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key::RegionRoleSet;

    /// Builds a region with the given id and name, no attributes and an empty
    /// partition expression.
    fn named_region(id: RegionId, name: &str) -> Region {
        Region {
            id,
            name: name.to_string(),
            attrs: BTreeMap::new(),
            partition_expr: "".to_string(),
        }
    }

    /// Builds the route of region `r2` (id 2) led by peer 1 and followed by
    /// peers 2 and 3, with the given leader state.
    fn r2_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: named_region(2.into(), "r2"),
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    fn new_test_region_route(region_id: RegionId) -> RegionRoute {
        RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2")],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = r2_route(None);
        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();
        assert!(region_route.is_leader_downgrading());
    }

    #[test]
    fn test_region_route_decode() {
        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;

        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, r2_route(None));
    }

    #[test]
    fn test_region_route_compatibility() {
        // Both field names (`leader_state`, `leader_status`) and both variant
        // names (`Downgraded`, `Downgrading`) must decode to the same route.
        let expected = r2_route(Some(LeaderState::Downgrading));
        let inputs = [
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#,
        ];

        for input in inputs {
            let decoded: RegionRoute = serde_json::from_str(input).unwrap();
            assert_eq!(decoded, expected);
        }
    }

    #[test]
    fn test_region_route_write_route_policy_decode_compatibility() {
        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"}],"write_route_policy":"IgnoreAllWrites"}"#;
        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert!(decoded.is_ignore_all_writes());
    }

    #[test]
    fn test_region_route_write_route_policy_default_not_serialized() {
        let encoded = serde_json::to_string(&r2_route(None)).unwrap();

        assert!(!encoded.contains("write_route_policy"));
    }

    #[test]
    fn test_region_route_write_route_policy_helpers() {
        let mut region_route = RegionRoute {
            follower_peers: vec![],
            ..new_test_region_route(2.into())
        };
        assert!(!region_route.is_ignore_all_writes());

        region_route.set_ignore_all_writes();
        assert!(region_route.is_ignore_all_writes());

        region_route.clear_ignore_all_writes();
        assert!(!region_route.is_ignore_all_writes());
    }

    #[test]
    fn test_leader_region_role_without_leader_peer_returns_none() {
        let mut region_route = new_test_region_route(RegionId::new(1, 1));
        region_route.leader_peer = None;

        assert_eq!(region_route.leader_region_role(), None);
    }

    #[test]
    fn test_leader_region_role_variants() {
        let normal = new_test_region_route(RegionId::new(1, 1));
        let downgrading = RegionRoute {
            leader_state: Some(LeaderState::Downgrading),
            ..new_test_region_route(RegionId::new(1, 2))
        };
        let staging = RegionRoute {
            leader_state: Some(LeaderState::Staging),
            ..new_test_region_route(RegionId::new(1, 3))
        };

        assert_eq!(normal.leader_region_role(), Some(RegionRole::Leader));
        assert_eq!(
            downgrading.leader_region_role(),
            Some(RegionRole::DowngradingLeader)
        );
        assert_eq!(
            staging.leader_region_role(),
            Some(RegionRole::StagingLeader)
        );
    }

    #[test]
    fn test_operating_leader_region_roles_returns_expected_roles() {
        let normal = new_test_region_route(RegionId::new(1, 1));
        let downgrading = RegionRoute {
            leader_peer: Some(Peer::new(2, "a2")),
            leader_state: Some(LeaderState::Downgrading),
            ..new_test_region_route(RegionId::new(1, 2))
        };
        let staging = RegionRoute {
            leader_peer: Some(Peer::new(3, "a3")),
            leader_state: Some(LeaderState::Staging),
            ..new_test_region_route(RegionId::new(1, 3))
        };
        // A route without a leader must not show up in the result.
        let no_leader_region = RegionRoute {
            leader_peer: None,
            ..new_test_region_route(RegionId::new(1, 4))
        };

        let roles =
            operating_leader_region_roles(&[normal, downgrading, staging, no_leader_region]);

        assert_eq!(
            roles,
            vec![
                (RegionId::new(1, 1), 1, RegionRole::Leader),
                (RegionId::new(1, 2), 2, RegionRole::DowngradingLeader),
                (RegionId::new(1, 3), 3, RegionRole::StagingLeader),
            ]
        );
    }

    #[test]
    fn test_region_distribution() {
        let region_routes = vec![
            RegionRoute {
                region: named_region(RegionId::new(1, 1), "r1"),
                leader_peer: Some(Peer::new(1, "a1")),
                follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
                ..Default::default()
            },
            RegionRoute {
                region: named_region(RegionId::new(1, 2), "r2"),
                leader_peer: Some(Peer::new(2, "a2")),
                follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
                ..Default::default()
            },
        ];

        let distribution = region_distribution(&region_routes);

        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
        assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
        assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
    }

    #[test]
    fn test_de_serialize_partition() {
        let partition = LegacyPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let encoded = serde_json::to_string(&partition).unwrap();
        let decoded: LegacyPartition = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded, partition);
    }

    #[test]
    #[allow(deprecated)]
    fn test_region_partition_expr() {
        // Checks the expression of the protobuf region, of the converted
        // `Region`, and of the protobuf region converted back from it.
        let check = |partition: Option<PbPartition>, expected: &str| {
            let pb_region = PbRegion {
                id: 1,
                name: "r1".to_string(),
                partition,
                attrs: Default::default(),
            };
            assert_eq!(pb_region_partition_expr(&pb_region), expected);

            let region: Region = pb_region.into();
            assert_eq!(region.partition_expr(), expected);

            let round_tripped: PbRegion = region.into();
            assert_eq!(
                round_tripped.partition.as_ref().unwrap().expression,
                expected
            );
        };

        // No partition at all.
        check(None, "");

        // Legacy layout: only `value_list` is set.
        check(
            Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: Default::default(),
            }),
            "{}",
        );

        // `expression` takes precedence over `value_list`.
        check(
            Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: "a>b".to_string(),
            }),
            "a>b",
        );
    }
}