meta_srv/events/
region_migration_event.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
20use common_event_recorder::error::{Result, SerializeEventSnafu};
21use common_event_recorder::Event;
22use serde::Serialize;
23use snafu::ResultExt;
24use store_api::storage::{RegionId, TableId};
25
26use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
27
28pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration";
29pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id";
30pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id";
31pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number";
32pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str =
33    "region_migration_trigger_reason";
34pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id";
35pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr";
36pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id";
37pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
38
39/// RegionMigrationEvent is the event of region migration.
40#[derive(Debug, Serialize)]
41pub(crate) struct RegionMigrationEvent {
42    #[serde(skip)]
43    region_id: RegionId,
44    #[serde(skip)]
45    table_id: TableId,
46    #[serde(skip)]
47    region_number: u32,
48    #[serde(skip)]
49    trigger_reason: RegionMigrationTriggerReason,
50    #[serde(skip)]
51    src_node_id: u64,
52    #[serde(skip)]
53    src_peer_addr: String,
54    #[serde(skip)]
55    dst_node_id: u64,
56    #[serde(skip)]
57    dst_peer_addr: String,
58
59    // The following fields will be serialized as the json payload.
60    timeout: Duration,
61}
62
63impl RegionMigrationEvent {
64    pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
65        Self {
66            region_id: ctx.region_id,
67            table_id: ctx.region_id.table_id(),
68            region_number: ctx.region_id.region_number(),
69            trigger_reason: ctx.trigger_reason,
70            src_node_id: ctx.from_peer.id,
71            src_peer_addr: ctx.from_peer.addr.clone(),
72            dst_node_id: ctx.to_peer.id,
73            dst_peer_addr: ctx.to_peer.addr.clone(),
74            timeout: ctx.timeout,
75        }
76    }
77}
78
79impl Event for RegionMigrationEvent {
80    fn event_type(&self) -> &str {
81        REGION_MIGRATION_EVENT_TYPE
82    }
83
84    fn extra_schema(&self) -> Vec<ColumnSchema> {
85        vec![
86            ColumnSchema {
87                column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(),
88                datatype: ColumnDataType::Uint64.into(),
89                semantic_type: SemanticType::Field.into(),
90                ..Default::default()
91            },
92            ColumnSchema {
93                column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(),
94                datatype: ColumnDataType::Uint32.into(),
95                semantic_type: SemanticType::Field.into(),
96                ..Default::default()
97            },
98            ColumnSchema {
99                column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(),
100                datatype: ColumnDataType::Uint32.into(),
101                semantic_type: SemanticType::Field.into(),
102                ..Default::default()
103            },
104            ColumnSchema {
105                column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(),
106                datatype: ColumnDataType::String.into(),
107                semantic_type: SemanticType::Field.into(),
108                ..Default::default()
109            },
110            ColumnSchema {
111                column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(),
112                datatype: ColumnDataType::Uint64.into(),
113                semantic_type: SemanticType::Field.into(),
114                ..Default::default()
115            },
116            ColumnSchema {
117                column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(),
118                datatype: ColumnDataType::String.into(),
119                semantic_type: SemanticType::Field.into(),
120                ..Default::default()
121            },
122            ColumnSchema {
123                column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(),
124                datatype: ColumnDataType::Uint64.into(),
125                semantic_type: SemanticType::Field.into(),
126                ..Default::default()
127            },
128            ColumnSchema {
129                column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(),
130                datatype: ColumnDataType::String.into(),
131                semantic_type: SemanticType::Field.into(),
132                ..Default::default()
133            },
134        ]
135    }
136
137    fn extra_row(&self) -> Result<Row> {
138        Ok(Row {
139            values: vec![
140                ValueData::U64Value(self.region_id.as_u64()).into(),
141                ValueData::U32Value(self.table_id).into(),
142                ValueData::U32Value(self.region_number).into(),
143                ValueData::StringValue(self.trigger_reason.to_string()).into(),
144                ValueData::U64Value(self.src_node_id).into(),
145                ValueData::StringValue(self.src_peer_addr.to_string()).into(),
146                ValueData::U64Value(self.dst_node_id).into(),
147                ValueData::StringValue(self.dst_peer_addr.to_string()).into(),
148            ],
149        })
150    }
151
152    fn json_payload(&self) -> Result<String> {
153        serde_json::to_string(self).context(SerializeEventSnafu)
154    }
155
156    fn as_any(&self) -> &dyn Any {
157        self
158    }
159}