1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::column_data_type_extension::TypeExt;
22use api::v1::value::ValueData;
23use api::v1::{
24 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25 RowInsertRequest, RowInsertRequests, Rows, SemanticType,
26};
27use async_trait::async_trait;
28use backon::{BackoffBuilder, ExponentialBuilder};
29use common_telemetry::{debug, error, info, warn};
30use common_time::timestamp::{TimeUnit, Timestamp};
31use humantime::format_duration;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
35use tokio::sync::mpsc::{Receiver, Sender, channel};
36use tokio::task::JoinHandle;
37use tokio::time::sleep;
38use tokio_util::sync::CancellationToken;
39
40use crate::error::{MismatchedSchemaSnafu, Result};
41
/// Table that events are written to unless [`Event::table_name`] is overridden.
pub const DEFAULT_EVENTS_TABLE_NAME: &str = "events";

/// Tag column holding the value of [`Event::event_type`].
pub const EVENTS_TABLE_TYPE_COLUMN_NAME: &str = "type";
/// Binary JSON field column holding the bytes of [`Event::json_payload`].
pub const EVENTS_TABLE_PAYLOAD_COLUMN_NAME: &str = "payload";
/// Nanosecond time-index column holding the value of [`Event::timestamp`].
pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
51
52pub type EventRecorderRef = Arc<dyn EventRecorder>;
54
/// Period at which the background processor flushes buffered events to the event handler.
///
/// Despite its name this is a [`Duration`], not a number of seconds; the name is kept for
/// backward compatibility.
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
/// Default TTL of the events table: 90 days.
// Spelled via `from_secs` because `Duration::from_days` is gated behind the unstable
// `duration_constructors` feature.
const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_secs(90 * 24 * 60 * 60);
/// Default compaction time window: 1 day.
pub const DEFAULT_COMPACTION_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
/// Capacity of the bounded channel between the recorder and its background processor.
const DEFAULT_CHANNEL_SIZE: usize = 2048;
/// Number of events the background processor buffers before flushing early.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// Maximum number of retries for a batch the event handler fails to handle.
const DEFAULT_MAX_RETRY_TIMES: u64 = 3;
67
68pub trait Event: Send + Sync + Debug {
77 fn table_name(&self) -> &str {
79 DEFAULT_EVENTS_TABLE_NAME
80 }
81
82 fn event_type(&self) -> &str;
84
85 fn timestamp(&self) -> Timestamp {
87 Timestamp::current_time(TimeUnit::Nanosecond)
88 }
89
90 fn json_payload(&self) -> Result<String> {
92 Ok("".to_string())
93 }
94
95 fn extra_schema(&self) -> Vec<ColumnSchema> {
97 vec![]
98 }
99
100 fn extra_row(&self) -> Result<Row> {
102 Ok(Row { values: vec![] })
103 }
104
105 fn as_any(&self) -> &dyn Any;
107}
108
109pub trait Eventable: Send + Sync + Debug {
111 fn to_event(&self) -> Option<Box<dyn Event>> {
113 None
114 }
115}
116
117#[allow(clippy::borrowed_box)]
119pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
120 events
121 .iter()
122 .into_grouping_map_by(|event| event.event_type())
123 .collect()
124}
125
126#[allow(clippy::borrowed_box)]
128pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
129 validate_events(events)?;
131
132 let event = &events[0];
134 let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
135 schema.extend(vec![
136 ColumnSchema {
137 column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
138 datatype: ColumnDataType::String.into(),
139 semantic_type: SemanticType::Tag.into(),
140 ..Default::default()
141 },
142 ColumnSchema {
143 column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
144 datatype: ColumnDataType::Binary as i32,
145 semantic_type: SemanticType::Field as i32,
146 datatype_extension: Some(ColumnDataTypeExtension {
147 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
148 }),
149 ..Default::default()
150 },
151 ColumnSchema {
152 column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
153 datatype: ColumnDataType::TimestampNanosecond.into(),
154 semantic_type: SemanticType::Timestamp.into(),
155 ..Default::default()
156 },
157 ]);
158 schema.extend(event.extra_schema());
159
160 let mut rows: Vec<Row> = Vec::with_capacity(events.len());
161 for event in events {
162 let extra_row = event.extra_row()?;
163 let mut values = Vec::with_capacity(3 + extra_row.values.len());
164 values.extend([
165 ValueData::StringValue(event.event_type().to_string()).into(),
166 ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
167 ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
168 ]);
169 values.extend(extra_row.values);
170 rows.push(Row { values });
171 }
172
173 Ok(RowInsertRequests {
174 inserts: vec![RowInsertRequest {
175 table_name: event.table_name().to_string(),
176 rows: Some(Rows { schema, rows }),
177 }],
178 })
179}
180
181#[allow(clippy::borrowed_box)]
183fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
184 let extra_schema = events[0].extra_schema();
186 for event in events {
187 if event.extra_schema() != extra_schema {
188 MismatchedSchemaSnafu {
189 expected: extra_schema.clone(),
190 actual: event.extra_schema(),
191 }
192 .fail()?;
193 }
194 }
195 Ok(())
196}
197
198pub trait EventRecorder: Send + Sync + Debug + 'static {
200 fn record(&self, event: Box<dyn Event>);
202
203 fn close(&self);
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
209pub struct EventHandlerOptions {
210 pub ttl: Duration,
212 pub append_mode: bool,
214}
215
216impl Default for EventHandlerOptions {
217 fn default() -> Self {
218 Self {
219 ttl: DEFAULT_EVENTS_TABLE_TTL,
220 append_mode: true,
221 }
222 }
223}
224
225impl EventHandlerOptions {
226 pub fn to_hints(&self) -> Vec<(&str, String)> {
228 vec![
229 (TTL_KEY, format_duration(self.ttl).to_string()),
230 (APPEND_MODE_KEY, self.append_mode.to_string()),
231 ]
232 }
233}
234
235#[async_trait]
237pub trait EventHandler: Send + Sync + 'static {
238 async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
241}
242
243#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
245pub struct EventRecorderOptions {
246 #[serde(with = "humantime_serde")]
248 pub ttl: Duration,
249}
250
251impl Default for EventRecorderOptions {
252 fn default() -> Self {
253 Self {
254 ttl: DEFAULT_EVENTS_TABLE_TTL,
255 }
256 }
257}
258
259#[derive(Debug)]
261pub struct EventRecorderImpl {
262 tx: Sender<Box<dyn Event>>,
264 cancel_token: CancellationToken,
266 handle: Option<JoinHandle<()>>,
268}
269
270impl EventRecorderImpl {
271 pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
272 let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
273 let cancel_token = CancellationToken::new();
274
275 let mut recorder = Self {
276 tx,
277 handle: None,
278 cancel_token: cancel_token.clone(),
279 };
280
281 let processor = EventProcessor::new(
282 rx,
283 event_handler,
284 DEFAULT_FLUSH_INTERVAL_SECONDS,
285 DEFAULT_MAX_RETRY_TIMES,
286 )
287 .with_cancel_token(cancel_token);
288
289 let handle = tokio::spawn(async move {
291 processor.process(DEFAULT_BUFFER_SIZE).await;
292 });
293
294 recorder.handle = Some(handle);
295
296 recorder
297 }
298}
299
300impl EventRecorder for EventRecorderImpl {
301 fn record(&self, event: Box<dyn Event>) {
303 if let Err(e) = self.tx.try_send(event) {
304 error!("Failed to send event to the background processor: {}", e);
305 }
306 }
307
308 fn close(&self) {
310 self.cancel_token.cancel();
311 }
312}
313
314impl Drop for EventRecorderImpl {
315 fn drop(&mut self) {
316 if let Some(handle) = self.handle.take() {
317 handle.abort();
318 info!("Aborted the background processor in event recorder");
319 }
320 }
321}
322
323struct EventProcessor {
324 rx: Receiver<Box<dyn Event>>,
325 event_handler: Box<dyn EventHandler>,
326 max_retry_times: u64,
327 process_interval: Duration,
328 cancel_token: CancellationToken,
329}
330
331impl EventProcessor {
332 fn new(
333 rx: Receiver<Box<dyn Event>>,
334 event_handler: Box<dyn EventHandler>,
335 process_interval: Duration,
336 max_retry_times: u64,
337 ) -> Self {
338 Self {
339 rx,
340 event_handler,
341 max_retry_times,
342 process_interval,
343 cancel_token: CancellationToken::new(),
344 }
345 }
346
347 fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
348 self.cancel_token = cancel_token;
349 self
350 }
351
352 async fn process(mut self, buffer_size: usize) {
353 info!("Start the background processor in event recorder to handle the received events.");
354
355 let mut buffer = Vec::with_capacity(buffer_size);
356 let mut interval = tokio::time::interval(self.process_interval);
357
358 loop {
359 tokio::select! {
360 maybe_event = self.rx.recv() => {
361 if let Some(maybe_event) = maybe_event {
362 debug!("Received event: {:?}", maybe_event);
363
364 if buffer.len() >= buffer_size {
365 debug!(
366 "Flushing events to the event handler because the buffer is full with {} events",
367 buffer.len()
368 );
369 self.flush_events_to_handler(&mut buffer).await;
370 }
371
372 buffer.push(maybe_event);
374 } else {
375 self.flush_events_to_handler(&mut buffer).await;
377 break;
378 }
379 }
380 _ = self.cancel_token.cancelled() => {
382 warn!("Received a cancel signal, flushing the buffer and exiting the loop");
383 self.flush_events_to_handler(&mut buffer).await;
384 break;
385 }
386 _ = interval.tick() => {
388 self.flush_events_to_handler(&mut buffer).await;
389 }
390 }
391 }
392 }
393
394 async fn flush_events_to_handler(&self, buffer: &mut Vec<Box<dyn Event>>) {
396 if !buffer.is_empty() {
397 debug!("Flushing {} events to the event handler", buffer.len());
398
399 let mut backoff = ExponentialBuilder::default()
400 .with_min_delay(Duration::from_millis(
401 DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64 / self.max_retry_times.max(1),
402 ))
403 .with_max_delay(Duration::from_millis(
404 DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64,
405 ))
406 .with_max_times(self.max_retry_times as usize)
407 .build();
408
409 loop {
410 match self.event_handler.handle(buffer).await {
411 Ok(()) => {
412 debug!("Successfully handled {} events", buffer.len());
413 break;
414 }
415 Err(e) => {
416 if let Some(d) = backoff.next() {
417 warn!(e; "Failed to handle events, retrying...");
418 sleep(d).await;
419 continue;
420 } else {
421 warn!(
422 e; "Failed to handle events after {} retries",
423 self.max_retry_times
424 );
425 break;
426 }
427 }
428 }
429 }
430 }
431
432 buffer.clear();
434 }
435}
436
#[cfg(test)]
mod tests {
    use super::*;

