meta_srv/events/
region_migration_event.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::value::ValueData;
19use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
20use common_event_recorder::Event;
21use common_event_recorder::error::{Result, SerializeEventSnafu};
22use serde::Serialize;
23use snafu::ResultExt;
24use store_api::storage::RegionId;
25
26use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
27
/// Event type tag reported by `RegionMigrationEvent::event_type`.
pub const REGION_MIGRATION_EVENT_TYPE: &str = "region_migration";

/// Name of the column holding the full region id (declared as `Uint64`).
pub const EVENTS_TABLE_REGION_ID_COLUMN_NAME: &str = "region_id";

/// Name of the column holding the table id part of the region id (declared as `Uint32`).
pub const EVENTS_TABLE_TABLE_ID_COLUMN_NAME: &str = "table_id";

/// Name of the column holding the region number part of the region id (declared as `Uint32`).
pub const EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME: &str = "region_number";

/// Name of the column holding the stringified migration trigger reason.
pub const EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME: &str =
    "region_migration_trigger_reason";

/// Name of the column holding the id of the source node.
pub const EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME: &str = "region_migration_src_node_id";

/// Name of the column holding the address of the source peer.
pub const EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME: &str = "region_migration_src_peer_addr";

/// Name of the column holding the id of the destination node.
pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_node_id";

/// Name of the column holding the address of the destination peer.
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
38
39/// RegionMigrationEvent is the event of region migration.
40#[derive(Debug)]
41pub(crate) struct RegionMigrationEvent {
42    // The region ids of the region migration.
43    region_ids: Vec<RegionId>,
44    // The trigger reason of the region migration.
45    trigger_reason: RegionMigrationTriggerReason,
46    // The source node id of the region migration.
47    src_node_id: u64,
48    // The source peer address of the region migration.
49    src_peer_addr: String,
50    // The destination node id of the region migration.
51    dst_node_id: u64,
52    // The destination peer address of the region migration.
53    dst_peer_addr: String,
54    // The timeout of the region migration.
55    timeout: Duration,
56}
57
58#[derive(Debug, Serialize)]
59struct Payload {
60    #[serde(with = "humantime_serde")]
61    timeout: Duration,
62}
63
64impl RegionMigrationEvent {
65    pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
66        Self {
67            region_ids: ctx.region_ids.clone(),
68            trigger_reason: ctx.trigger_reason,
69            src_node_id: ctx.from_peer.id,
70            src_peer_addr: ctx.from_peer.addr.clone(),
71            dst_node_id: ctx.to_peer.id,
72            dst_peer_addr: ctx.to_peer.addr.clone(),
73            timeout: ctx.timeout,
74        }
75    }
76}
77
78impl Event for RegionMigrationEvent {
79    fn event_type(&self) -> &str {
80        REGION_MIGRATION_EVENT_TYPE
81    }
82
83    fn extra_schema(&self) -> Vec<ColumnSchema> {
84        vec![
85            ColumnSchema {
86                column_name: EVENTS_TABLE_REGION_ID_COLUMN_NAME.to_string(),
87                datatype: ColumnDataType::Uint64.into(),
88                semantic_type: SemanticType::Field.into(),
89                ..Default::default()
90            },
91            ColumnSchema {
92                column_name: EVENTS_TABLE_TABLE_ID_COLUMN_NAME.to_string(),
93                datatype: ColumnDataType::Uint32.into(),
94                semantic_type: SemanticType::Field.into(),
95                ..Default::default()
96            },
97            ColumnSchema {
98                column_name: EVENTS_TABLE_REGION_NUMBER_COLUMN_NAME.to_string(),
99                datatype: ColumnDataType::Uint32.into(),
100                semantic_type: SemanticType::Field.into(),
101                ..Default::default()
102            },
103            ColumnSchema {
104                column_name: EVENTS_TABLE_REGION_MIGRATION_TRIGGER_REASON_COLUMN_NAME.to_string(),
105                datatype: ColumnDataType::String.into(),
106                semantic_type: SemanticType::Field.into(),
107                ..Default::default()
108            },
109            ColumnSchema {
110                column_name: EVENTS_TABLE_SRC_NODE_ID_COLUMN_NAME.to_string(),
111                datatype: ColumnDataType::Uint64.into(),
112                semantic_type: SemanticType::Field.into(),
113                ..Default::default()
114            },
115            ColumnSchema {
116                column_name: EVENTS_TABLE_SRC_PEER_ADDR_COLUMN_NAME.to_string(),
117                datatype: ColumnDataType::String.into(),
118                semantic_type: SemanticType::Field.into(),
119                ..Default::default()
120            },
121            ColumnSchema {
122                column_name: EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME.to_string(),
123                datatype: ColumnDataType::Uint64.into(),
124                semantic_type: SemanticType::Field.into(),
125                ..Default::default()
126            },
127            ColumnSchema {
128                column_name: EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME.to_string(),
129                datatype: ColumnDataType::String.into(),
130                semantic_type: SemanticType::Field.into(),
131                ..Default::default()
132            },
133        ]
134    }
135
136    fn extra_rows(&self) -> Result<Vec<Row>> {
137        let mut extra_rows = Vec::with_capacity(self.region_ids.len());
138        for region_id in &self.region_ids {
139            extra_rows.push(Row {
140                values: vec![
141                    ValueData::U64Value(region_id.as_u64()).into(),
142                    ValueData::U32Value(region_id.table_id()).into(),
143                    ValueData::U32Value(region_id.region_number()).into(),
144                    ValueData::StringValue(self.trigger_reason.to_string()).into(),
145                    ValueData::U64Value(self.src_node_id).into(),
146                    ValueData::StringValue(self.src_peer_addr.clone()).into(),
147                    ValueData::U64Value(self.dst_node_id).into(),
148                    ValueData::StringValue(self.dst_peer_addr.clone()).into(),
149                ],
150            });
151        }
152
153        Ok(extra_rows)
154    }
155
156    fn json_payload(&self) -> Result<String> {
157        serde_json::to_string(&Payload {
158            timeout: self.timeout,
159        })
160        .context(SerializeEventSnafu)
161    }
162
163    fn as_any(&self) -> &dyn Any {
164        self
165    }
166}