1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18 Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19 TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::storage::{RegionId, RegionNumber};
27use strum::AsRefStr;
28use table::table_name::TableName;
29
30use crate::error::{self, Result};
31use crate::key::RegionDistribution;
32use crate::peer::Peer;
33use crate::DatanodeId;
34
35pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
40 let mut regions_id_map = RegionDistribution::new();
41 for route in region_routes.iter() {
42 if let Some(peer) = route.leader_peer.as_ref() {
43 let region_number = route.region.id.region_number();
44 regions_id_map
45 .entry(peer.id)
46 .or_default()
47 .add_leader_region(region_number);
48 }
49 for peer in route.follower_peers.iter() {
50 let region_number = route.region.id.region_number();
51 regions_id_map
52 .entry(peer.id)
53 .or_default()
54 .add_follower_region(region_number);
55 }
56 }
57 for (_, region_role_set) in regions_id_map.iter_mut() {
58 region_role_set.sort()
60 }
61 regions_id_map
62}
63
/// A table together with the routes of its regions.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
    /// The table this route describes.
    pub table: Table,
    /// The region routes of the table.
    pub region_routes: Vec<RegionRoute>,
    /// Leader peer (if any) keyed by region number; derived from
    /// `region_routes` in [`TableRoute::new`].
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
70
71pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
73 region_routes
74 .iter()
75 .flat_map(|x| &x.leader_peer)
76 .cloned()
77 .collect()
78}
79
80pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
82 region_routes
83 .iter()
84 .flat_map(|x| &x.follower_peers)
85 .cloned()
86 .collect()
87}
88
89pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
91 region_routes
92 .iter()
93 .filter_map(|route| {
94 route
95 .leader_peer
96 .as_ref()
97 .map(|leader| (route.region.id, leader.id))
98 })
99 .collect::<Vec<_>>()
100}
101
102pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
106 region_routes
107 .iter()
108 .filter_map(|x| {
109 x.leader_peer
110 .as_ref()
111 .map(|leader| (x.region.id.region_number(), leader))
112 })
113 .collect::<HashMap<_, _>>()
114}
115
116pub fn find_region_leader(
117 region_routes: &[RegionRoute],
118 region_number: RegionNumber,
119) -> Option<Peer> {
120 region_routes
121 .iter()
122 .find(|x| x.region.id.region_number() == region_number)
123 .and_then(|r| r.leader_peer.as_ref())
124 .cloned()
125}
126
127pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
129 region_routes
130 .iter()
131 .filter_map(|x| {
132 if let Some(peer) = &x.leader_peer {
133 if peer == datanode {
134 return Some(x.region.id.region_number());
135 }
136 }
137 None
138 })
139 .collect()
140}
141
142pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
144 region_routes
145 .iter()
146 .filter_map(|x| {
147 if x.follower_peers.contains(datanode) {
148 return Some(x.region.id.region_number());
149 }
150 None
151 })
152 .collect()
153}
154
155impl TableRoute {
156 pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
157 let region_leaders = region_routes
158 .iter()
159 .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
160 .collect::<HashMap<_, _>>();
161 Self {
162 table,
163 region_routes,
164 region_leaders,
165 }
166 }
167
168 pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
169 let table = table_route
170 .table
171 .context(error::RouteInfoCorruptedSnafu {
172 err_msg: "'table' is empty in table route",
173 })?
174 .try_into()?;
175
176 let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
177 for region_route in table_route.region_routes.into_iter() {
178 let region = region_route
179 .region
180 .context(error::RouteInfoCorruptedSnafu {
181 err_msg: "'region' is empty in region route",
182 })?
183 .into();
184
185 let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
186
187 let follower_peers = region_route
188 .follower_peer_indexes
189 .into_iter()
190 .filter_map(|x| peers.get(x as usize).cloned())
191 .collect::<Vec<_>>();
192
193 region_routes.push(RegionRoute {
194 region,
195 leader_peer,
196 follower_peers,
197 leader_state: None,
198 leader_down_since: None,
199 });
200 }
201
202 Ok(Self::new(table, region_routes))
203 }
204}
205
/// Identity and schema of a table, as carried by a [`TableRoute`].
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
    /// Id of the table.
    pub id: u64,
    /// Name of the table.
    pub table_name: TableName,
    /// Raw table schema bytes; written and read as a string by serde via
    /// `as_utf8` / `from_utf8`.
    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
    pub table_schema: Vec<u8>,
}
213
214impl TryFrom<PbTable> for Table {
215 type Error = error::Error;
216
217 fn try_from(t: PbTable) -> Result<Self> {
218 let table_name = t
219 .table_name
220 .context(error::RouteInfoCorruptedSnafu {
221 err_msg: "table name required",
222 })?
223 .into();
224 Ok(Self {
225 id: t.id,
226 table_name,
227 table_schema: t.table_schema,
228 })
229 }
230}
231
232impl From<Table> for PbTable {
233 fn from(table: Table) -> Self {
234 PbTable {
235 id: table.id,
236 table_name: Some(table.table_name.into()),
237 table_schema: table.table_schema,
238 }
239 }
240}
241
/// Route of a single region: the region itself, the peers that serve it and
/// the state of its leader.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
    /// The region this route belongs to.
    pub region: Region,
    /// The leader peer of the region, if any.
    #[builder(setter(into, strip_option))]
    pub leader_peer: Option<Peer>,
    /// The follower peers of the region; empty by default in the builder.
    #[builder(setter(into), default)]
    pub follower_peers: Vec<Peer>,
    /// State of the leader. Omitted from serialized output when `None`, and
    /// also read from the field name `leader_status` when deserializing.
    #[builder(setter(into, strip_option), default)]
    #[serde(
        default,
        alias = "leader_status",
        skip_serializing_if = "Option::is_none"
    )]
    pub leader_state: Option<LeaderState>,
    /// Value of `current_time_millis()` recorded when the leader entered the
    /// downgrading state; `None` otherwise. The builder derives its default
    /// from the configured `leader_state`.
    #[serde(default)]
    #[builder(default = "self.default_leader_down_since()")]
    pub leader_down_since: Option<i64>,
}
262
263impl RegionRouteBuilder {
264 fn default_leader_down_since(&self) -> Option<i64> {
265 match self.leader_state {
266 Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
267 _ => None,
268 }
269 }
270}
271
/// State of a region's leader peer.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum LeaderState {
    /// The leader is being downgraded. When deserializing, the name
    /// `"Downgraded"` is accepted for this variant as well.
    #[serde(alias = "Downgraded")]
    Downgrading,
}
284
285impl RegionRoute {
286 pub fn is_leader_downgrading(&self) -> bool {
294 matches!(self.leader_state, Some(LeaderState::Downgrading))
295 }
296
297 pub fn downgrade_leader(&mut self) {
309 self.leader_down_since = Some(current_time_millis());
310 self.leader_state = Some(LeaderState::Downgrading)
311 }
312
313 pub fn leader_down_millis(&self) -> Option<i64> {
315 self.leader_down_since
316 .map(|start| current_time_millis() - start)
317 }
318
319 pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
323 let updated = self.leader_state != state;
324
325 match (state, updated) {
326 (Some(LeaderState::Downgrading), true) => {
327 self.leader_down_since = Some(current_time_millis());
328 }
329 (Some(LeaderState::Downgrading), false) => {
330 }
332 _ => {
333 self.leader_down_since = None;
334 }
335 }
336
337 self.leader_state = state;
338 updated
339 }
340}
341
342pub struct RegionRoutes(pub Vec<RegionRoute>);
343
344impl RegionRoutes {
345 pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
346 convert_to_region_leader_map(&self.0)
347 }
348
349 pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
350 self.region_leader_map().get(®ion_number).copied()
351 }
352}
353
/// Metadata of a single region.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Region {
    /// Id of the region.
    pub id: RegionId,
    /// Name of the region.
    pub name: String,
    /// Partition of the region, if any.
    pub partition: Option<Partition>,
    /// String key/value attributes of the region.
    pub attrs: BTreeMap<String, String>,
}
361
362impl Region {
363 #[cfg(any(test, feature = "testing"))]
364 pub fn new_test(id: RegionId) -> Self {
365 Self {
366 id,
367 ..Default::default()
368 }
369 }
370}
371
372impl From<PbRegion> for Region {
373 fn from(r: PbRegion) -> Self {
374 Self {
375 id: r.id.into(),
376 name: r.name,
377 partition: r.partition.map(Into::into),
378 attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
379 }
380 }
381}
382
383impl From<Region> for PbRegion {
384 fn from(region: Region) -> Self {
385 Self {
386 id: region.id.into(),
387 name: region.name,
388 partition: region.partition.map(Into::into),
389 attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
390 }
391 }
392}
393
/// Partition information of a region, held as raw byte strings.
///
/// Both lists are written and read as sequences of strings by serde via
/// `as_utf8_vec` / `from_utf8_vec`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Partition {
    /// The column list, one byte string per entry.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub column_list: Vec<Vec<u8>>,
    /// The value list, one byte string per entry.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub value_list: Vec<Vec<u8>>,
}
401
402fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
403 serializer.serialize_str(
404 String::from_utf8(val.to_vec())
405 .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
406 .as_str(),
407 )
408}
409
410pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
411where
412 D: Deserializer<'de>,
413{
414 let s = String::deserialize(deserializer)?;
415
416 Ok(s.into_bytes())
417}
418
419fn as_utf8_vec<S: Serializer>(
420 val: &[Vec<u8>],
421 serializer: S,
422) -> std::result::Result<S::Ok, S::Error> {
423 let mut seq = serializer.serialize_seq(Some(val.len()))?;
424 for v in val {
425 seq.serialize_element(&String::from_utf8_lossy(v))?;
426 }
427 seq.end()
428}
429
430pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
431where
432 D: Deserializer<'de>,
433{
434 let values = Vec::<String>::deserialize(deserializer)?;
435
436 let values = values
437 .into_iter()
438 .map(|value| value.into_bytes())
439 .collect::<Vec<_>>();
440 Ok(values)
441}
442
443impl From<Partition> for PbPartition {
444 fn from(p: Partition) -> Self {
445 Self {
446 column_list: p.column_list,
447 value_list: p.value_list,
448 }
449 }
450}
451
452impl From<PbPartition> for Partition {
453 fn from(p: PbPartition) -> Self {
454 Self {
455 column_list: p.column_list,
456 value_list: p.value_list,
457 }
458 }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::key::RegionRoleSet;

    /// Builds the route shared by the state and (de)serialization tests:
    /// region `2` named `r2`, led by peer 1 and followed by peers 2 and 3,
    /// with no recorded downgrade time.
    fn r2_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: Region {
                id: 2.into(),
                name: "r2".to_string(),
                partition: None,
                attrs: BTreeMap::new(),
            },
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = r2_route(None);
        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();
        assert!(region_route.is_leader_downgrading());
    }

    #[test]
    fn test_region_route_decode() {
        let expected = r2_route(None);

        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;
        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, expected);
    }

    #[test]
    fn test_region_route_compatibility() {
        // Every combination of field name (`leader_state` / `leader_status`)
        // and variant name (`Downgraded` / `Downgrading`) must decode to the
        // same route.
        let inputs = [
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#,
        ];

        for input in inputs {
            let expected = r2_route(Some(LeaderState::Downgrading));
            let decoded: RegionRoute = serde_json::from_str(input).unwrap();
            assert_eq!(decoded, expected);
        }
    }

    #[test]
    fn test_de_serialize_partition() {
        let p = Partition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let encoded = serde_json::to_string(&p).unwrap();
        let decoded: Partition = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded, p);
    }

    #[test]
    fn test_region_distribution() {
        // Region 1 is led by peer 1, region 2 by peer 2; peer 3 only follows.
        let first = RegionRoute {
            region: Region {
                id: RegionId::new(1, 1),
                name: "r1".to_string(),
                partition: None,
                attrs: BTreeMap::new(),
            },
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state: None,
            leader_down_since: None,
        };
        let second = RegionRoute {
            region: Region {
                id: RegionId::new(1, 2),
                name: "r2".to_string(),
                partition: None,
                attrs: BTreeMap::new(),
            },
            leader_peer: Some(Peer::new(2, "a2")),
            follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
            leader_state: None,
            leader_down_since: None,
        };
        let region_routes = vec![first, second];

        let distribution = region_distribution(&region_routes);

        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], RegionRoleSet::new(vec![1], vec![2]));
        assert_eq!(distribution[&2], RegionRoleSet::new(vec![2], vec![1]));
        assert_eq!(distribution[&3], RegionRoleSet::new(vec![], vec![1, 2]));
    }
}