common_procedure/
event.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use api::v1::value::ValueData;
18use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
19use common_event_recorder::error::Result;
20use common_event_recorder::Event;
21use common_time::timestamp::{TimeUnit, Timestamp};
22
23use crate::{ProcedureId, ProcedureState};
24
/// Name of the extra events-table column that stores the procedure id as a string.
pub const EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME: &str = "procedure_id";
/// Name of the extra events-table column that stores the procedure state name as a string.
pub const EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME: &str = "procedure_state";
/// Name of the extra events-table column that stores the debug-formatted procedure error.
/// The column holds an empty string when the procedure state carries no error.
pub const EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME: &str = "procedure_error";
28
/// `ProcedureEvent` represents an event emitted by a procedure during its execution lifecycle.
///
/// It wraps an inner [`Event`]. Its own [`Event`] implementation forwards the event type and
/// JSON payload to the inner event, and places the procedure id, state and error columns in
/// front of whatever extra columns the inner event provides.
#[derive(Debug)]
pub struct ProcedureEvent {
    /// Unique identifier associated with the originating procedure instance.
    pub procedure_id: ProcedureId,
    /// The timestamp of the event. [`ProcedureEvent::new`] sets it to the current time
    /// expressed in nanoseconds.
    pub timestamp: Timestamp,
    /// The state of the procedure. When the state carries an error, that error is written
    /// (debug-formatted) to the procedure error column of the event row.
    pub state: ProcedureState,
    /// The event emitted by the procedure. It's generated by [Procedure::event].
    pub internal_event: Box<dyn Event>,
}
41
42impl ProcedureEvent {
43    pub fn new(
44        procedure_id: ProcedureId,
45        internal_event: Box<dyn Event>,
46        state: ProcedureState,
47    ) -> Self {
48        Self {
49            procedure_id,
50            internal_event,
51            timestamp: Timestamp::current_time(TimeUnit::Nanosecond),
52            state,
53        }
54    }
55}
56
57impl Event for ProcedureEvent {
58    fn event_type(&self) -> &str {
59        self.internal_event.event_type()
60    }
61
62    fn timestamp(&self) -> Timestamp {
63        self.timestamp
64    }
65
66    fn json_payload(&self) -> Result<String> {
67        self.internal_event.json_payload()
68    }
69
70    fn extra_schema(&self) -> Vec<ColumnSchema> {
71        let mut schema = vec![
72            ColumnSchema {
73                column_name: EVENTS_TABLE_PROCEDURE_ID_COLUMN_NAME.to_string(),
74                datatype: ColumnDataType::String.into(),
75                semantic_type: SemanticType::Field.into(),
76                ..Default::default()
77            },
78            ColumnSchema {
79                column_name: EVENTS_TABLE_PROCEDURE_STATE_COLUMN_NAME.to_string(),
80                datatype: ColumnDataType::String.into(),
81                semantic_type: SemanticType::Field.into(),
82                ..Default::default()
83            },
84            ColumnSchema {
85                column_name: EVENTS_TABLE_PROCEDURE_ERROR_COLUMN_NAME.to_string(),
86                datatype: ColumnDataType::String.into(),
87                semantic_type: SemanticType::Field.into(),
88                ..Default::default()
89            },
90        ];
91        schema.append(&mut self.internal_event.extra_schema());
92        schema
93    }
94
95    fn extra_row(&self) -> Result<Row> {
96        let error_str = match &self.state {
97            ProcedureState::Failed { error } => format!("{:?}", error),
98            ProcedureState::PrepareRollback { error } => format!("{:?}", error),
99            ProcedureState::RollingBack { error } => format!("{:?}", error),
100            ProcedureState::Retrying { error } => format!("{:?}", error),
101            ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
102            _ => "".to_string(),
103        };
104        let mut row = vec![
105            ValueData::StringValue(self.procedure_id.to_string()).into(),
106            ValueData::StringValue(self.state.as_str_name().to_string()).into(),
107            ValueData::StringValue(error_str).into(),
108        ];
109        row.append(&mut self.internal_event.extra_row()?.values);
110        Ok(Row { values: row })
111    }
112
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116}