common_procedure/
event.rs1use std::any::Any;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
19use common_event_recorder::Event;
20use common_event_recorder::error::Result;
21use common_time::timestamp::{TimeUnit, Timestamp};
22
23use crate::{ProcedureId, ProcedureState};
24
25pub const EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME: &str = "procedure_id";
26pub const EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME: &str = "procedure_state";
27pub const EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME: &str = "procedure_error";
28
29#[derive(Debug)]
31pub struct ProcedureEvent {
32 pub procedure_id: ProcedureId,
34 pub timestamp: Timestamp,
36 pub state: ProcedureState,
38 pub internal_event: Box<dyn Event>,
40}
41
42impl ProcedureEvent {
43 pub fn new(
44 procedure_id: ProcedureId,
45 internal_event: Box<dyn Event>,
46 state: ProcedureState,
47 ) -> Self {
48 Self {
49 procedure_id,
50 internal_event,
51 timestamp: Timestamp::current_time(TimeUnit::Nanosecond),
52 state,
53 }
54 }
55}
56
57impl Event for ProcedureEvent {
58 fn event_type(&self) -> &str {
59 self.internal_event.event_type()
60 }
61
62 fn timestamp(&self) -> Timestamp {
63 self.timestamp
64 }
65
66 fn json_payload(&self) -> Result<String> {
67 self.internal_event.json_payload()
68 }
69
70 fn extra_schema(&self) -> Vec<ColumnSchema> {
71 let mut schema = vec![
72 ColumnSchema {
73 column_name: EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME.to_string(),
74 datatype: ColumnDataType::String.into(),
75 semantic_type: SemanticType::Field.into(),
76 ..Default::default()
77 },
78 ColumnSchema {
79 column_name: EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME.to_string(),
80 datatype: ColumnDataType::String.into(),
81 semantic_type: SemanticType::Field.into(),
82 ..Default::default()
83 },
84 ColumnSchema {
85 column_name: EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME.to_string(),
86 datatype: ColumnDataType::String.into(),
87 semantic_type: SemanticType::Field.into(),
88 ..Default::default()
89 },
90 ];
91 schema.append(&mut self.internal_event.extra_schema());
92 schema
93 }
94
95 fn extra_rows(&self) -> Result<Vec<Row>> {
96 let mut internal_event_extra_rows = self.internal_event.extra_rows()?;
97 let mut rows = Vec::with_capacity(internal_event_extra_rows.len());
98 for internal_event_extra_row in internal_event_extra_rows.iter_mut() {
99 let error_str = match &self.state {
100 ProcedureState::Failed { error } => format!("{:?}", error),
101 ProcedureState::PrepareRollback { error } => format!("{:?}", error),
102 ProcedureState::RollingBack { error } => format!("{:?}", error),
103 ProcedureState::Retrying { error } => format!("{:?}", error),
104 ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
105 _ => "".to_string(),
106 };
107 let mut values = Vec::with_capacity(3 + internal_event_extra_row.values.len());
108 values.extend([
109 ValueData::StringValue(self.procedure_id.to_string()).into(),
110 ValueData::StringValue(self.state.as_str_name().to_string()).into(),
111 ValueData::StringValue(error_str).into(),
112 ]);
113 values.append(&mut internal_event_extra_row.values);
114 rows.push(Row { values });
115 }
116
117 Ok(rows)
118 }
119
120 fn as_any(&self) -> &dyn Any {
121 self
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use api::v1::value::ValueData;
128 use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
129 use common_event_recorder::Event;
130
131 use crate::{ProcedureEvent, ProcedureId, ProcedureState};
132
133 #[derive(Debug)]
134 struct TestEvent;
135
136 impl Event for TestEvent {
137 fn event_type(&self) -> &str {
138 "test_event"
139 }
140
141 fn extra_schema(&self) -> Vec<ColumnSchema> {
142 vec![ColumnSchema {
143 column_name: "test_event_column".to_string(),
144 datatype: ColumnDataType::String.into(),
145 semantic_type: SemanticType::Field.into(),
146 ..Default::default()
147 }]
148 }
149
150 fn extra_rows(&self) -> common_event_recorder::error::Result<Vec<Row>> {
151 Ok(vec![
152 Row {
153 values: vec![ValueData::StringValue("test_event1".to_string()).into()],
154 },
155 Row {
156 values: vec![ValueData::StringValue("test_event2".to_string()).into()],
157 },
158 ])
159 }
160
161 fn as_any(&self) -> &dyn std::any::Any {
162 self
163 }
164 }
165
166 #[test]
167 fn test_procedure_event_extra_rows() {
168 let procedure_event = ProcedureEvent::new(
169 ProcedureId::random(),
170 Box::new(TestEvent {}),
171 ProcedureState::Running,
172 );
173
174 let procedure_event_extra_rows = procedure_event.extra_rows().unwrap();
175 assert_eq!(procedure_event_extra_rows.len(), 2);
176 assert_eq!(procedure_event_extra_rows[0].values.len(), 4);
177 assert_eq!(
178 procedure_event_extra_rows[0].values[3],
179 ValueData::StringValue("test_event1".to_string()).into()
180 );
181 assert_eq!(procedure_event_extra_rows[1].values.len(), 4);
182 assert_eq!(
183 procedure_event_extra_rows[1].values[3],
184 ValueData::StringValue("test_event2".to_string()).into()
185 );
186 }
187}