meta_srv/events/
region_migration_event.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
20use common_event_recorder::Event;
21use common_event_recorder::error::{Result, SerializeEventSnafu};
22use serde::Serialize;
23use snafu::ResultExt;
24use store_api::storage::{RegionId, TableId};
25
26use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
27
/// Event type string returned by `RegionMigrationEvent::event_type`.
pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration";
/// Name of the extra column carrying the region id (declared as `Uint64` in the extra schema).
pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id";
/// Name of the extra column carrying the table id (declared as `Uint32` in the extra schema).
pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id";
/// Name of the extra column carrying the region number (declared as `Uint32` in the extra schema).
pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number";
/// Name of the extra column carrying the trigger reason, stored as a string.
pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str =
    "region_migration_trigger_reason";
/// Name of the extra column carrying the id of the source node (`Uint64`).
pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id";
/// Name of the extra column carrying the address of the source peer (string).
pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr";
/// Name of the extra column carrying the id of the destination node (`Uint64`).
pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id";
/// Name of the extra column carrying the address of the destination peer (string).
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
38
/// RegionMigrationEvent is the event of region migration.
///
/// Fields marked `#[serde(skip)]` are left out of the serialized form; they are emitted as
/// dedicated columns through `Event::extra_schema` / `Event::extra_row` instead. Only the
/// remaining fields (currently `timeout`) end up in the JSON payload.
#[derive(Debug, Serialize)]
pub(crate) struct RegionMigrationEvent {
    /// The region this event refers to (the first entry of `PersistentContext::region_ids`).
    #[serde(skip)]
    region_id: RegionId,
    /// Table id derived from `region_id` via `RegionId::table_id`.
    #[serde(skip)]
    table_id: TableId,
    /// Region number derived from `region_id` via `RegionId::region_number`.
    #[serde(skip)]
    region_number: u32,
    /// Trigger reason copied from the persistent context.
    #[serde(skip)]
    trigger_reason: RegionMigrationTriggerReason,
    /// Id of the peer the region is migrated from (`from_peer.id`).
    #[serde(skip)]
    src_node_id: u64,
    /// Address of the peer the region is migrated from (`from_peer.addr`).
    #[serde(skip)]
    src_peer_addr: String,
    /// Id of the peer the region is migrated to (`to_peer.id`).
    #[serde(skip)]
    dst_node_id: u64,
    /// Address of the peer the region is migrated to (`to_peer.addr`).
    #[serde(skip)]
    dst_peer_addr: String,

    // The following fields will be serialized as the json payload.
    /// Timeout copied from `PersistentContext::timeout`.
    timeout: Duration,
}
62
63impl RegionMigrationEvent {
64    pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
65        // FIXME(weny): handle multiple region ids.
66        let region_id = ctx.region_ids[0];
67        Self {
68            region_id,
69            table_id: region_id.table_id(),
70            region_number: region_id.region_number(),
71            trigger_reason: ctx.trigger_reason,
72            src_node_id: ctx.from_peer.id,
73            src_peer_addr: ctx.from_peer.addr.clone(),
74            dst_node_id: ctx.to_peer.id,
75            dst_peer_addr: ctx.to_peer.addr.clone(),
76            timeout: ctx.timeout,
77        }
78    }
79}
80
81impl Event for RegionMigrationEvent {
82    fn event_type(&self) -> &str {
83        REGION_MIGRATION_EVENT_TYPE
84    }
85
86    fn extra_schema(&self) -> Vec<ColumnSchema> {
87        vec![
88            ColumnSchema {
89                column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(),
90                datatype: ColumnDataType::Uint64.into(),
91                semantic_type: SemanticType::Field.into(),
92                ..Default::default()
93            },
94            ColumnSchema {
95                column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(),
96                datatype: ColumnDataType::Uint32.into(),
97                semantic_type: SemanticType::Field.into(),
98                ..Default::default()
99            },
100            ColumnSchema {
101                column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(),
102                datatype: ColumnDataType::Uint32.into(),
103                semantic_type: SemanticType::Field.into(),
104                ..Default::default()
105            },
106            ColumnSchema {
107                column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(),
108                datatype: ColumnDataType::String.into(),
109                semantic_type: SemanticType::Field.into(),
110                ..Default::default()
111            },
112            ColumnSchema {
113                column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(),
114                datatype: ColumnDataType::Uint64.into(),
115                semantic_type: SemanticType::Field.into(),
116                ..Default::default()
117            },
118            ColumnSchema {
119                column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(),
120                datatype: ColumnDataType::String.into(),
121                semantic_type: SemanticType::Field.into(),
122                ..Default::default()
123            },
124            ColumnSchema {
125                column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(),
126                datatype: ColumnDataType::Uint64.into(),
127                semantic_type: SemanticType::Field.into(),
128                ..Default::default()
129            },
130            ColumnSchema {
131                column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(),
132                datatype: ColumnDataType::String.into(),
133                semantic_type: SemanticType::Field.into(),
134                ..Default::default()
135            },
136        ]
137    }
138
139    fn extra_row(&self) -> Result<Row> {
140        Ok(Row {
141            values: vec![
142                ValueData::U64Value(self.region_id.as_u64()).into(),
143                ValueData::U32Value(self.table_id).into(),
144                ValueData::U32Value(self.region_number).into(),
145                ValueData::StringValue(self.trigger_reason.to_string()).into(),
146                ValueData::U64Value(self.src_node_id).into(),
147                ValueData::StringValue(self.src_peer_addr.clone()).into(),
148                ValueData::U64Value(self.dst_node_id).into(),
149                ValueData::StringValue(self.dst_peer_addr.clone()).into(),
150            ],
151        })
152    }
153
154    fn json_payload(&self) -> Result<String> {
155        serde_json::to_string(self).context(SerializeEventSnafu)
156    }
157
158    fn as_any(&self) -> &dyn Any {
159        self
160    }
161}