1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18 Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19 TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::region_engine::RegionRole;
27use store_api::storage::{RegionId, RegionNumber};
28use strum::AsRefStr;
29use table::table_name::TableName;
30
31use crate::DatanodeId;
32use crate::error::{self, Result};
33use crate::key::RegionDistribution;
34use crate::peer::Peer;
35
36pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
41 let mut regions_id_map = RegionDistribution::new();
42 for route in region_routes.iter() {
43 if let Some(peer) = route.leader_peer.as_ref() {
44 let region_number = route.region.id.region_number();
45 regions_id_map
46 .entry(peer.id)
47 .or_default()
48 .add_leader_region(region_number);
49 }
50 for peer in route.follower_peers.iter() {
51 let region_number = route.region.id.region_number();
52 regions_id_map
53 .entry(peer.id)
54 .or_default()
55 .add_follower_region(region_number);
56 }
57 }
58 for (_, region_role_set) in regions_id_map.iter_mut() {
59 region_role_set.sort()
61 }
62 regions_id_map
63}
64
65#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
66pub struct TableRoute {
67 pub table: Table,
68 pub region_routes: Vec<RegionRoute>,
69 region_leaders: HashMap<RegionNumber, Option<Peer>>,
70}
71
72pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
74 region_routes
75 .iter()
76 .flat_map(|x| &x.leader_peer)
77 .cloned()
78 .collect()
79}
80
81pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
83 region_routes
84 .iter()
85 .flat_map(|x| &x.follower_peers)
86 .cloned()
87 .collect()
88}
89
90pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
92 region_routes
93 .iter()
94 .filter_map(|route| {
95 route
96 .leader_peer
97 .as_ref()
98 .map(|leader| (route.region.id, leader.id))
99 })
100 .collect::<Vec<_>>()
101}
102
103pub fn operating_leader_region_roles(
105 region_routes: &[RegionRoute],
106) -> Vec<(RegionId, DatanodeId, RegionRole)> {
107 region_routes
108 .iter()
109 .filter_map(|route| {
110 let role = route.leader_region_role()?;
111 let leader = route.leader_peer.as_ref()?;
112 Some((route.region.id, leader.id, role))
113 })
114 .collect()
115}
116
117pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
121 region_routes
122 .iter()
123 .filter_map(|x| {
124 x.leader_peer
125 .as_ref()
126 .map(|leader| (x.region.id.region_number(), leader))
127 })
128 .collect::<HashMap<_, _>>()
129}
130
131pub fn find_region_leader(
132 region_routes: &[RegionRoute],
133 region_number: RegionNumber,
134) -> Option<Peer> {
135 region_routes
136 .iter()
137 .find(|x| x.region.id.region_number() == region_number)
138 .and_then(|r| r.leader_peer.as_ref())
139 .cloned()
140}
141
142pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
144 region_routes
145 .iter()
146 .filter_map(|x| {
147 if let Some(peer) = &x.leader_peer
148 && peer == datanode
149 {
150 return Some(x.region.id.region_number());
151 }
152 None
153 })
154 .collect()
155}
156
157pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
159 region_routes
160 .iter()
161 .filter_map(|x| {
162 if x.follower_peers.contains(datanode) {
163 return Some(x.region.id.region_number());
164 }
165 None
166 })
167 .collect()
168}
169
170impl TableRoute {
171 pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
172 let region_leaders = region_routes
173 .iter()
174 .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
175 .collect::<HashMap<_, _>>();
176 Self {
177 table,
178 region_routes,
179 region_leaders,
180 }
181 }
182
183 pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
184 let table = table_route
185 .table
186 .context(error::RouteInfoCorruptedSnafu {
187 err_msg: "'table' is empty in table route",
188 })?
189 .try_into()?;
190
191 let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
192 for region_route in table_route.region_routes.into_iter() {
193 let region = region_route
194 .region
195 .context(error::RouteInfoCorruptedSnafu {
196 err_msg: "'region' is empty in region route",
197 })?
198 .into();
199
200 let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
201
202 let follower_peers = region_route
203 .follower_peer_indexes
204 .into_iter()
205 .filter_map(|x| peers.get(x as usize).cloned())
206 .collect::<Vec<_>>();
207
208 region_routes.push(RegionRoute {
209 region,
210 leader_peer,
211 follower_peers,
212 leader_state: None,
213 leader_down_since: None,
214 write_route_policy: None,
215 });
216 }
217
218 Ok(Self::new(table, region_routes))
219 }
220}
221
222#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
223pub struct Table {
224 pub id: u64,
225 pub table_name: TableName,
226 #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
227 pub table_schema: Vec<u8>,
228}
229
230impl TryFrom<PbTable> for Table {
231 type Error = error::Error;
232
233 fn try_from(t: PbTable) -> Result<Self> {
234 let table_name = t
235 .table_name
236 .context(error::RouteInfoCorruptedSnafu {
237 err_msg: "table name required",
238 })?
239 .into();
240 Ok(Self {
241 id: t.id,
242 table_name,
243 table_schema: t.table_schema,
244 })
245 }
246}
247
248impl From<Table> for PbTable {
249 fn from(table: Table) -> Self {
250 PbTable {
251 id: table.id,
252 table_name: Some(table.table_name.into()),
253 table_schema: table.table_schema,
254 }
255 }
256}
257
258#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
259pub struct RegionRoute {
260 pub region: Region,
261 #[builder(setter(into, strip_option))]
262 pub leader_peer: Option<Peer>,
263 #[builder(setter(into), default)]
264 pub follower_peers: Vec<Peer>,
265 #[builder(setter(into, strip_option), default)]
267 #[serde(
268 default,
269 alias = "leader_status",
270 skip_serializing_if = "Option::is_none"
271 )]
272 pub leader_state: Option<LeaderState>,
273 #[serde(default)]
275 #[builder(default = "self.default_leader_down_since()")]
276 pub leader_down_since: Option<i64>,
277 #[builder(setter(into, strip_option), default)]
279 #[serde(default, skip_serializing_if = "Option::is_none")]
280 pub write_route_policy: Option<WriteRoutePolicy>,
281}
282
283impl RegionRouteBuilder {
284 fn default_leader_down_since(&self) -> Option<i64> {
285 match self.leader_state {
286 Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
287 _ => None,
288 }
289 }
290}
291
292#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
295#[strum(serialize_all = "UPPERCASE")]
296pub enum LeaderState {
297 #[serde(alias = "Downgraded")]
302 Downgrading,
303 Staging,
308}
309
310#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
312pub enum WriteRoutePolicy {
313 Normal,
315 IgnoreAllWrites,
321}
322
323impl RegionRoute {
324 pub fn is_ignore_all_writes(&self) -> bool {
326 matches!(
327 self.write_route_policy,
328 Some(WriteRoutePolicy::IgnoreAllWrites)
329 )
330 }
331
332 pub fn set_ignore_all_writes(&mut self) {
334 self.write_route_policy = Some(WriteRoutePolicy::IgnoreAllWrites);
335 }
336
337 pub fn clear_ignore_all_writes(&mut self) {
339 if self.write_route_policy == Some(WriteRoutePolicy::IgnoreAllWrites) {
340 self.write_route_policy = None;
341 }
342 }
343
344 pub fn is_leader_downgrading(&self) -> bool {
352 matches!(self.leader_state, Some(LeaderState::Downgrading))
353 }
354
355 pub fn is_leader_staging(&self) -> bool {
357 matches!(self.leader_state, Some(LeaderState::Staging))
358 }
359
360 pub fn leader_region_role(&self) -> Option<RegionRole> {
362 self.leader_peer.as_ref().map(|_| {
363 if self.is_leader_staging() {
364 RegionRole::StagingLeader
365 } else if self.is_leader_downgrading() {
366 RegionRole::DowngradingLeader
367 } else {
368 RegionRole::Leader
369 }
370 })
371 }
372
373 pub fn downgrade_leader(&mut self) {
385 self.leader_down_since = Some(current_time_millis());
386 self.leader_state = Some(LeaderState::Downgrading)
387 }
388
389 pub fn set_leader_staging(&mut self) {
391 self.leader_state = Some(LeaderState::Staging);
392 self.leader_down_since = None;
394 }
395
396 pub fn clear_leader_staging(&mut self) {
398 if self.leader_state == Some(LeaderState::Staging) {
399 self.leader_state = None;
400 self.leader_down_since = None;
401 }
402 }
403
404 pub fn leader_down_millis(&self) -> Option<i64> {
406 self.leader_down_since
407 .map(|start| current_time_millis() - start)
408 }
409
410 pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
414 let updated = self.leader_state != state;
415
416 match (state, updated) {
417 (Some(LeaderState::Downgrading), true) => {
418 self.leader_down_since = Some(current_time_millis());
419 }
420 (Some(LeaderState::Downgrading), false) => {
421 }
423 _ => {
424 self.leader_down_since = None;
425 }
426 }
427
428 self.leader_state = state;
429 updated
430 }
431}
432
433pub struct RegionRoutes(pub Vec<RegionRoute>);
434
435impl RegionRoutes {
436 pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
437 convert_to_region_leader_map(&self.0)
438 }
439
440 pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
441 self.region_leader_map().get(®ion_number).copied()
442 }
443}
444
445#[derive(Debug, Clone, Default, PartialEq, Serialize)]
446pub struct Region {
447 pub id: RegionId,
448 pub name: String,
449 pub attrs: BTreeMap<String, String>,
450 pub partition_expr: String,
452}
453
454#[derive(Debug, Deserialize)]
455struct RegionDe {
456 id: RegionId,
457 name: String,
458 #[serde(default)]
459 attrs: BTreeMap<String, String>,
460 #[serde(default)]
461 partition: Option<LegacyPartition>,
462 #[serde(default)]
463 partition_expr: String,
464}
465
466impl<'de> Deserialize<'de> for Region {
467 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
468 where
469 D: Deserializer<'de>,
470 {
471 let de = RegionDe::deserialize(deserializer)?;
472 let partition_expr = if de.partition_expr.is_empty() {
475 if let Some(LegacyPartition { value_list, .. }) = &de.partition {
476 value_list
477 .first()
478 .map(|expr| String::from_utf8_lossy(expr).to_string())
479 .unwrap_or_default()
480 } else {
481 String::new()
482 }
483 } else {
484 de.partition_expr
485 };
486
487 Ok(Self {
488 id: de.id,
489 name: de.name,
490 attrs: de.attrs,
491 partition_expr,
492 })
493 }
494}
495
496impl Region {
497 #[cfg(any(test, feature = "testing"))]
498 pub fn new_test(id: RegionId) -> Self {
499 Self {
500 id,
501 ..Default::default()
502 }
503 }
504
505 pub fn partition_expr(&self) -> String {
507 self.partition_expr.clone()
508 }
509}
510
511#[allow(deprecated)]
513pub fn pb_region_partition_expr(r: &PbRegion) -> String {
514 if let Some(partition) = &r.partition {
515 if !partition.expression.is_empty() {
516 partition.expression.clone()
517 } else if !partition.value_list.is_empty() {
518 String::from_utf8_lossy(&partition.value_list[0]).to_string()
519 } else {
520 "".to_string()
521 }
522 } else {
523 "".to_string()
524 }
525}
526
527impl From<PbRegion> for Region {
528 fn from(r: PbRegion) -> Self {
529 let partition_expr = pb_region_partition_expr(&r);
530 Self {
531 id: r.id.into(),
532 name: r.name,
533 partition_expr,
534 attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
535 }
536 }
537}
538
539impl From<Region> for PbRegion {
540 fn from(region: Region) -> Self {
541 let partition_expr = region.partition_expr();
542 Self {
543 id: region.id.into(),
544 name: region.name,
545 partition: Some(PbPartition {
546 expression: partition_expr,
547 ..Default::default()
548 }),
549 attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
550 }
551 }
552}
553
554#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
555pub struct LegacyPartition {
556 #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
557 pub column_list: Vec<Vec<u8>>,
558 #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
559 pub value_list: Vec<Vec<u8>>,
560}
561
562fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
563 serializer.serialize_str(
564 String::from_utf8(val.to_vec())
565 .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
566 .as_str(),
567 )
568}
569
570pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
571where
572 D: Deserializer<'de>,
573{
574 let s = String::deserialize(deserializer)?;
575
576 Ok(s.into_bytes())
577}
578
579fn as_utf8_vec<S: Serializer>(
580 val: &[Vec<u8>],
581 serializer: S,
582) -> std::result::Result<S::Ok, S::Error> {
583 let mut seq = serializer.serialize_seq(Some(val.len()))?;
584 for v in val {
585 seq.serialize_element(&String::from_utf8_lossy(v))?;
586 }
587 seq.end()
588}
589
590pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
591where
592 D: Deserializer<'de>,
593{
594 let values = Vec::<String>::deserialize(deserializer)?;
595
596 let values = values
597 .into_iter()
598 .map(|value| value.into_bytes())
599 .collect::<Vec<_>>();
600 Ok(values)
601}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606 use crate::key::RegionRoleSet;
607
608 fn new_test_region_route(region_id: RegionId) -> RegionRoute {
609 RegionRoute {
610 region: Region::new_test(region_id),
611 leader_peer: Some(Peer::new(1, "a1")),
612 follower_peers: vec![Peer::new(2, "a2")],
613 leader_state: None,
614 leader_down_since: None,
615 write_route_policy: None,
616 }
617 }
618
619 #[test]
620 fn test_leader_is_downgraded() {
621 let mut region_route = RegionRoute {
622 region: Region {
623 id: 2.into(),
624 name: "r2".to_string(),
625 attrs: BTreeMap::new(),
626 partition_expr: "".to_string(),
627 },
628 leader_peer: Some(Peer::new(1, "a1")),
629 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
630 leader_state: None,
631 leader_down_since: None,
632 write_route_policy: None,
633 };
634
635 assert!(!region_route.is_leader_downgrading());
636
637 region_route.downgrade_leader();
638
639 assert!(region_route.is_leader_downgrading());
640 }
641
642 #[test]
643 fn test_region_route_decode() {
644 let region_route = RegionRoute {
645 region: Region {
646 id: 2.into(),
647 name: "r2".to_string(),
648 attrs: BTreeMap::new(),
649 partition_expr: "".to_string(),
650 },
651 leader_peer: Some(Peer::new(1, "a1")),
652 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
653 leader_state: None,
654 leader_down_since: None,
655 write_route_policy: None,
656 };
657
658 let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;
659
660 let decoded: RegionRoute = serde_json::from_str(input).unwrap();
661
662 assert_eq!(decoded, region_route);
663 }
664
665 #[test]
666 fn test_region_route_compatibility() {
667 let region_route = RegionRoute {
668 region: Region {
669 id: 2.into(),
670 name: "r2".to_string(),
671 attrs: BTreeMap::new(),
672 partition_expr: "".to_string(),
673 },
674 leader_peer: Some(Peer::new(1, "a1")),
675 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
676 leader_state: Some(LeaderState::Downgrading),
677 leader_down_since: None,
678 write_route_policy: None,
679 };
680 let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#;
681 let decoded: RegionRoute = serde_json::from_str(input).unwrap();
682 assert_eq!(decoded, region_route);
683
684 let region_route = RegionRoute {
685 region: Region {
686 id: 2.into(),
687 name: "r2".to_string(),
688 attrs: BTreeMap::new(),
689 partition_expr: "".to_string(),
690 },
691 leader_peer: Some(Peer::new(1, "a1")),
692 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
693 leader_state: Some(LeaderState::Downgrading),
694 leader_down_since: None,
695 write_route_policy: None,
696 };
697 let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#;
698 let decoded: RegionRoute = serde_json::from_str(input).unwrap();
699 assert_eq!(decoded, region_route);
700
701 let region_route = RegionRoute {
702 region: Region {
703 id: 2.into(),
704 name: "r2".to_string(),
705 attrs: BTreeMap::new(),
706 partition_expr: "".to_string(),
707 },
708 leader_peer: Some(Peer::new(1, "a1")),
709 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
710 leader_state: Some(LeaderState::Downgrading),
711 leader_down_since: None,
712 write_route_policy: None,
713 };
714 let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#;
715 let decoded: RegionRoute = serde_json::from_str(input).unwrap();
716 assert_eq!(decoded, region_route);
717
718 let region_route = RegionRoute {
719 region: Region {
720 id: 2.into(),
721 name: "r2".to_string(),
722 attrs: BTreeMap::new(),
723 partition_expr: "".to_string(),
724 },
725 leader_peer: Some(Peer::new(1, "a1")),
726 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
727 leader_state: Some(LeaderState::Downgrading),
728 leader_down_since: None,
729 write_route_policy: None,
730 };
731 let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#;
732 let decoded: RegionRoute = serde_json::from_str(input).unwrap();
733 assert_eq!(decoded, region_route);
734 }
735
736 #[test]
737 fn test_region_route_write_route_policy_decode_compatibility() {
738 let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"}],"write_route_policy":"IgnoreAllWrites"}"#;
739 let decoded: RegionRoute = serde_json::from_str(input).unwrap();
740
741 assert!(decoded.is_ignore_all_writes());
742 }
743
744 #[test]
745 fn test_region_route_write_route_policy_default_not_serialized() {
746 let region_route = RegionRoute {
747 region: Region {
748 id: 2.into(),
749 name: "r2".to_string(),
750 attrs: BTreeMap::new(),
751 partition_expr: "".to_string(),
752 },
753 leader_peer: Some(Peer::new(1, "a1")),
754 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
755 leader_state: None,
756 leader_down_since: None,
757 write_route_policy: None,
758 };
759
760 let encoded = serde_json::to_string(®ion_route).unwrap();
761 assert!(!encoded.contains("write_route_policy"));
762 }
763
764 #[test]
765 fn test_region_route_write_route_policy_helpers() {
766 let mut region_route = RegionRoute {
767 region: Region::new_test(2.into()),
768 leader_peer: Some(Peer::new(1, "a1")),
769 follower_peers: vec![],
770 leader_state: None,
771 leader_down_since: None,
772 write_route_policy: None,
773 };
774
775 assert!(!region_route.is_ignore_all_writes());
776 region_route.set_ignore_all_writes();
777 assert!(region_route.is_ignore_all_writes());
778 region_route.clear_ignore_all_writes();
779 assert!(!region_route.is_ignore_all_writes());
780 }
781
782 #[test]
783 fn test_leader_region_role_without_leader_peer_returns_none() {
784 let region_route = RegionRoute {
785 leader_peer: None,
786 ..new_test_region_route(RegionId::new(1, 1))
787 };
788
789 assert_eq!(region_route.leader_region_role(), None);
790 }
791
792 #[test]
793 fn test_leader_region_role_variants() {
794 let normal = new_test_region_route(RegionId::new(1, 1));
795 let mut downgrading = new_test_region_route(RegionId::new(1, 2));
796 downgrading.leader_state = Some(LeaderState::Downgrading);
797 let mut staging = new_test_region_route(RegionId::new(1, 3));
798 staging.leader_state = Some(LeaderState::Staging);
799
800 assert_eq!(normal.leader_region_role(), Some(RegionRole::Leader));
801 assert_eq!(
802 downgrading.leader_region_role(),
803 Some(RegionRole::DowngradingLeader)
804 );
805 assert_eq!(
806 staging.leader_region_role(),
807 Some(RegionRole::StagingLeader)
808 );
809 }
810
811 #[test]
812 fn test_operating_leader_region_roles_returns_expected_roles() {
813 let no_leader_region = RegionRoute {
814 leader_peer: None,
815 ..new_test_region_route(RegionId::new(1, 4))
816 };
817 let mut downgrading = new_test_region_route(RegionId::new(1, 2));
818 downgrading.leader_peer = Some(Peer::new(2, "a2"));
819 downgrading.leader_state = Some(LeaderState::Downgrading);
820 let mut staging = new_test_region_route(RegionId::new(1, 3));
821 staging.leader_peer = Some(Peer::new(3, "a3"));
822 staging.leader_state = Some(LeaderState::Staging);
823
824 let roles = operating_leader_region_roles(&[
825 new_test_region_route(RegionId::new(1, 1)),
826 downgrading,
827 staging,
828 no_leader_region,
829 ]);
830
831 assert_eq!(
832 roles,
833 vec![
834 (RegionId::new(1, 1), 1, RegionRole::Leader),
835 (RegionId::new(1, 2), 2, RegionRole::DowngradingLeader),
836 (RegionId::new(1, 3), 3, RegionRole::StagingLeader),
837 ]
838 );
839 }
840
841 #[test]
842 fn test_region_distribution() {
843 let region_routes = vec![
844 RegionRoute {
845 region: Region {
846 id: RegionId::new(1, 1),
847 name: "r1".to_string(),
848 attrs: BTreeMap::new(),
849 partition_expr: "".to_string(),
850 },
851 leader_peer: Some(Peer::new(1, "a1")),
852 follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
853 leader_state: None,
854 leader_down_since: None,
855 write_route_policy: None,
856 },
857 RegionRoute {
858 region: Region {
859 id: RegionId::new(1, 2),
860 name: "r2".to_string(),
861 attrs: BTreeMap::new(),
862 partition_expr: "".to_string(),
863 },
864 leader_peer: Some(Peer::new(2, "a2")),
865 follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
866 leader_state: None,
867 leader_down_since: None,
868 write_route_policy: None,
869 },
870 ];
871
872 let distribution = region_distribution(®ion_routes);
873 assert_eq!(distribution.len(), 3);
874 assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
875 assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
876 assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
877 }
878
879 #[test]
880 fn test_de_serialize_partition() {
881 let p = LegacyPartition {
882 column_list: vec![b"a".to_vec(), b"b".to_vec()],
883 value_list: vec![b"hi".to_vec(), b",".to_vec()],
884 };
885
886 let output = serde_json::to_string(&p).unwrap();
887 let got: LegacyPartition = serde_json::from_str(&output).unwrap();
888
889 assert_eq!(got, p);
890 }
891
892 #[test]
893 #[allow(deprecated)]
894 fn test_region_partition_expr() {
895 let r = PbRegion {
896 id: 1,
897 name: "r1".to_string(),
898 partition: None,
899 attrs: Default::default(),
900 };
901 assert_eq!(pb_region_partition_expr(&r), "");
902
903 let r2: Region = r.into();
904 assert_eq!(r2.partition_expr(), "");
905
906 let r3: PbRegion = r2.into();
907 assert_eq!(r3.partition.as_ref().unwrap().expression, "");
908
909 let r = PbRegion {
910 id: 1,
911 name: "r1".to_string(),
912 partition: Some(PbPartition {
913 column_list: vec![b"a".to_vec()],
914 value_list: vec![b"{}".to_vec()],
915 expression: Default::default(),
916 }),
917 attrs: Default::default(),
918 };
919 assert_eq!(pb_region_partition_expr(&r), "{}");
920
921 let r2: Region = r.into();
922 assert_eq!(r2.partition_expr(), "{}");
923
924 let r3: PbRegion = r2.into();
925 assert_eq!(r3.partition.as_ref().unwrap().expression, "{}");
926
927 let r = PbRegion {
928 id: 1,
929 name: "r1".to_string(),
930 partition: Some(PbPartition {
931 column_list: vec![b"a".to_vec()],
932 value_list: vec![b"{}".to_vec()],
933 expression: "a>b".to_string(),
934 }),
935 attrs: Default::default(),
936 };
937 assert_eq!(pb_region_partition_expr(&r), "a>b");
938
939 let r2: Region = r.into();
940 assert_eq!(r2.partition_expr(), "a>b");
941
942 let r3: PbRegion = r2.into();
943 assert_eq!(r3.partition.as_ref().unwrap().expression, "a>b");
944 }
945}