meta_srv/events/
region_migration_event.rs1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
20use common_event_recorder::Event;
21use common_event_recorder::error::{Result, SerializeEventSnafu};
22use serde::Serialize;
23use snafu::ResultExt;
24use store_api::storage::RegionId;
25
26use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
27
/// Event type string reported by `RegionMigrationEvent::event_type`.
pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration";
/// Name of the extra column that stores the full region id as a `u64`.
pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id";
/// Name of the extra column that stores the table id component of the region id (`u32`).
pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id";
/// Name of the extra column that stores the region number component of the region id (`u32`).
pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number";
/// Name of the extra string column that stores the trigger reason, rendered with `to_string()`.
pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str =
    "region_migration_trigger_reason";
/// Name of the extra column that stores the id of the source peer (`u64`).
pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id";
/// Name of the extra string column that stores the address of the source peer.
pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr";
/// Name of the extra column that stores the id of the destination peer (`u64`).
pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id";
/// Name of the extra string column that stores the address of the destination peer.
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
38
/// An event describing a region migration, built from a `PersistentContext`
/// via `RegionMigrationEvent::from_persistent_ctx`.
///
/// One extra row is emitted per entry in `region_ids`; the remaining fields
/// are repeated on every row.
#[derive(Debug)]
pub(crate) struct RegionMigrationEvent {
    /// Region ids copied from the context; each one yields its own row.
    region_ids: Vec<RegionId>,
    /// Trigger reason copied from the context; written as a string column.
    trigger_reason: RegionMigrationTriggerReason,
    /// Id of the context's `from_peer`.
    src_node_id: u64,
    /// Address of the context's `from_peer`.
    src_peer_addr: String,
    /// Id of the context's `to_peer`.
    dst_node_id: u64,
    /// Address of the context's `to_peer`.
    dst_peer_addr: String,
    /// Timeout copied from the context; emitted only in the JSON payload.
    timeout: Duration,
}
57
/// JSON payload produced by `RegionMigrationEvent::json_payload`.
#[derive(Debug, Serialize)]
struct Payload {
    /// Migration timeout, serialized through the `humantime_serde` module
    /// rather than serde's default `Duration` representation.
    #[serde(with = "humantime_serde")]
    timeout: Duration,
}
63
64impl RegionMigrationEvent {
65 pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
66 Self {
67 region_ids: ctx.region_ids.clone(),
68 trigger_reason: ctx.trigger_reason,
69 src_node_id: ctx.from_peer.id,
70 src_peer_addr: ctx.from_peer.addr.clone(),
71 dst_node_id: ctx.to_peer.id,
72 dst_peer_addr: ctx.to_peer.addr.clone(),
73 timeout: ctx.timeout,
74 }
75 }
76}
77
78impl Event for RegionMigrationEvent {
79 fn event_type(&self) -> &str {
80 REGION_MIGRATION_EVENT_TYPE
81 }
82
83 fn extra_schema(&self) -> Vec<ColumnSchema> {
84 vec![
85 ColumnSchema {
86 column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(),
87 datatype: ColumnDataType::Uint64.into(),
88 semantic_type: SemanticType::Field.into(),
89 ..Default::default()
90 },
91 ColumnSchema {
92 column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(),
93 datatype: ColumnDataType::Uint32.into(),
94 semantic_type: SemanticType::Field.into(),
95 ..Default::default()
96 },
97 ColumnSchema {
98 column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(),
99 datatype: ColumnDataType::Uint32.into(),
100 semantic_type: SemanticType::Field.into(),
101 ..Default::default()
102 },
103 ColumnSchema {
104 column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(),
105 datatype: ColumnDataType::String.into(),
106 semantic_type: SemanticType::Field.into(),
107 ..Default::default()
108 },
109 ColumnSchema {
110 column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(),
111 datatype: ColumnDataType::Uint64.into(),
112 semantic_type: SemanticType::Field.into(),
113 ..Default::default()
114 },
115 ColumnSchema {
116 column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(),
117 datatype: ColumnDataType::String.into(),
118 semantic_type: SemanticType::Field.into(),
119 ..Default::default()
120 },
121 ColumnSchema {
122 column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(),
123 datatype: ColumnDataType::Uint64.into(),
124 semantic_type: SemanticType::Field.into(),
125 ..Default::default()
126 },
127 ColumnSchema {
128 column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(),
129 datatype: ColumnDataType::String.into(),
130 semantic_type: SemanticType::Field.into(),
131 ..Default::default()
132 },
133 ]
134 }
135
136 fn extra_rows(&self) -> Result<Vec<Row>> {
137 let mut extra_rows = Vec::with_capacity(self.region_ids.len());
138 for region_id in &self.region_ids {
139 extra_rows.push(Row {
140 values: vec![
141 ValueData::U64Value(region_id.as_u64()).into(),
142 ValueData::U32Value(region_id.table_id()).into(),
143 ValueData::U32Value(region_id.region_number()).into(),
144 ValueData::StringValue(self.trigger_reason.to_string()).into(),
145 ValueData::U64Value(self.src_node_id).into(),
146 ValueData::StringValue(self.src_peer_addr.clone()).into(),
147 ValueData::U64Value(self.dst_node_id).into(),
148 ValueData::StringValue(self.dst_peer_addr.clone()).into(),
149 ],
150 });
151 }
152
153 Ok(extra_rows)
154 }
155
156 fn json_payload(&self) -> Result<String> {
157 serde_json::to_string(&Payload {
158 timeout: self.timeout,
159 })
160 .context(SerializeEventSnafu)
161 }
162
163 fn as_any(&self) -> &dyn Any {
164 self
165 }
166}