1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18 Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19 TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::storage::{RegionId, RegionNumber};
27use strum::AsRefStr;
28use table::table_name::TableName;
29
30use crate::error::{self, Result};
31use crate::key::RegionDistribution;
32use crate::peer::Peer;
33use crate::DatanodeId;
34
35pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
40 let mut regions_id_map = RegionDistribution::new();
41 for route in region_routes.iter() {
42 if let Some(peer) = route.leader_peer.as_ref() {
43 let region_number = route.region.id.region_number();
44 regions_id_map
45 .entry(peer.id)
46 .or_default()
47 .add_leader_region(region_number);
48 }
49 for peer in route.follower_peers.iter() {
50 let region_number = route.region.id.region_number();
51 regions_id_map
52 .entry(peer.id)
53 .or_default()
54 .add_follower_region(region_number);
55 }
56 }
57 for (_, region_role_set) in regions_id_map.iter_mut() {
58 region_role_set.sort()
60 }
61 regions_id_map
62}
63
/// Routing information of a table: the table itself plus the route of each of its regions.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
    /// The routed table.
    pub table: Table,
    /// The route of every region of the table.
    pub region_routes: Vec<RegionRoute>,
    /// Leader peer per region number (`None` when the route has no leader).
    /// `TableRoute::new` derives it from `region_routes`.
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
70
71pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
73 region_routes
74 .iter()
75 .flat_map(|x| &x.leader_peer)
76 .cloned()
77 .collect()
78}
79
80pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
82 region_routes
83 .iter()
84 .flat_map(|x| &x.follower_peers)
85 .cloned()
86 .collect()
87}
88
89pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
91 region_routes
92 .iter()
93 .filter_map(|route| {
94 route
95 .leader_peer
96 .as_ref()
97 .map(|leader| (route.region.id, leader.id))
98 })
99 .collect::<Vec<_>>()
100}
101
102pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
106 region_routes
107 .iter()
108 .filter_map(|x| {
109 x.leader_peer
110 .as_ref()
111 .map(|leader| (x.region.id.region_number(), leader))
112 })
113 .collect::<HashMap<_, _>>()
114}
115
116pub fn find_region_leader(
117 region_routes: &[RegionRoute],
118 region_number: RegionNumber,
119) -> Option<Peer> {
120 region_routes
121 .iter()
122 .find(|x| x.region.id.region_number() == region_number)
123 .and_then(|r| r.leader_peer.as_ref())
124 .cloned()
125}
126
127pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
129 region_routes
130 .iter()
131 .filter_map(|x| {
132 if let Some(peer) = &x.leader_peer {
133 if peer == datanode {
134 return Some(x.region.id.region_number());
135 }
136 }
137 None
138 })
139 .collect()
140}
141
142pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
144 region_routes
145 .iter()
146 .filter_map(|x| {
147 if x.follower_peers.contains(datanode) {
148 return Some(x.region.id.region_number());
149 }
150 None
151 })
152 .collect()
153}
154
155impl TableRoute {
156 pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
157 let region_leaders = region_routes
158 .iter()
159 .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
160 .collect::<HashMap<_, _>>();
161 Self {
162 table,
163 region_routes,
164 region_leaders,
165 }
166 }
167
168 pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
169 let table = table_route
170 .table
171 .context(error::RouteInfoCorruptedSnafu {
172 err_msg: "'table' is empty in table route",
173 })?
174 .try_into()?;
175
176 let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
177 for region_route in table_route.region_routes.into_iter() {
178 let region = region_route
179 .region
180 .context(error::RouteInfoCorruptedSnafu {
181 err_msg: "'region' is empty in region route",
182 })?
183 .into();
184
185 let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
186
187 let follower_peers = region_route
188 .follower_peer_indexes
189 .into_iter()
190 .filter_map(|x| peers.get(x as usize).cloned())
191 .collect::<Vec<_>>();
192
193 region_routes.push(RegionRoute {
194 region,
195 leader_peer,
196 follower_peers,
197 leader_state: None,
198 leader_down_since: None,
199 });
200 }
201
202 Ok(Self::new(table, region_routes))
203 }
204}
205
/// Table metadata carried by a [`TableRoute`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
    /// The table id.
    pub id: u64,
    /// The table name.
    pub table_name: TableName,
    /// Raw schema bytes. Serde writes them as a UTF-8 string (`as_utf8`); bytes that are
    /// not valid UTF-8 serialize as the placeholder `"<unknown-not-UTF8>"`, so that case
    /// does not round-trip. Deserialization (`from_utf8`) reads a string back into bytes.
    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
    pub table_schema: Vec<u8>,
}
213
214impl TryFrom<PbTable> for Table {
215 type Error = error::Error;
216
217 fn try_from(t: PbTable) -> Result<Self> {
218 let table_name = t
219 .table_name
220 .context(error::RouteInfoCorruptedSnafu {
221 err_msg: "table name required",
222 })?
223 .into();
224 Ok(Self {
225 id: t.id,
226 table_name,
227 table_schema: t.table_schema,
228 })
229 }
230}
231
232impl From<Table> for PbTable {
233 fn from(table: Table) -> Self {
234 PbTable {
235 id: table.id,
236 table_name: Some(table.table_name.into()),
237 table_schema: table.table_schema,
238 }
239 }
240}
241
/// Route of a single region: its leader peer, follower peers and the leader's state.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
    /// The routed region.
    pub region: Region,
    /// The peer hosting the region leader, if any.
    #[builder(setter(into, strip_option))]
    pub leader_peer: Option<Peer>,
    /// The peers hosting follower replicas of the region.
    #[builder(setter(into), default)]
    pub follower_peers: Vec<Peer>,
    /// State of the leader; `None` means the leader is not downgrading.
    /// Deserialization also accepts the legacy key `leader_status`. The field is omitted
    /// from serialized output when `None`.
    #[builder(setter(into, strip_option), default)]
    #[serde(
        default,
        alias = "leader_status",
        skip_serializing_if = "Option::is_none"
    )]
    pub leader_state: Option<LeaderState>,
    /// Time (from `current_time_millis`) at which the leader entered
    /// [`LeaderState::Downgrading`]. When unset on the builder, it defaults to "now" if
    /// the built route is downgrading and to `None` otherwise.
    #[serde(default)]
    #[builder(default = "self.default_leader_down_since()")]
    pub leader_down_since: Option<i64>,
}
262
263impl RegionRouteBuilder {
264 fn default_leader_down_since(&self) -> Option<i64> {
265 match self.leader_state {
266 Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
267 _ => None,
268 }
269 }
270}
271
/// State of a region leader.
///
/// Through `AsRefStr`, variants render in upper case (e.g. `"DOWNGRADING"`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum LeaderState {
    /// The leader is being downgraded (see `RegionRoute::downgrade_leader`).
    /// Deserialization also accepts the legacy name `"Downgraded"`.
    #[serde(alias = "Downgraded")]
    Downgrading,
}
284
285impl RegionRoute {
286 pub fn is_leader_downgrading(&self) -> bool {
294 matches!(self.leader_state, Some(LeaderState::Downgrading))
295 }
296
297 pub fn downgrade_leader(&mut self) {
309 self.leader_down_since = Some(current_time_millis());
310 self.leader_state = Some(LeaderState::Downgrading)
311 }
312
313 pub fn leader_down_millis(&self) -> Option<i64> {
315 self.leader_down_since
316 .map(|start| current_time_millis() - start)
317 }
318
319 pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
323 let updated = self.leader_state != state;
324
325 match (state, updated) {
326 (Some(LeaderState::Downgrading), true) => {
327 self.leader_down_since = Some(current_time_millis());
328 }
329 (Some(LeaderState::Downgrading), false) => {
330 }
332 _ => {
333 self.leader_down_since = None;
334 }
335 }
336
337 self.leader_state = state;
338 updated
339 }
340}
341
342pub struct RegionRoutes(pub Vec<RegionRoute>);
343
344impl RegionRoutes {
345 pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
346 convert_to_region_leader_map(&self.0)
347 }
348
349 pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
350 self.region_leader_map().get(®ion_number).copied()
351 }
352}
353
/// Metadata of a single region.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Region {
    /// The region id.
    pub id: RegionId,
    /// The region name.
    pub name: String,
    /// String key/value attributes of the region.
    pub attrs: BTreeMap<String, String>,

    /// Legacy partition description. It is only read by `Region::partition_expr`, as a
    /// fallback when `partition_expr` is empty. Conversion from the protobuf `Region`
    /// always sets it to `None`.
    pub partition: Option<LegacyPartition>,
    /// The partition expression. It defaults to an empty string when absent from
    /// serialized data.
    #[serde(default)]
    pub partition_expr: String,
}
366
367impl Region {
368 #[cfg(any(test, feature = "testing"))]
369 pub fn new_test(id: RegionId) -> Self {
370 Self {
371 id,
372 ..Default::default()
373 }
374 }
375
376 pub fn partition_expr(&self) -> String {
378 if !self.partition_expr.is_empty() {
379 self.partition_expr.clone()
380 } else if let Some(LegacyPartition { value_list, .. }) = &self.partition {
381 if !value_list.is_empty() {
382 String::from_utf8_lossy(&value_list[0]).to_string()
383 } else {
384 "".to_string()
385 }
386 } else {
387 "".to_string()
388 }
389 }
390}
391
392#[allow(deprecated)]
394pub fn pb_region_partition_expr(r: &PbRegion) -> String {
395 if let Some(partition) = &r.partition {
396 if !partition.expression.is_empty() {
397 partition.expression.clone()
398 } else if !partition.value_list.is_empty() {
399 String::from_utf8_lossy(&partition.value_list[0]).to_string()
400 } else {
401 "".to_string()
402 }
403 } else {
404 "".to_string()
405 }
406}
407
408impl From<PbRegion> for Region {
409 fn from(r: PbRegion) -> Self {
410 let partition_expr = pb_region_partition_expr(&r);
411 Self {
412 id: r.id.into(),
413 name: r.name,
414 partition: None,
415 partition_expr,
416 attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
417 }
418 }
419}
420
421impl From<Region> for PbRegion {
422 fn from(region: Region) -> Self {
423 let partition_expr = region.partition_expr();
424 Self {
425 id: region.id.into(),
426 name: region.name,
427 partition: Some(PbPartition {
428 expression: partition_expr,
429 ..Default::default()
430 }),
431 attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
432 }
433 }
434}
435
/// Legacy partition description. `Region::partition_expr` falls back to the first
/// `value_list` entry; presumably the type is kept so older serialized regions still
/// decode (TODO confirm).
///
/// Both lists are (de)serialized as lists of strings. Serialization decodes each entry as
/// lossy UTF-8, so invalid bytes do not round-trip.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct LegacyPartition {
    /// Raw column entries.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub column_list: Vec<Vec<u8>>,
    /// Raw value entries; the first one is used as a fallback partition expression.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub value_list: Vec<Vec<u8>>,
}
443
444fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
445 serializer.serialize_str(
446 String::from_utf8(val.to_vec())
447 .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
448 .as_str(),
449 )
450}
451
452pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
453where
454 D: Deserializer<'de>,
455{
456 let s = String::deserialize(deserializer)?;
457
458 Ok(s.into_bytes())
459}
460
461fn as_utf8_vec<S: Serializer>(
462 val: &[Vec<u8>],
463 serializer: S,
464) -> std::result::Result<S::Ok, S::Error> {
465 let mut seq = serializer.serialize_seq(Some(val.len()))?;
466 for v in val {
467 seq.serialize_element(&String::from_utf8_lossy(v))?;
468 }
469 seq.end()
470}
471
472pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
473where
474 D: Deserializer<'de>,
475{
476 let values = Vec::<String>::deserialize(deserializer)?;
477
478 let values = values
479 .into_iter()
480 .map(|value| value.into_bytes())
481 .collect::<Vec<_>>();
482 Ok(values)
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key::RegionRoleSet;

    /// Builds the `r2` route (leader `a1`, followers `a2`/`a3`) shared by several tests.
    fn new_r2_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: Region {
                id: 2.into(),
                name: "r2".to_string(),
                attrs: BTreeMap::new(),
                partition: None,
                partition_expr: "".to_string(),
            },
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = new_r2_route(None);

        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();

        assert!(region_route.is_leader_downgrading());
    }

    #[test]
    fn test_region_route_decode() {
        let region_route = new_r2_route(None);

        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;

        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, region_route);
    }

    #[test]
    fn test_region_route_compatibility() {
        let expected = new_r2_route(Some(LeaderState::Downgrading));

        // Both the current key (`leader_state`) and the legacy key (`leader_status`) must
        // decode, each with the current (`Downgrading`) and legacy (`Downgraded`) name.
        let inputs = [
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#,
        ];

        for input in inputs {
            let decoded: RegionRoute = serde_json::from_str(input).unwrap();
            assert_eq!(decoded, expected, "unexpected decode of {}", input);
        }
    }

    #[test]
    fn test_region_distribution() {
        let region_routes = vec![
            RegionRoute {
                region: Region {
                    id: RegionId::new(1, 1),
                    name: "r1".to_string(),
                    attrs: BTreeMap::new(),
                    partition: None,
                    partition_expr: "".to_string(),
                },
                leader_peer: Some(Peer::new(1, "a1")),
                follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
            RegionRoute {
                region: Region {
                    id: RegionId::new(1, 2),
                    name: "r2".to_string(),
                    attrs: BTreeMap::new(),
                    partition: None,
                    partition_expr: "".to_string(),
                },
                leader_peer: Some(Peer::new(2, "a2")),
                follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
        ];

        let distribution = region_distribution(&region_routes);
        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
        assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
        assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
    }

    #[test]
    fn test_de_serialize_partition() {
        let p = LegacyPartition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let output = serde_json::to_string(&p).unwrap();
        let got: LegacyPartition = serde_json::from_str(&output).unwrap();

        assert_eq!(got, p);
    }

    #[test]
    // Builds `PbPartition` through its deprecated legacy fields on purpose.
    #[allow(deprecated)]
    fn test_region_partition_expr() {
        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: None,
            attrs: Default::default(),
        };
        assert_eq!(pb_region_partition_expr(&r), "");

        let r2: Region = r.into();
        assert_eq!(r2.partition_expr(), "");
        assert!(r2.partition.is_none());

        let r3: PbRegion = r2.into();
        assert_eq!(r3.partition.as_ref().unwrap().expression, "");

        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: Default::default(),
            }),
            attrs: Default::default(),
        };
        assert_eq!(pb_region_partition_expr(&r), "{}");

        let r2: Region = r.into();
        assert_eq!(r2.partition_expr(), "{}");
        assert!(r2.partition.is_none());

        let r3: PbRegion = r2.into();
        assert_eq!(r3.partition.as_ref().unwrap().expression, "{}");

        let r = PbRegion {
            id: 1,
            name: "r1".to_string(),
            partition: Some(PbPartition {
                column_list: vec![b"a".to_vec()],
                value_list: vec![b"{}".to_vec()],
                expression: "a>b".to_string(),
            }),
            attrs: Default::default(),
        };
        assert_eq!(pb_region_partition_expr(&r), "a>b");

        let r2: Region = r.into();
        assert_eq!(r2.partition_expr(), "a>b");
        assert!(r2.partition.is_none());

        let r3: PbRegion = r2.into();
        assert_eq!(r3.partition.as_ref().unwrap().expression, "a>b");
    }
}