meta_srv/events/
region_migration_event.rs1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
20use common_event_recorder::error::{Result, SerializeEventSnafu};
21use common_event_recorder::Event;
22use serde::Serialize;
23use snafu::ResultExt;
24use store_api::storage::{RegionId, TableId};
25
26use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
27
/// Event type tag reported by region migration events.
pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration";

// Names of the extra columns that a region migration event adds to the events table.

/// Column holding the full region id.
pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id";
/// Column holding the table id part of the region id.
pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id";
/// Column holding the region number part of the region id.
pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number";
/// Column holding the textual reason that triggered the migration.
pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str =
    "region_migration_trigger_reason";
/// Column holding the id of the node the region is migrated from.
pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id";
/// Column holding the address of the peer the region is migrated from.
pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr";
/// Column holding the id of the node the region is migrated to.
pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id";
/// Column holding the address of the peer the region is migrated to.
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
38
39#[derive(Debug, Serialize)]
41pub(crate) struct RegionMigrationEvent {
42 #[serde(skip)]
43 region_id: RegionId,
44 #[serde(skip)]
45 table_id: TableId,
46 #[serde(skip)]
47 region_number: u32,
48 #[serde(skip)]
49 trigger_reason: RegionMigrationTriggerReason,
50 #[serde(skip)]
51 src_node_id: u64,
52 #[serde(skip)]
53 src_peer_addr: String,
54 #[serde(skip)]
55 dst_node_id: u64,
56 #[serde(skip)]
57 dst_peer_addr: String,
58
59 timeout: Duration,
61}
62
63impl RegionMigrationEvent {
64 pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
65 Self {
66 region_id: ctx.region_id,
67 table_id: ctx.region_id.table_id(),
68 region_number: ctx.region_id.region_number(),
69 trigger_reason: ctx.trigger_reason,
70 src_node_id: ctx.from_peer.id,
71 src_peer_addr: ctx.from_peer.addr.clone(),
72 dst_node_id: ctx.to_peer.id,
73 dst_peer_addr: ctx.to_peer.addr.clone(),
74 timeout: ctx.timeout,
75 }
76 }
77}
78
79impl Event for RegionMigrationEvent {
80 fn event_type(&self) -> &str {
81 REGION_MIGRATION_EVENT_TYPE
82 }
83
84 fn extra_schema(&self) -> Vec<ColumnSchema> {
85 vec![
86 ColumnSchema {
87 column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(),
88 datatype: ColumnDataType::Uint64.into(),
89 semantic_type: SemanticType::Field.into(),
90 ..Default::default()
91 },
92 ColumnSchema {
93 column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(),
94 datatype: ColumnDataType::Uint32.into(),
95 semantic_type: SemanticType::Field.into(),
96 ..Default::default()
97 },
98 ColumnSchema {
99 column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(),
100 datatype: ColumnDataType::Uint32.into(),
101 semantic_type: SemanticType::Field.into(),
102 ..Default::default()
103 },
104 ColumnSchema {
105 column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(),
106 datatype: ColumnDataType::String.into(),
107 semantic_type: SemanticType::Field.into(),
108 ..Default::default()
109 },
110 ColumnSchema {
111 column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(),
112 datatype: ColumnDataType::Uint64.into(),
113 semantic_type: SemanticType::Field.into(),
114 ..Default::default()
115 },
116 ColumnSchema {
117 column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(),
118 datatype: ColumnDataType::String.into(),
119 semantic_type: SemanticType::Field.into(),
120 ..Default::default()
121 },
122 ColumnSchema {
123 column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(),
124 datatype: ColumnDataType::Uint64.into(),
125 semantic_type: SemanticType::Field.into(),
126 ..Default::default()
127 },
128 ColumnSchema {
129 column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(),
130 datatype: ColumnDataType::String.into(),
131 semantic_type: SemanticType::Field.into(),
132 ..Default::default()
133 },
134 ]
135 }
136
137 fn extra_row(&self) -> Result<Row> {
138 Ok(Row {
139 values: vec![
140 ValueData::U64Value(self.region_id.as_u64()).into(),
141 ValueData::U32Value(self.table_id).into(),
142 ValueData::U32Value(self.region_number).into(),
143 ValueData::StringValue(self.trigger_reason.to_string()).into(),
144 ValueData::U64Value(self.src_node_id).into(),
145 ValueData::StringValue(self.src_peer_addr.to_string()).into(),
146 ValueData::U64Value(self.dst_node_id).into(),
147 ValueData::StringValue(self.dst_peer_addr.to_string()).into(),
148 ],
149 })
150 }
151
152 fn json_payload(&self) -> Result<String> {
153 serde_json::to_string(self).context(SerializeEventSnafu)
154 }
155
156 fn as_any(&self) -> &dyn Any {
157 self
158 }
159}