    /// Event with a fixed JSON payload, used to exercise the recorder end to end.
    #[derive(Debug)]
    struct TestEvent {}

    impl Event for TestEvent {
        fn event_type(&self) -> &str {
            "test_event"
        }

        fn json_payload(&self) -> Result<String> {
            Ok("{\"procedure_id\": \"1234567890\"}".to_string())
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Returns the first event of a batch as a [`TestEvent`].
    ///
    /// Panics if the batch is empty or holds another event type.
    fn first_test_event(events: &[Box<dyn Event>]) -> &TestEvent {
        let first = events.first().unwrap();
        first.as_any().downcast_ref::<TestEvent>().unwrap()
    }

    /// Handler whose expectations match [`TestEvent`], so handling succeeds.
    struct TestEventHandlerImpl {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImpl {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let event = first_test_event(events);
            assert_eq!(
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"1234567890\"}"
            );
            assert_eq!(event.event_type(), "test_event");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder() {
        let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
        event_recorder.record(Box::new(TestEvent {}));

        // Give the background processor time to receive the event.
        sleep(Duration::from_millis(500)).await;

        // Cancelling makes the processor flush the buffered event and exit.
        event_recorder.close();

        // Give the final flush time to complete.
        sleep(Duration::from_millis(500)).await;

        // The handler's assertions held, so the task must have finished cleanly.
        if let Some(handle) = event_recorder.handle.take() {
            assert!(handle.await.is_ok());
        }
    }

    /// Handler whose expectations never match [`TestEvent`], so handling panics.
    struct TestEventHandlerImplShouldPanic {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImplShouldPanic {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let event = first_test_event(events);
            // Deliberately wrong expectation: this assertion fails and panics the processor task.
            assert_eq!(
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"should_panic\"}"
            );
            assert_eq!(event.event_type(), "should_panic");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder_should_panic() {
        let mut event_recorder =
            EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));

        event_recorder.record(Box::new(TestEvent {}));

        // Give the background processor time to receive the event.
        sleep(Duration::from_millis(500)).await;

        // The flush triggered by cancellation runs the panicking handler.
        event_recorder.close();

        sleep(Duration::from_millis(500)).await;

        // The handler panicked inside the spawned task, which surfaces as a panic JoinError.
        if let Some(handle) = event_recorder.handle.take() {
            assert!(handle.await.unwrap_err().is_panic());
        }
    }

    /// Event written to the default table.
    #[derive(Debug)]
    struct TestEventA {}

    impl Event for TestEventA {
        fn event_type(&self) -> &str {
            "A"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event written to `table_B`.
    #[derive(Debug)]
    struct TestEventB {}

    impl Event for TestEventB {
        fn table_name(&self) -> &str {
            "table_B"
        }

        fn event_type(&self) -> &str {
            "B"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event written to `table_C`.
    #[derive(Debug)]
    struct TestEventC {}

    impl Event for TestEventC {
        fn table_name(&self) -> &str {
            "table_C"
        }

        fn event_type(&self) -> &str {
            "C"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Three `A`, two `B` and two `C` events, interleaved.
    fn mixed_events() -> Vec<Box<dyn Event>> {
        vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ]
    }

    /// Asserts the group sizes expected for [`mixed_events`].
    fn assert_group_sizes<V>(groups: &HashMap<&str, Vec<V>>) {
        assert_eq!(groups.len(), 3);
        for (event_type, expected_len) in [("A", 3), ("B", 2), ("C", 2)] {
            assert_eq!(groups.get(event_type).unwrap().len(), expected_len);
        }
    }

    #[test]
    fn test_group_events_by_type() {
        let events = mixed_events();
        let event_groups = group_events_by_type(&events);
        assert_group_sizes(&event_groups);
    }

    #[test]
    fn test_build_row_inserts_request() {
        let events = mixed_events();
        let event_groups = group_events_by_type(&events);
        assert_group_sizes(&event_groups);

        for (event_type, events) in event_groups {
            let row_inserts_request = build_row_inserts_request(&events).unwrap();
            let (expected_table, expected_rows) = match event_type {
                "A" => (DEFAULT_EVENTS_TABLE_NAME, 3),
                "B" => ("table_B", 2),
                "C" => ("table_C", 2),
                _ => panic!("Unexpected event type: {}", event_type),
            };

            assert_eq!(row_inserts_request.inserts.len(), 1);
            let insert = &row_inserts_request.inserts[0];
            assert_eq!(insert.table_name, expected_table);
            assert_eq!(insert.rows.as_ref().unwrap().rows.len(), expected_rows);
        }
    }
}