common_procedure/
event.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
19use common_event_recorder::Event;
20use common_event_recorder::error::Result;
21use common_time::timestamp::{TimeUnit, Timestamp};
22
23use crate::{ProcedureId, ProcedureState};
24
25pub const EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME: &str = "procedure_id";
26pub const EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME: &str = "procedure_state";
27pub const EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME: &str = "procedure_error";
28
29/// `ProcedureEvent` represents an event emitted by a procedure during its execution lifecycle.
30#[derive(Debug)]
31pub struct ProcedureEvent {
32    /// Unique identifier associated with the originating procedure instance.
33    pub procedure_id: ProcedureId,
34    /// The timestamp of the event.
35    pub timestamp: Timestamp,
36    /// The state of the procedure.
37    pub state: ProcedureState,
38    /// The event emitted by the procedure. It's generated by [Procedure::event].
39    pub internal_event: Box<dyn Event>,
40}
41
42impl ProcedureEvent {
43    pub fn new(
44        procedure_id: ProcedureId,
45        internal_event: Box<dyn Event>,
46        state: ProcedureState,
47    ) -> Self {
48        Self {
49            procedure_id,
50            internal_event,
51            timestamp: Timestamp::current_time(TimeUnit::Nanosecond),
52            state,
53        }
54    }
55}
56
57impl Event for ProcedureEvent {
58    fn event_type(&self) -> &str {
59        self.internal_event.event_type()
60    }
61
62    fn timestamp(&self) -> Timestamp {
63        self.timestamp
64    }
65
66    fn json_payload(&self) -> Result<String> {
67        self.internal_event.json_payload()
68    }
69
70    fn extra_schema(&self) -> Vec<ColumnSchema> {
71        let mut schema = vec![
72            ColumnSchema {
73                column_name: EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME.to_string(),
74                datatype: ColumnDataType::String.into(),
75                semantic_type: SemanticType::Field.into(),
76                ..Default::default()
77            },
78            ColumnSchema {
79                column_name: EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME.to_string(),
80                datatype: ColumnDataType::String.into(),
81                semantic_type: SemanticType::Field.into(),
82                ..Default::default()
83            },
84            ColumnSchema {
85                column_name: EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME.to_string(),
86                datatype: ColumnDataType::String.into(),
87                semantic_type: SemanticType::Field.into(),
88                ..Default::default()
89            },
90        ];
91        schema.append(&mut self.internal_event.extra_schema());
92        schema
93    }
94
95    fn extra_rows(&self) -> Result<Vec<Row>> {
96        let mut internal_event_extra_rows = self.internal_event.extra_rows()?;
97        let mut rows = Vec::with_capacity(internal_event_extra_rows.len());
98        for internal_event_extra_row in internal_event_extra_rows.iter_mut() {
99            let error_str = match &self.state {
100                ProcedureState::Failed { error } => format!("{:?}", error),
101                ProcedureState::PrepareRollback { error } => format!("{:?}", error),
102                ProcedureState::RollingBack { error } => format!("{:?}", error),
103                ProcedureState::Retrying { error } => format!("{:?}", error),
104                ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
105                _ => "".to_string(),
106            };
107            let mut values = Vec::with_capacity(3 + internal_event_extra_row.values.len());
108            values.extend([
109                ValueData::StringValue(self.procedure_id.to_string()).into(),
110                ValueData::StringValue(self.state.as_str_name().to_string()).into(),
111                ValueData::StringValue(error_str).into(),
112            ]);
113            values.append(&mut internal_event_extra_row.values);
114            rows.push(Row { values });
115        }
116
117        Ok(rows)
118    }
119
120    fn as_any(&self) -> &dyn Any {
121        self
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    use api::v1::value::ValueData;
128    use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
129    use common_event_recorder::Event;
130
131    use crate::{ProcedureEvent, ProcedureId, ProcedureState};
132
133    #[derive(Debug)]
134    struct TestEvent;
135
136    impl Event for TestEvent {
137        fn event_type(&self) -> &str {
138            "test_event"
139        }
140
141        fn extra_schema(&self) -> Vec<ColumnSchema> {
142            vec![ColumnSchema {
143                column_name: "test_event_column".to_string(),
144                datatype: ColumnDataType::String.into(),
145                semantic_type: SemanticType::Field.into(),
146                ..Default::default()
147            }]
148        }
149
150        fn extra_rows(&self) -> common_event_recorder::error::Result<Vec<Row>> {
151            Ok(vec![
152                Row {
153                    values: vec![ValueData::StringValue("test_event1".to_string()).into()],
154                },
155                Row {
156                    values: vec![ValueData::StringValue("test_event2".to_string()).into()],
157                },
158            ])
159        }
160
161        fn as_any(&self) -> &dyn std::any::Any {
162            self
163        }
164    }
165
166    #[test]
167    fn test_procedure_event_extra_rows() {
168        let procedure_event = ProcedureEvent::new(
169            ProcedureId::random(),
170            Box::new(TestEvent {}),
171            ProcedureState::Running,
172        );
173
174        let procedure_event_extra_rows = procedure_event.extra_rows().unwrap();
175        assert_eq!(procedure_event_extra_rows.len(), 2);
176        assert_eq!(procedure_event_extra_rows[0].values.len(), 4);
177        assert_eq!(
178            procedure_event_extra_rows[0].values[3],
179            ValueData::StringValue("test_event1".to_string()).into()
180        );
181        assert_eq!(procedure_event_extra_rows[1].values.len(), 4);
182        assert_eq!(
183            procedure_event_extra_rows[1].values[3],
184            ValueData::StringValue("test_event2".to_string()).into()
185        );
186    }
187}