common_meta/rpc/
router.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18    Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19    TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::storage::{RegionId, RegionNumber};
27use strum::AsRefStr;
28use table::table_name::TableName;
29
30use crate::DatanodeId;
31use crate::error::{self, Result};
32use crate::key::RegionDistribution;
33use crate::peer::Peer;
34
35/// Returns the distribution of regions to datanodes.
36///
37/// The distribution is a map of datanode id to a list of region ids.
38/// The list of region ids is sorted in ascending order.
39pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
40    let mut regions_id_map = RegionDistribution::new();
41    for route in region_routes.iter() {
42        if let Some(peer) = route.leader_peer.as_ref() {
43            let region_number = route.region.id.region_number();
44            regions_id_map
45                .entry(peer.id)
46                .or_default()
47                .add_leader_region(region_number);
48        }
49        for peer in route.follower_peers.iter() {
50            let region_number = route.region.id.region_number();
51            regions_id_map
52                .entry(peer.id)
53                .or_default()
54                .add_follower_region(region_number);
55        }
56    }
57    for (_, region_role_set) in regions_id_map.iter_mut() {
58        // Sort the regions in ascending order.
59        region_role_set.sort()
60    }
61    regions_id_map
62}
63
64#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
65pub struct TableRoute {
66    pub table: Table,
67    pub region_routes: Vec<RegionRoute>,
68    region_leaders: HashMap<RegionNumber, Option<Peer>>,
69}
70
71/// Returns the leader peers of the table.
72pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
73    region_routes
74        .iter()
75        .flat_map(|x| &x.leader_peer)
76        .cloned()
77        .collect()
78}
79
80/// Returns the followers of the table.
81pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
82    region_routes
83        .iter()
84        .flat_map(|x| &x.follower_peers)
85        .cloned()
86        .collect()
87}
88
89/// Returns the operating leader regions with corresponding [DatanodeId].
90pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
91    region_routes
92        .iter()
93        .filter_map(|route| {
94            route
95                .leader_peer
96                .as_ref()
97                .map(|leader| (route.region.id, leader.id))
98        })
99        .collect::<Vec<_>>()
100}
101
102/// Returns the HashMap<[RegionNumber], &[Peer]>;
103///
104/// If the region doesn't have a leader peer, the [Region] will be omitted.
105pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
106    region_routes
107        .iter()
108        .filter_map(|x| {
109            x.leader_peer
110                .as_ref()
111                .map(|leader| (x.region.id.region_number(), leader))
112        })
113        .collect::<HashMap<_, _>>()
114}
115
116pub fn find_region_leader(
117    region_routes: &[RegionRoute],
118    region_number: RegionNumber,
119) -> Option<Peer> {
120    region_routes
121        .iter()
122        .find(|x| x.region.id.region_number() == region_number)
123        .and_then(|r| r.leader_peer.as_ref())
124        .cloned()
125}
126
127/// Returns the region numbers of the leader regions on the target datanode.
128pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
129    region_routes
130        .iter()
131        .filter_map(|x| {
132            if let Some(peer) = &x.leader_peer
133                && peer == datanode
134            {
135                return Some(x.region.id.region_number());
136            }
137            None
138        })
139        .collect()
140}
141
142/// Returns the region numbers of the follower regions on the target datanode.
143pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
144    region_routes
145        .iter()
146        .filter_map(|x| {
147            if x.follower_peers.contains(datanode) {
148                return Some(x.region.id.region_number());
149            }
150            None
151        })
152        .collect()
153}
154
155impl TableRoute {
156    pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
157        let region_leaders = region_routes
158            .iter()
159            .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
160            .collect::<HashMap<_, _>>();
161        Self {
162            table,
163            region_routes,
164            region_leaders,
165        }
166    }
167
168    pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
169        let table = table_route
170            .table
171            .context(error::RouteInfoCorruptedSnafu {
172                err_msg: "'table' is empty in table route",
173            })?
174            .try_into()?;
175
176        let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
177        for region_route in table_route.region_routes.into_iter() {
178            let region = region_route
179                .region
180                .context(error::RouteInfoCorruptedSnafu {
181                    err_msg: "'region' is empty in region route",
182                })?
183                .into();
184
185            let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
186
187            let follower_peers = region_route
188                .follower_peer_indexes
189                .into_iter()
190                .filter_map(|x| peers.get(x as usize).cloned())
191                .collect::<Vec<_>>();
192
193            region_routes.push(RegionRoute {
194                region,
195                leader_peer,
196                follower_peers,
197                leader_state: None,
198                leader_down_since: None,
199                write_route_policy: None,
200            });
201        }
202
203        Ok(Self::new(table, region_routes))
204    }
205}
206
207#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
208pub struct Table {
209    pub id: u64,
210    pub table_name: TableName,
211    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
212    pub table_schema: Vec<u8>,
213}
214
215impl TryFrom<PbTable> for Table {
216    type Error = error::Error;
217
218    fn try_from(t: PbTable) -> Result<Self> {
219        let table_name = t
220            .table_name
221            .context(error::RouteInfoCorruptedSnafu {
222                err_msg: "table name required",
223            })?
224            .into();
225        Ok(Self {
226            id: t.id,
227            table_name,
228            table_schema: t.table_schema,
229        })
230    }
231}
232
233impl From<Table> for PbTable {
234    fn from(table: Table) -> Self {
235        PbTable {
236            id: table.id,
237            table_name: Some(table.table_name.into()),
238            table_schema: table.table_schema,
239        }
240    }
241}
242
243#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
244pub struct RegionRoute {
245    pub region: Region,
246    #[builder(setter(into, strip_option))]
247    pub leader_peer: Option<Peer>,
248    #[builder(setter(into), default)]
249    pub follower_peers: Vec<Peer>,
250    /// `None` by default.
251    #[builder(setter(into, strip_option), default)]
252    #[serde(
253        default,
254        alias = "leader_status",
255        skip_serializing_if = "Option::is_none"
256    )]
257    pub leader_state: Option<LeaderState>,
258    /// The start time when the leader is in `Downgraded` state.
259    #[serde(default)]
260    #[builder(default = "self.default_leader_down_since()")]
261    pub leader_down_since: Option<i64>,
262    /// Special write routing behavior for this region.
263    #[builder(setter(into, strip_option), default)]
264    #[serde(default, skip_serializing_if = "Option::is_none")]
265    pub write_route_policy: Option<WriteRoutePolicy>,
266}
267
268impl RegionRouteBuilder {
269    fn default_leader_down_since(&self) -> Option<i64> {
270        match self.leader_state {
271            Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
272            _ => None,
273        }
274    }
275}
276
277/// The State of the [`Region`] Leader.
278/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc.
279#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
280#[strum(serialize_all = "UPPERCASE")]
281pub enum LeaderState {
282    /// The following cases in which the [`Region`] will be downgraded.
283    ///
284    /// - The [`Region`] may be unavailable (e.g., Crashed, Network disconnected).
285    /// - The [`Region`] was planned to migrate to another [`Peer`].
286    #[serde(alias = "Downgraded")]
287    Downgrading,
288    /// The [`Region`] is in staging mode.
289    ///
290    /// Disables checkpoint and compaction while maintaining write capability.
291    /// But data ingested during this period are not visible to the user (hence staging).
292    Staging,
293}
294
295/// The write route policy for the region.
296#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
297pub enum WriteRoutePolicy {
298    // The default policy.
299    Normal,
300    /// Ignores all writes for this region.
301    ///
302    /// This policy is typically used during region merge operations, such as repartitioning.
303    /// For example, when merging Region A and Region B into just Region B,
304    /// writes to Region A are ignored, while Region B accepts all writes originating from both regions.
305    IgnoreAllWrites,
306}
307
308impl RegionRoute {
309    /// Returns true if the region should ignore all writes.
310    pub fn is_ignore_all_writes(&self) -> bool {
311        matches!(
312            self.write_route_policy,
313            Some(WriteRoutePolicy::IgnoreAllWrites)
314        )
315    }
316
317    /// Marks this region as ignore-all for writes.
318    pub fn set_ignore_all_writes(&mut self) {
319        self.write_route_policy = Some(WriteRoutePolicy::IgnoreAllWrites);
320    }
321
322    /// Clears ignore-all write policy and falls back to normal routing behavior.
323    pub fn clear_ignore_all_writes(&mut self) {
324        if self.write_route_policy == Some(WriteRoutePolicy::IgnoreAllWrites) {
325            self.write_route_policy = None;
326        }
327    }
328
329    /// Returns true if the Leader [`Region`] is downgraded.
330    ///
331    /// The following cases in which the [`Region`] will be downgraded.
332    ///
333    /// - The [`Region`] is unavailable(e.g., Crashed, Network disconnected).
334    /// - The [`Region`] was planned to migrate to another [`Peer`].
335    ///
336    pub fn is_leader_downgrading(&self) -> bool {
337        matches!(self.leader_state, Some(LeaderState::Downgrading))
338    }
339
340    /// Returns true if the Leader [`Region`] is in staging mode.
341    pub fn is_leader_staging(&self) -> bool {
342        matches!(self.leader_state, Some(LeaderState::Staging))
343    }
344
345    /// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
346    ///
347    /// We should downgrade a [`Region`] before deactivating it:
348    ///
349    /// - During the [`Region`] Failover Procedure.
350    /// - Migrating a [`Region`].
351    ///
352    /// **Notes:** Meta Server will renewing a special lease(`Downgrading`) for the downgrading [`Region`].
353    ///
354    /// A downgrading region will reject any write requests, and only allow memetable to be flushed to object storage
355    ///
356    pub fn downgrade_leader(&mut self) {
357        self.leader_down_since = Some(current_time_millis());
358        self.leader_state = Some(LeaderState::Downgrading)
359    }
360
361    /// Sets the Leader [`Region`] to staging mode.
362    pub fn set_leader_staging(&mut self) {
363        self.leader_state = Some(LeaderState::Staging);
364        // Reset leader_down_since as it's specific to downgrading
365        self.leader_down_since = None;
366    }
367
368    /// Clears the leader staging state, returning to normal leader mode.
369    pub fn clear_leader_staging(&mut self) {
370        if self.leader_state == Some(LeaderState::Staging) {
371            self.leader_state = None;
372            self.leader_down_since = None;
373        }
374    }
375
376    /// Returns how long since the leader is in `Downgraded` state.
377    pub fn leader_down_millis(&self) -> Option<i64> {
378        self.leader_down_since
379            .map(|start| current_time_millis() - start)
380    }
381
382    /// Sets the leader state.
383    ///
384    /// Returns true if updated.
385    pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
386        let updated = self.leader_state != state;
387
388        match (state, updated) {
389            (Some(LeaderState::Downgrading), true) => {
390                self.leader_down_since = Some(current_time_millis());
391            }
392            (Some(LeaderState::Downgrading), false) => {
393                // Do nothing if leader is still in `Downgraded` state.
394            }
395            _ => {
396                self.leader_down_since = None;
397            }
398        }
399
400        self.leader_state = state;
401        updated
402    }
403}
404
405pub struct RegionRoutes(pub Vec<RegionRoute>);
406
407impl RegionRoutes {
408    pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
409        convert_to_region_leader_map(&self.0)
410    }
411
412    pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
413        self.region_leader_map().get(&region_number).copied()
414    }
415}
416
417#[derive(Debug, Clone, Default, PartialEq, Serialize)]
418pub struct Region {
419    pub id: RegionId,
420    pub name: String,
421    pub attrs: BTreeMap<String, String>,
422    /// The normalized partition expression of the region.
423    pub partition_expr: String,
424}
425
426#[derive(Debug, Deserialize)]
427struct RegionDe {
428    id: RegionId,
429    name: String,
430    #[serde(default)]
431    attrs: BTreeMap<String, String>,
432    #[serde(default)]
433    partition: Option<LegacyPartition>,
434    #[serde(default)]
435    partition_expr: String,
436}
437
438impl<'de> Deserialize<'de> for Region {
439    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
440    where
441        D: Deserializer<'de>,
442    {
443        let de = RegionDe::deserialize(deserializer)?;
444        // Compatibility path for legacy serialized routes: prefer the normalized
445        // `partition_expr` field and only fall back to legacy `partition.value_list`.
446        let partition_expr = if de.partition_expr.is_empty() {
447            if let Some(LegacyPartition { value_list, .. }) = &de.partition {
448                value_list
449                    .first()
450                    .map(|expr| String::from_utf8_lossy(expr).to_string())
451                    .unwrap_or_default()
452            } else {
453                String::new()
454            }
455        } else {
456            de.partition_expr
457        };
458
459        Ok(Self {
460            id: de.id,
461            name: de.name,
462            attrs: de.attrs,
463            partition_expr,
464        })
465    }
466}
467
468impl Region {
469    #[cfg(any(test, feature = "testing"))]
470    pub fn new_test(id: RegionId) -> Self {
471        Self {
472            id,
473            ..Default::default()
474        }
475    }
476
477    /// Gets the partition expression of the region in compatible mode.
478    pub fn partition_expr(&self) -> String {
479        self.partition_expr.clone()
480    }
481}
482
483/// Gets the partition expression of the `PbRegion` in compatible mode.
484#[allow(deprecated)]
485pub fn pb_region_partition_expr(r: &PbRegion) -> String {
486    if let Some(partition) = &r.partition {
487        if !partition.expression.is_empty() {
488            partition.expression.clone()
489        } else if !partition.value_list.is_empty() {
490            String::from_utf8_lossy(&partition.value_list[0]).to_string()
491        } else {
492            "".to_string()
493        }
494    } else {
495        "".to_string()
496    }
497}
498
499impl From<PbRegion> for Region {
500    fn from(r: PbRegion) -> Self {
501        let partition_expr = pb_region_partition_expr(&r);
502        Self {
503            id: r.id.into(),
504            name: r.name,
505            partition_expr,
506            attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
507        }
508    }
509}
510
511impl From<Region> for PbRegion {
512    fn from(region: Region) -> Self {
513        let partition_expr = region.partition_expr();
514        Self {
515            id: region.id.into(),
516            name: region.name,
517            partition: Some(PbPartition {
518                expression: partition_expr,
519                ..Default::default()
520            }),
521            attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
522        }
523    }
524}
525
526#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
527pub struct LegacyPartition {
528    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
529    pub column_list: Vec<Vec<u8>>,
530    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
531    pub value_list: Vec<Vec<u8>>,
532}
533
534fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
535    serializer.serialize_str(
536        String::from_utf8(val.to_vec())
537            .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
538            .as_str(),
539    )
540}
541
542pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
543where
544    D: Deserializer<'de>,
545{
546    let s = String::deserialize(deserializer)?;
547
548    Ok(s.into_bytes())
549}
550
551fn as_utf8_vec<S: Serializer>(
552    val: &[Vec<u8>],
553    serializer: S,
554) -> std::result::Result<S::Ok, S::Error> {
555    let mut seq = serializer.serialize_seq(Some(val.len()))?;
556    for v in val {
557        seq.serialize_element(&String::from_utf8_lossy(v))?;
558    }
559    seq.end()
560}
561
562pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
563where
564    D: Deserializer<'de>,
565{
566    let values = Vec::<String>::deserialize(deserializer)?;
567
568    let values = values
569        .into_iter()
570        .map(|value| value.into_bytes())
571        .collect::<Vec<_>>();
572    Ok(values)
573}
574
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key::RegionRoleSet;

    /// Builds a region without attributes and with an empty partition expression.
    fn region(id: RegionId, name: &str) -> Region {
        Region {
            id,
            name: name.to_string(),
            attrs: BTreeMap::new(),
            partition_expr: "".to_string(),
        }
    }

    /// Builds the route of region `r2` (id 2), led by peer 1 and followed by
    /// peers 2 and 3, with the given leader state.
    fn r2_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: region(2.into(), "r2"),
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    /// Checks the partition expression of `r` directly and after converting it
    /// to a [`Region`] and back to a `PbRegion`.
    #[allow(deprecated)]
    fn assert_partition_expr_roundtrip(r: PbRegion, expected: &str) {
        assert_eq!(pb_region_partition_expr(&r), expected);

        let region: Region = r.into();
        assert_eq!(region.partition_expr(), expected);

        let pb_region: PbRegion = region.into();
        assert_eq!(pb_region.partition.as_ref().unwrap().expression, expected);
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = r2_route(None);
        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();
        assert!(region_route.is_leader_downgrading());
    }

    #[test]
    fn test_region_route_decode() {
        let expected = r2_route(None);

        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;
        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_region_route_compatibility() {
        // Every combination of the current/legacy key (`leader_state` /
        // `leader_status`) and the current/legacy value (`Downgrading` /
        // `Downgraded`) must decode to the same route.
        let inputs = [
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#,
        ];

        for input in inputs {
            let expected = r2_route(Some(LeaderState::Downgrading));
            let decoded: RegionRoute = serde_json::from_str(input).unwrap();
            assert_eq!(decoded, expected);
        }
    }

    #[test]
    fn test_region_route_write_route_policy_decode_compatibility() {
        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"}],"write_route_policy":"IgnoreAllWrites"}"#;
        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert!(decoded.is_ignore_all_writes());
    }

    #[test]
    fn test_region_route_write_route_policy_default_not_serialized() {
        let region_route = r2_route(None);

        let encoded = serde_json::to_string(&region_route).unwrap();
        assert!(!encoded.contains("write_route_policy"));
    }

    #[test]
    fn test_region_route_write_route_policy_helpers() {
        let mut region_route = RegionRoute {
            region: Region::new_test(2.into()),
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        };

        assert!(!region_route.is_ignore_all_writes());
        region_route.set_ignore_all_writes();
        assert!(region_route.is_ignore_all_writes());
        region_route.clear_ignore_all_writes();
        assert!(!region_route.is_ignore_all_writes());
    }

    #[test]
    fn test_region_distribution() {
        let region_routes = vec![
            RegionRoute {
                region: region(RegionId::new(1, 1), "r1"),
                leader_peer: Some(Peer::new(1, "a1")),
                follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
                write_route_policy: None,
            },
            RegionRoute {
                region: region(RegionId::new(1, 2), "r2"),
                leader_peer: Some(Peer::new(2, "a2")),
                follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
                write_route_policy: None,
            },
        ];

        let distribution = region_distribution(&region_routes);
        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
        assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
        assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
    }

    #[test]
    fn test_de_serialize_partition() {
        let partition = LegacyPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let encoded = serde_json::to_string(&partition).unwrap();
        let decoded: LegacyPartition = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded, partition);
    }

    #[test]
    #[allow(deprecated)]
    fn test_region_partition_expr() {
        // No partition at all.
        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: None,
            attrs: Default::default(),
        };
        assert_partition_expr_roundtrip(r, "");

        // Legacy layout: only `value_list` is populated.
        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: Default::default(),
            }),
            attrs: Default::default(),
        };
        assert_partition_expr_roundtrip(r, "{}");

        // `expression` takes precedence over `value_list`.
        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: "a>b".to_string(),
            }),
            attrs: Default::default(),
        };
        assert_partition_expr_roundtrip(r, "a>b");
    }
}