1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18 Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19 TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::storage::{RegionId, RegionNumber};
27use strum::AsRefStr;
28use table::table_name::TableName;
29
30use crate::error::{self, Result};
31use crate::key::RegionDistribution;
32use crate::peer::Peer;
33use crate::DatanodeId;
34
35pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
40 let mut regions_id_map = RegionDistribution::new();
41 for route in region_routes.iter() {
42 if let Some(peer) = route.leader_peer.as_ref() {
43 let region_number = route.region.id.region_number();
44 regions_id_map
45 .entry(peer.id)
46 .or_default()
47 .add_leader_region(region_number);
48 }
49 for peer in route.follower_peers.iter() {
50 let region_number = route.region.id.region_number();
51 regions_id_map
52 .entry(peer.id)
53 .or_default()
54 .add_follower_region(region_number);
55 }
56 }
57 for (_, region_role_set) in regions_id_map.iter_mut() {
58 region_role_set.sort()
60 }
61 regions_id_map
62}
63
/// Routing information of a table: the table itself plus the routes of its regions.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
    /// The table these routes belong to.
    pub table: Table,
    /// Routes of the table's regions.
    pub region_routes: Vec<RegionRoute>,
    /// Region number -> leader peer (if any), derived from `region_routes` by
    /// `TableRoute::new`.
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
70
71pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
73 region_routes
74 .iter()
75 .flat_map(|x| &x.leader_peer)
76 .cloned()
77 .collect()
78}
79
80pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
82 region_routes
83 .iter()
84 .flat_map(|x| &x.follower_peers)
85 .cloned()
86 .collect()
87}
88
89pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
91 region_routes
92 .iter()
93 .filter_map(|route| {
94 route
95 .leader_peer
96 .as_ref()
97 .map(|leader| (route.region.id, leader.id))
98 })
99 .collect::<Vec<_>>()
100}
101
102pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
106 region_routes
107 .iter()
108 .filter_map(|x| {
109 x.leader_peer
110 .as_ref()
111 .map(|leader| (x.region.id.region_number(), leader))
112 })
113 .collect::<HashMap<_, _>>()
114}
115
116pub fn find_region_leader(
117 region_routes: &[RegionRoute],
118 region_number: RegionNumber,
119) -> Option<Peer> {
120 region_routes
121 .iter()
122 .find(|x| x.region.id.region_number() == region_number)
123 .and_then(|r| r.leader_peer.as_ref())
124 .cloned()
125}
126
127pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
129 region_routes
130 .iter()
131 .filter_map(|x| {
132 if let Some(peer) = &x.leader_peer {
133 if peer == datanode {
134 return Some(x.region.id.region_number());
135 }
136 }
137 None
138 })
139 .collect()
140}
141
142pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
144 region_routes
145 .iter()
146 .filter_map(|x| {
147 if x.follower_peers.contains(datanode) {
148 return Some(x.region.id.region_number());
149 }
150 None
151 })
152 .collect()
153}
154
155impl TableRoute {
156 pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
157 let region_leaders = region_routes
158 .iter()
159 .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
160 .collect::<HashMap<_, _>>();
161 Self {
162 table,
163 region_routes,
164 region_leaders,
165 }
166 }
167
168 pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
169 let table = table_route
170 .table
171 .context(error::RouteInfoCorruptedSnafu {
172 err_msg: "'table' is empty in table route",
173 })?
174 .try_into()?;
175
176 let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
177 for region_route in table_route.region_routes.into_iter() {
178 let region = region_route
179 .region
180 .context(error::RouteInfoCorruptedSnafu {
181 err_msg: "'region' is empty in region route",
182 })?
183 .into();
184
185 let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
186
187 let follower_peers = region_route
188 .follower_peer_indexes
189 .into_iter()
190 .filter_map(|x| peers.get(x as usize).cloned())
191 .collect::<Vec<_>>();
192
193 region_routes.push(RegionRoute {
194 region,
195 leader_peer,
196 follower_peers,
197 leader_state: None,
198 leader_down_since: None,
199 });
200 }
201
202 Ok(Self::new(table, region_routes))
203 }
204}
205
/// A table as recorded in a table route.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
    /// Table id.
    pub id: u64,
    /// Name of the table.
    pub table_name: TableName,
    /// Raw table schema bytes.
    ///
    /// Serialized as a string via `as_utf8`: valid UTF-8 is written as-is, anything else as
    /// the placeholder `<unknown-not-UTF8>`. Deserialized via `from_utf8` as the string's
    /// UTF-8 bytes.
    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
    pub table_schema: Vec<u8>,
}
213
214impl TryFrom<PbTable> for Table {
215 type Error = error::Error;
216
217 fn try_from(t: PbTable) -> Result<Self> {
218 let table_name = t
219 .table_name
220 .context(error::RouteInfoCorruptedSnafu {
221 err_msg: "table name required",
222 })?
223 .into();
224 Ok(Self {
225 id: t.id,
226 table_name,
227 table_schema: t.table_schema,
228 })
229 }
230}
231
232impl From<Table> for PbTable {
233 fn from(table: Table) -> Self {
234 PbTable {
235 id: table.id,
236 table_name: Some(table.table_name.into()),
237 table_schema: table.table_schema,
238 }
239 }
240}
241
/// Route of a single region: its leader and follower peers plus the leader's state.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
    /// The region being routed.
    pub region: Region,
    /// Peer hosting the leader, if one is assigned.
    #[builder(setter(into, strip_option))]
    pub leader_peer: Option<Peer>,
    /// Peers hosting followers; the builder defaults this to an empty list.
    #[builder(setter(into), default)]
    pub follower_peers: Vec<Peer>,
    /// Leader state; `None` means no special state.
    ///
    /// Deserialization also accepts the legacy key `leader_status` and defaults to `None`
    /// when absent; the field is omitted from serialized output when `None`.
    #[builder(setter(into, strip_option), default)]
    #[serde(
        default,
        alias = "leader_status",
        skip_serializing_if = "Option::is_none"
    )]
    pub leader_state: Option<LeaderState>,
    /// Timestamp in milliseconds (from `current_time_millis`) at which the leader entered
    /// `LeaderState::Downgrading`.
    ///
    /// The builder defaults this to the current time when `leader_state` was set to
    /// `Downgrading`, and to `None` otherwise.
    #[serde(default)]
    #[builder(default = "self.default_leader_down_since()")]
    pub leader_down_since: Option<i64>,
}
262
263impl RegionRouteBuilder {
264 fn default_leader_down_since(&self) -> Option<i64> {
265 match self.leader_state {
266 Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
267 _ => None,
268 }
269 }
270}
271
/// Special state of a region leader.
///
/// `as_ref()` (via strum's `AsRefStr`) renders the variant name in upper case,
/// e.g. `"DOWNGRADING"` and `"STAGING"`.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum LeaderState {
    /// The leader is being downgraded.
    ///
    /// Deserialization also accepts the legacy name `Downgraded`.
    #[serde(alias = "Downgraded")]
    Downgrading,
    /// The leader is in the staging state.
    Staging,
}
289
290impl RegionRoute {
291 pub fn is_leader_downgrading(&self) -> bool {
299 matches!(self.leader_state, Some(LeaderState::Downgrading))
300 }
301
302 pub fn is_leader_staging(&self) -> bool {
304 matches!(self.leader_state, Some(LeaderState::Staging))
305 }
306
307 pub fn downgrade_leader(&mut self) {
319 self.leader_down_since = Some(current_time_millis());
320 self.leader_state = Some(LeaderState::Downgrading)
321 }
322
323 pub fn set_leader_staging(&mut self) {
325 self.leader_state = Some(LeaderState::Staging);
326 self.leader_down_since = None;
328 }
329
330 pub fn clear_leader_staging(&mut self) {
332 if self.leader_state == Some(LeaderState::Staging) {
333 self.leader_state = None;
334 self.leader_down_since = None;
335 }
336 }
337
338 pub fn leader_down_millis(&self) -> Option<i64> {
340 self.leader_down_since
341 .map(|start| current_time_millis() - start)
342 }
343
344 pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
348 let updated = self.leader_state != state;
349
350 match (state, updated) {
351 (Some(LeaderState::Downgrading), true) => {
352 self.leader_down_since = Some(current_time_millis());
353 }
354 (Some(LeaderState::Downgrading), false) => {
355 }
357 _ => {
358 self.leader_down_since = None;
359 }
360 }
361
362 self.leader_state = state;
363 updated
364 }
365}
366
/// Newtype over a list of [`RegionRoute`]s that provides leader lookups by region number.
pub struct RegionRoutes(pub Vec<RegionRoute>);
368
369impl RegionRoutes {
370 pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
371 convert_to_region_leader_map(&self.0)
372 }
373
374 pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
375 self.region_leader_map().get(®ion_number).copied()
376 }
377}
378
/// A region as recorded in a region route.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Region {
    /// Region id.
    pub id: RegionId,
    /// Region name.
    pub name: String,
    /// String attributes, kept ordered by key.
    pub attrs: BTreeMap<String, String>,

    /// Legacy partition description. `Region::partition_expr` falls back to the first
    /// entry of its `value_list` when `partition_expr` is empty.
    pub partition: Option<LegacyPartition>,
    /// Partition expression; empty when unset, and defaults to empty when the field is
    /// missing from serialized data.
    #[serde(default)]
    pub partition_expr: String,
}
391
392impl Region {
393 #[cfg(any(test, feature = "testing"))]
394 pub fn new_test(id: RegionId) -> Self {
395 Self {
396 id,
397 ..Default::default()
398 }
399 }
400
401 pub fn partition_expr(&self) -> String {
403 if !self.partition_expr.is_empty() {
404 self.partition_expr.clone()
405 } else if let Some(LegacyPartition { value_list, .. }) = &self.partition {
406 if !value_list.is_empty() {
407 String::from_utf8_lossy(&value_list[0]).to_string()
408 } else {
409 "".to_string()
410 }
411 } else {
412 "".to_string()
413 }
414 }
415}
416
417#[allow(deprecated)]
419pub fn pb_region_partition_expr(r: &PbRegion) -> String {
420 if let Some(partition) = &r.partition {
421 if !partition.expression.is_empty() {
422 partition.expression.clone()
423 } else if !partition.value_list.is_empty() {
424 String::from_utf8_lossy(&partition.value_list[0]).to_string()
425 } else {
426 "".to_string()
427 }
428 } else {
429 "".to_string()
430 }
431}
432
433impl From<PbRegion> for Region {
434 fn from(r: PbRegion) -> Self {
435 let partition_expr = pb_region_partition_expr(&r);
436 Self {
437 id: r.id.into(),
438 name: r.name,
439 partition: None,
440 partition_expr,
441 attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
442 }
443 }
444}
445
446impl From<Region> for PbRegion {
447 fn from(region: Region) -> Self {
448 let partition_expr = region.partition_expr();
449 Self {
450 id: region.id.into(),
451 name: region.name,
452 partition: Some(PbPartition {
453 expression: partition_expr,
454 ..Default::default()
455 }),
456 attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
457 }
458 }
459}
460
/// Legacy partition description (presumably retained for reading older route metadata —
/// newer data uses `Region::partition_expr`).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct LegacyPartition {
    /// Partition column names as raw bytes; serialized as a list of lossy UTF-8 strings.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub column_list: Vec<Vec<u8>>,
    /// Partition values as raw bytes; serialized as a list of lossy UTF-8 strings.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub value_list: Vec<Vec<u8>>,
}
468
469fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
470 serializer.serialize_str(
471 String::from_utf8(val.to_vec())
472 .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
473 .as_str(),
474 )
475}
476
477pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
478where
479 D: Deserializer<'de>,
480{
481 let s = String::deserialize(deserializer)?;
482
483 Ok(s.into_bytes())
484}
485
486fn as_utf8_vec<S: Serializer>(
487 val: &[Vec<u8>],
488 serializer: S,
489) -> std::result::Result<S::Ok, S::Error> {
490 let mut seq = serializer.serialize_seq(Some(val.len()))?;
491 for v in val {
492 seq.serialize_element(&String::from_utf8_lossy(v))?;
493 }
494 seq.end()
495}
496
497pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
498where
499 D: Deserializer<'de>,
500{
501 let values = Vec::<String>::deserialize(deserializer)?;
502
503 let values = values
504 .into_iter()
505 .map(|value| value.into_bytes())
506 .collect::<Vec<_>>();
507 Ok(values)
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key::RegionRoleSet;

    /// Builds a region with the given id and name, no attributes and no partition info.
    fn new_region(id: RegionId, name: &str) -> Region {
        Region {
            id,
            name: name.to_string(),
            attrs: BTreeMap::new(),
            partition: None,
            partition_expr: "".to_string(),
        }
    }

    /// Builds the route shared by the decode tests: region 2 (`r2`) led by peer 1 with
    /// followers 2 and 3, in `leader_state` and without a downgrade timestamp.
    fn new_r2_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: new_region(2.into(), "r2"),
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = new_r2_route(None);

        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();

        assert!(region_route.is_leader_downgrading());
    }

    #[test]
    fn test_region_route_decode() {
        let region_route = new_r2_route(None);

        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;

        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, region_route);
    }

    #[test]
    fn test_region_route_compatibility() {
        let expected = new_r2_route(Some(LeaderState::Downgrading));
        // Both the current `leader_state` key and the legacy `leader_status` alias must
        // decode, and the legacy variant name `Downgraded` must map to `Downgrading`.
        let inputs = [
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#,
        ];
        for input in inputs {
            let decoded: RegionRoute = serde_json::from_str(input).unwrap();
            assert_eq!(decoded, expected, "failed to decode: {input}");
        }
    }

    #[test]
    fn test_region_distribution() {
        let region_routes = vec![
            RegionRoute {
                region: new_region(RegionId::new(1, 1), "r1"),
                leader_peer: Some(Peer::new(1, "a1")),
                follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
            RegionRoute {
                region: new_region(RegionId::new(1, 2), "r2"),
                leader_peer: Some(Peer::new(2, "a2")),
                follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
        ];

        let distribution = region_distribution(&region_routes);
        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
        assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
        assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
    }

    #[test]
    fn test_de_serialize_partition() {
        let p = LegacyPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let output = serde_json::to_string(&p).unwrap();
        let got: LegacyPartition = serde_json::from_str(&output).unwrap();

        assert_eq!(got, p);
    }

    #[test]
    #[allow(deprecated)]
    fn test_region_partition_expr() {
        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: None,
            attrs: Default::default(),
        };
        assert_eq!(pb_region_partition_expr(&r), "");

        let r2: Region = r.into();
        assert_eq!(r2.partition_expr(), "");
        assert!(r2.partition.is_none());

        let r3: PbRegion = r2.into();
        assert_eq!(r3.partition.as_ref().unwrap().expression, "");

        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: Default::default(),
            }),
            attrs: Default::default(),
        };
        assert_eq!(pb_region_partition_expr(&r), "{}");

        let r2: Region = r.into();
        assert_eq!(r2.partition_expr(), "{}");
        assert!(r2.partition.is_none());

        let r3: PbRegion = r2.into();
        assert_eq!(r3.partition.as_ref().unwrap().expression, "{}");

        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: "a>b".to_string(),
            }),
            attrs: Default::default(),
        };
        assert_eq!(pb_region_partition_expr(&r), "a>b");

        let r2: Region = r.into();
        assert_eq!(r2.partition_expr(), "a>b");
        assert!(r2.partition.is_none());

        let r3: PbRegion = r2.into();
        assert_eq!(r3.partition.as_ref().unwrap().expression, "a>b");
    }
}
739}