common_procedure/
event.rs1use std::any::Any;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
19use common_event_recorder::error::Result;
20use common_event_recorder::Event;
21use common_time::timestamp::{TimeUnit, Timestamp};
22
23use crate::{ProcedureId, ProcedureState};
24
25pub const EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME: &str = "procedure_id";
26pub const EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME: &str = "procedure_state";
27pub const EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME: &str = "procedure_error";
28
29#[derive(Debug)]
31pub struct ProcedureEvent {
32 pub procedure_id: ProcedureId,
34 pub timestamp: Timestamp,
36 pub state: ProcedureState,
38 pub internal_event: Box<dyn Event>,
40}
41
42impl ProcedureEvent {
43 pub fn new(
44 procedure_id: ProcedureId,
45 internal_event: Box<dyn Event>,
46 state: ProcedureState,
47 ) -> Self {
48 Self {
49 procedure_id,
50 internal_event,
51 timestamp: Timestamp::current_time(TimeUnit::Nanosecond),
52 state,
53 }
54 }
55}
56
57impl Event for ProcedureEvent {
58 fn event_type(&self) -> &str {
59 self.internal_event.event_type()
60 }
61
62 fn timestamp(&self) -> Timestamp {
63 self.timestamp
64 }
65
66 fn json_payload(&self) -> Result<String> {
67 self.internal_event.json_payload()
68 }
69
70 fn extra_schema(&self) -> Vec<ColumnSchema> {
71 let mut schema = vec![
72 ColumnSchema {
73 column_name: EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME.to_string(),
74 datatype: ColumnDataType::String.into(),
75 semantic_type: SemanticType::Field.into(),
76 ..Default::default()
77 },
78 ColumnSchema {
79 column_name: EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME.to_string(),
80 datatype: ColumnDataType::String.into(),
81 semantic_type: SemanticType::Field.into(),
82 ..Default::default()
83 },
84 ColumnSchema {
85 column_name: EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME.to_string(),
86 datatype: ColumnDataType::String.into(),
87 semantic_type: SemanticType::Field.into(),
88 ..Default::default()
89 },
90 ];
91 schema.append(&mut self.internal_event.extra_schema());
92 schema
93 }
94
95 fn extra_row(&self) -> Result<Row> {
96 let error_str = match &self.state {
97 ProcedureState::Failed { error } => format!("{:?}", error),
98 ProcedureState::PrepareRollback { error } => format!("{:?}", error),
99 ProcedureState::RollingBack { error } => format!("{:?}", error),
100 ProcedureState::Retrying { error } => format!("{:?}", error),
101 ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
102 _ => "".to_string(),
103 };
104 let mut row = vec![
105 ValueData::StringValue(self.procedure_id.to_string()).into(),
106 ValueData::StringValue(self.state.as_str_name().to_string()).into(),
107 ValueData::StringValue(error_str).into(),
108 ];
109 row.append(&mut self.internal_event.extra_row()?.values);
110 Ok(Row { values: row })
111 }
112
113 fn as_any(&self) -> &dyn Any {
114 self
115 }
116}