meta_srv/events/
region_migration_event.rs1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
20use common_event_recorder::Event;
21use common_event_recorder::error::{Result, SerializeEventSnafu};
22use serde::Serialize;
23use snafu::ResultExt;
24use store_api::storage::{RegionId, TableId};
25
26use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
27
28pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration";
29pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id";
30pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id";
31pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number";
32pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str =
33 "region_migration_trigger_reason";
34pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id";
35pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr";
36pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id";
37pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
38
39#[derive(Debug, Serialize)]
41pub(crate) struct RegionMigrationEvent {
42 #[serde(skip)]
43 region_id: RegionId,
44 #[serde(skip)]
45 table_id: TableId,
46 #[serde(skip)]
47 region_number: u32,
48 #[serde(skip)]
49 trigger_reason: RegionMigrationTriggerReason,
50 #[serde(skip)]
51 src_node_id: u64,
52 #[serde(skip)]
53 src_peer_addr: String,
54 #[serde(skip)]
55 dst_node_id: u64,
56 #[serde(skip)]
57 dst_peer_addr: String,
58
59 timeout: Duration,
61}
62
63impl RegionMigrationEvent {
64 pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
65 let region_id = ctx.region_ids[0];
67 Self {
68 region_id,
69 table_id: region_id.table_id(),
70 region_number: region_id.region_number(),
71 trigger_reason: ctx.trigger_reason,
72 src_node_id: ctx.from_peer.id,
73 src_peer_addr: ctx.from_peer.addr.clone(),
74 dst_node_id: ctx.to_peer.id,
75 dst_peer_addr: ctx.to_peer.addr.clone(),
76 timeout: ctx.timeout,
77 }
78 }
79}
80
81impl Event for RegionMigrationEvent {
82 fn event_type(&self) -> &str {
83 REGION_MIGRATION_EVENT_TYPE
84 }
85
86 fn extra_schema(&self) -> Vec<ColumnSchema> {
87 vec![
88 ColumnSchema {
89 column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(),
90 datatype: ColumnDataType::Uint64.into(),
91 semantic_type: SemanticType::Field.into(),
92 ..Default::default()
93 },
94 ColumnSchema {
95 column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(),
96 datatype: ColumnDataType::Uint32.into(),
97 semantic_type: SemanticType::Field.into(),
98 ..Default::default()
99 },
100 ColumnSchema {
101 column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(),
102 datatype: ColumnDataType::Uint32.into(),
103 semantic_type: SemanticType::Field.into(),
104 ..Default::default()
105 },
106 ColumnSchema {
107 column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(),
108 datatype: ColumnDataType::String.into(),
109 semantic_type: SemanticType::Field.into(),
110 ..Default::default()
111 },
112 ColumnSchema {
113 column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(),
114 datatype: ColumnDataType::Uint64.into(),
115 semantic_type: SemanticType::Field.into(),
116 ..Default::default()
117 },
118 ColumnSchema {
119 column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(),
120 datatype: ColumnDataType::String.into(),
121 semantic_type: SemanticType::Field.into(),
122 ..Default::default()
123 },
124 ColumnSchema {
125 column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(),
126 datatype: ColumnDataType::Uint64.into(),
127 semantic_type: SemanticType::Field.into(),
128 ..Default::default()
129 },
130 ColumnSchema {
131 column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(),
132 datatype: ColumnDataType::String.into(),
133 semantic_type: SemanticType::Field.into(),
134 ..Default::default()
135 },
136 ]
137 }
138
139 fn extra_row(&self) -> Result<Row> {
140 Ok(Row {
141 values: vec![
142 ValueData::U64Value(self.region_id.as_u64()).into(),
143 ValueData::U32Value(self.table_id).into(),
144 ValueData::U32Value(self.region_number).into(),
145 ValueData::StringValue(self.trigger_reason.to_string()).into(),
146 ValueData::U64Value(self.src_node_id).into(),
147 ValueData::StringValue(self.src_peer_addr.clone()).into(),
148 ValueData::U64Value(self.dst_node_id).into(),
149 ValueData::StringValue(self.dst_peer_addr.clone()).into(),
150 ],
151 })
152 }
153
154 fn json_payload(&self) -> Result<String> {
155 serde_json::to_string(self).context(SerializeEventSnafu)
156 }
157
158 fn as_any(&self) -> &dyn Any {
159 self
160 }
161}