1use std::collections::{BTreeMap, HashMap, HashSet};
16
17use api::v1::meta::{
18 Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable,
19 TableRoute as PbTableRoute,
20};
21use common_time::util::current_time_millis;
22use derive_builder::Builder;
23use serde::ser::SerializeSeq;
24use serde::{Deserialize, Deserializer, Serialize, Serializer};
25use snafu::OptionExt;
26use store_api::storage::{RegionId, RegionNumber};
27use strum::AsRefStr;
28use table::table_name::TableName;
29
30use crate::error::{self, Result};
31use crate::key::RegionDistribution;
32use crate::peer::Peer;
33use crate::DatanodeId;
34
35pub fn region_distribution(region_routes: &[RegionRoute]) -> RegionDistribution {
40 let mut regions_id_map = RegionDistribution::new();
41 for route in region_routes.iter() {
42 if let Some(peer) = route.leader_peer.as_ref() {
43 let region_id = route.region.id.region_number();
44 regions_id_map.entry(peer.id).or_default().push(region_id);
45 }
46 for peer in route.follower_peers.iter() {
47 let region_id = route.region.id.region_number();
48 regions_id_map.entry(peer.id).or_default().push(region_id);
49 }
50 }
51 for (_, regions) in regions_id_map.iter_mut() {
52 regions.sort()
54 }
55 regions_id_map
56}
57
/// A table together with the routes of its regions.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct TableRoute {
    /// The table the routes belong to.
    pub table: Table,
    /// Routes of the table's regions.
    pub region_routes: Vec<RegionRoute>,
    /// Leader peer (if any) of each region, keyed by region number.
    /// `TableRoute::new` derives this map from `region_routes`.
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}
64
65pub fn find_leaders(region_routes: &[RegionRoute]) -> HashSet<Peer> {
67 region_routes
68 .iter()
69 .flat_map(|x| &x.leader_peer)
70 .cloned()
71 .collect()
72}
73
74pub fn find_followers(region_routes: &[RegionRoute]) -> HashSet<Peer> {
76 region_routes
77 .iter()
78 .flat_map(|x| &x.follower_peers)
79 .cloned()
80 .collect()
81}
82
83pub fn operating_leader_regions(region_routes: &[RegionRoute]) -> Vec<(RegionId, DatanodeId)> {
85 region_routes
86 .iter()
87 .filter_map(|route| {
88 route
89 .leader_peer
90 .as_ref()
91 .map(|leader| (route.region.id, leader.id))
92 })
93 .collect::<Vec<_>>()
94}
95
96pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<RegionNumber, &Peer> {
100 region_routes
101 .iter()
102 .filter_map(|x| {
103 x.leader_peer
104 .as_ref()
105 .map(|leader| (x.region.id.region_number(), leader))
106 })
107 .collect::<HashMap<_, _>>()
108}
109
110pub fn find_region_leader(
111 region_routes: &[RegionRoute],
112 region_number: RegionNumber,
113) -> Option<Peer> {
114 region_routes
115 .iter()
116 .find(|x| x.region.id.region_number() == region_number)
117 .and_then(|r| r.leader_peer.as_ref())
118 .cloned()
119}
120
121pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
123 region_routes
124 .iter()
125 .filter_map(|x| {
126 if let Some(peer) = &x.leader_peer {
127 if peer == datanode {
128 return Some(x.region.id.region_number());
129 }
130 }
131 None
132 })
133 .collect()
134}
135
136pub fn find_follower_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
138 region_routes
139 .iter()
140 .filter_map(|x| {
141 if x.follower_peers.contains(datanode) {
142 return Some(x.region.id.region_number());
143 }
144 None
145 })
146 .collect()
147}
148
149impl TableRoute {
150 pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
151 let region_leaders = region_routes
152 .iter()
153 .map(|x| (x.region.id.region_number(), x.leader_peer.clone()))
154 .collect::<HashMap<_, _>>();
155 Self {
156 table,
157 region_routes,
158 region_leaders,
159 }
160 }
161
162 pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
163 let table = table_route
164 .table
165 .context(error::RouteInfoCorruptedSnafu {
166 err_msg: "'table' is empty in table route",
167 })?
168 .try_into()?;
169
170 let mut region_routes = Vec::with_capacity(table_route.region_routes.len());
171 for region_route in table_route.region_routes.into_iter() {
172 let region = region_route
173 .region
174 .context(error::RouteInfoCorruptedSnafu {
175 err_msg: "'region' is empty in region route",
176 })?
177 .into();
178
179 let leader_peer = peers.get(region_route.leader_peer_index as usize).cloned();
180
181 let follower_peers = region_route
182 .follower_peer_indexes
183 .into_iter()
184 .filter_map(|x| peers.get(x as usize).cloned())
185 .collect::<Vec<_>>();
186
187 region_routes.push(RegionRoute {
188 region,
189 leader_peer,
190 follower_peers,
191 leader_state: None,
192 leader_down_since: None,
193 });
194 }
195
196 Ok(Self::new(table, region_routes))
197 }
198}
199
/// Table information carried by a `TableRoute`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Table {
    /// Numeric id of the table.
    pub id: u64,
    /// Name of the table.
    pub table_name: TableName,
    /// Raw schema bytes. With serde they are written as a string (see `as_utf8`) and
    /// read back from a string (see `from_utf8`).
    #[serde(serialize_with = "as_utf8", deserialize_with = "from_utf8")]
    pub table_schema: Vec<u8>,
}
207
208impl TryFrom<PbTable> for Table {
209 type Error = error::Error;
210
211 fn try_from(t: PbTable) -> Result<Self> {
212 let table_name = t
213 .table_name
214 .context(error::RouteInfoCorruptedSnafu {
215 err_msg: "table name required",
216 })?
217 .into();
218 Ok(Self {
219 id: t.id,
220 table_name,
221 table_schema: t.table_schema,
222 })
223 }
224}
225
226impl From<Table> for PbTable {
227 fn from(table: Table) -> Self {
228 PbTable {
229 id: table.id,
230 table_name: Some(table.table_name.into()),
231 table_schema: table.table_schema,
232 }
233 }
234}
235
/// Route of a single region: the region itself, its leader and follower peers, and
/// the state of the leader.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Builder)]
pub struct RegionRoute {
    /// The region this route describes.
    pub region: Region,
    /// Leader peer of the region, if any.
    #[builder(setter(into, strip_option))]
    pub leader_peer: Option<Peer>,
    /// Follower peers of the region; the builder defaults this to an empty list.
    #[builder(setter(into), default)]
    pub follower_peers: Vec<Peer>,
    /// State of the leader; `None` when no state is recorded.
    ///
    /// When deserializing, the field may also appear under the name `leader_status`;
    /// it is omitted from serialized output when `None`.
    #[builder(setter(into, strip_option), default)]
    #[serde(
        default,
        alias = "leader_status",
        skip_serializing_if = "Option::is_none"
    )]
    pub leader_state: Option<LeaderState>,
    /// Value of `current_time_millis()` captured when the leader was put into the
    /// `Downgrading` state, or `None` otherwise.
    ///
    /// The builder fills this in through `default_leader_down_since()` when it is not
    /// set explicitly.
    #[serde(default)]
    #[builder(default = "self.default_leader_down_since()")]
    pub leader_down_since: Option<i64>,
}
256
257impl RegionRouteBuilder {
258 fn default_leader_down_since(&self) -> Option<i64> {
259 match self.leader_state {
260 Some(Some(LeaderState::Downgrading)) => Some(current_time_millis()),
261 _ => None,
262 }
263 }
264}
265
/// State of a region's leader.
///
/// `AsRefStr` renders variant names in upper case (`serialize_all = "UPPERCASE"`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)]
#[strum(serialize_all = "UPPERCASE")]
pub enum LeaderState {
    /// The leader is being downgraded.
    ///
    /// For compatibility, the name `Downgraded` is also accepted when deserializing.
    #[serde(alias = "Downgraded")]
    Downgrading,
}
278
279impl RegionRoute {
280 pub fn is_leader_downgrading(&self) -> bool {
288 matches!(self.leader_state, Some(LeaderState::Downgrading))
289 }
290
291 pub fn downgrade_leader(&mut self) {
303 self.leader_down_since = Some(current_time_millis());
304 self.leader_state = Some(LeaderState::Downgrading)
305 }
306
307 pub fn leader_down_millis(&self) -> Option<i64> {
309 self.leader_down_since
310 .map(|start| current_time_millis() - start)
311 }
312
313 pub fn set_leader_state(&mut self, state: Option<LeaderState>) -> bool {
317 let updated = self.leader_state != state;
318
319 match (state, updated) {
320 (Some(LeaderState::Downgrading), true) => {
321 self.leader_down_since = Some(current_time_millis());
322 }
323 (Some(LeaderState::Downgrading), false) => {
324 }
326 _ => {
327 self.leader_down_since = None;
328 }
329 }
330
331 self.leader_state = state;
332 updated
333 }
334}
335
/// Newtype wrapper around a list of [`RegionRoute`]s.
pub struct RegionRoutes(pub Vec<RegionRoute>);
337
338impl RegionRoutes {
339 pub fn region_leader_map(&self) -> HashMap<RegionNumber, &Peer> {
340 convert_to_region_leader_map(&self.0)
341 }
342
343 pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
344 self.region_leader_map().get(®ion_number).copied()
345 }
346}
347
/// A region as referenced by a `RegionRoute`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq)]
pub struct Region {
    /// Id of the region.
    pub id: RegionId,
    /// Name of the region.
    pub name: String,
    /// Partition information of the region, if any.
    pub partition: Option<Partition>,
    /// String key/value attributes of the region, kept ordered by key.
    pub attrs: BTreeMap<String, String>,
}
355
356impl Region {
357 #[cfg(any(test, feature = "testing"))]
358 pub fn new_test(id: RegionId) -> Self {
359 Self {
360 id,
361 ..Default::default()
362 }
363 }
364}
365
366impl From<PbRegion> for Region {
367 fn from(r: PbRegion) -> Self {
368 Self {
369 id: r.id.into(),
370 name: r.name,
371 partition: r.partition.map(Into::into),
372 attrs: r.attrs.into_iter().collect::<BTreeMap<_, _>>(),
373 }
374 }
375}
376
377impl From<Region> for PbRegion {
378 fn from(region: Region) -> Self {
379 Self {
380 id: region.id.into(),
381 name: region.name,
382 partition: region.partition.map(Into::into),
383 attrs: region.attrs.into_iter().collect::<HashMap<_, _>>(),
384 }
385 }
386}
387
/// Partition information of a region, stored as two lists of raw byte strings.
///
/// With serde, both lists are written as sequences of strings (see `as_utf8_vec`) and
/// read back from sequences of strings (see `from_utf8_vec`).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
pub struct Partition {
    /// Raw bytes of each entry of the column list.
    // NOTE(review): presumably the partition column names — confirm against the producer.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub column_list: Vec<Vec<u8>>,
    /// Raw bytes of each entry of the value list.
    // NOTE(review): presumably the partition bound values — confirm against the producer.
    #[serde(serialize_with = "as_utf8_vec", deserialize_with = "from_utf8_vec")]
    pub value_list: Vec<Vec<u8>>,
}
395
396fn as_utf8<S: Serializer>(val: &[u8], serializer: S) -> std::result::Result<S::Ok, S::Error> {
397 serializer.serialize_str(
398 String::from_utf8(val.to_vec())
399 .unwrap_or_else(|_| "<unknown-not-UTF8>".to_string())
400 .as_str(),
401 )
402}
403
404pub fn from_utf8<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
405where
406 D: Deserializer<'de>,
407{
408 let s = String::deserialize(deserializer)?;
409
410 Ok(s.into_bytes())
411}
412
413fn as_utf8_vec<S: Serializer>(
414 val: &[Vec<u8>],
415 serializer: S,
416) -> std::result::Result<S::Ok, S::Error> {
417 let mut seq = serializer.serialize_seq(Some(val.len()))?;
418 for v in val {
419 seq.serialize_element(&String::from_utf8_lossy(v))?;
420 }
421 seq.end()
422}
423
424pub fn from_utf8_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Vec<u8>>, D::Error>
425where
426 D: Deserializer<'de>,
427{
428 let values = Vec::<String>::deserialize(deserializer)?;
429
430 let values = values
431 .into_iter()
432 .map(|value| value.into_bytes())
433 .collect::<Vec<_>>();
434 Ok(values)
435}
436
437impl From<Partition> for PbPartition {
438 fn from(p: Partition) -> Self {
439 Self {
440 column_list: p.column_list,
441 value_list: p.value_list,
442 }
443 }
444}
445
446impl From<PbPartition> for Partition {
447 fn from(p: PbPartition) -> Self {
448 Self {
449 column_list: p.column_list,
450 value_list: p.value_list,
451 }
452 }
453}
454
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the route shared by the (de)serialization and downgrade tests: region 2
    /// named `r2`, led by peer 1 and followed by peers 2 and 3, with the given leader
    /// state and no recorded downgrade time.
    fn new_test_region_route(leader_state: Option<LeaderState>) -> RegionRoute {
        RegionRoute {
            region: Region {
                id: 2.into(),
                name: "r2".to_string(),
                partition: None,
                attrs: BTreeMap::new(),
            },
            leader_peer: Some(Peer::new(1, "a1")),
            follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
            leader_state,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_leader_is_downgraded() {
        let mut region_route = new_test_region_route(None);

        assert!(!region_route.is_leader_downgrading());

        region_route.downgrade_leader();

        assert!(region_route.is_leader_downgrading());
    }

    #[test]
    fn test_region_route_decode() {
        let region_route = new_test_region_route(None);

        // Neither `leader_state` nor `leader_down_since` is present in the input.
        let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#;

        let decoded: RegionRoute = serde_json::from_str(input).unwrap();

        assert_eq!(decoded, region_route);
    }

    #[test]
    fn test_region_route_compatibility() {
        let region_route = new_test_region_route(Some(LeaderState::Downgrading));

        // Every combination of field name (`leader_state` / `leader_status`) and
        // variant name (`Downgraded` / `Downgrading`) must decode to the same route.
        let inputs = [
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgraded","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_state":"Downgrading","leader_down_since":null}"#,
            r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}],"leader_status":"Downgrading","leader_down_since":null}"#,
        ];

        for input in inputs.iter().copied() {
            let decoded: RegionRoute = serde_json::from_str(input).unwrap();
            assert_eq!(decoded, region_route, "input: {}", input);
        }
    }

    #[test]
    fn test_de_serialize_partition() {
        let p = Partition {
            column_list: vec![b"a".to_vec(), b"b".to_vec()],
            value_list: vec![b"hi".to_vec(), b",".to_vec()],
        };

        let output = serde_json::to_string(&p).unwrap();
        let got: Partition = serde_json::from_str(&output).unwrap();

        assert_eq!(got, p);
    }

    #[test]
    fn test_region_distribution() {
        let region_routes = vec![
            RegionRoute {
                region: Region {
                    id: RegionId::new(1, 1),
                    name: "r1".to_string(),
                    partition: None,
                    attrs: BTreeMap::new(),
                },
                leader_peer: Some(Peer::new(1, "a1")),
                follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
            RegionRoute {
                region: Region {
                    id: RegionId::new(1, 2),
                    name: "r2".to_string(),
                    partition: None,
                    attrs: BTreeMap::new(),
                },
                leader_peer: Some(Peer::new(2, "a2")),
                follower_peers: vec![Peer::new(1, "a1"), Peer::new(3, "a3")],
                leader_state: None,
                leader_down_since: None,
            },
        ];

        // Every peer hosts both regions, either as leader or as follower.
        let distribution = region_distribution(&region_routes);
        assert_eq!(distribution.len(), 3);
        assert_eq!(distribution[&1], vec![1, 2]);
        assert_eq!(distribution[&2], vec![1, 2]);
        assert_eq!(distribution[&3], vec![1, 2]);
    }
}