meta_srv/
cache_invalidator.rs1use api::v1::meta::MailboxMessage;
16use async_trait::async_trait;
17use common_error::ext::BoxedError;
18use common_meta::cache_invalidator::{CacheInvalidator, Context};
19use common_meta::error::{self as meta_error, Result as MetaResult};
20use common_meta::instruction::{CacheIdent, Instruction};
21use common_telemetry::tracing_context::TracingContext;
22use snafu::ResultExt;
23
24use crate::metasrv::MetasrvInfo;
25use crate::service::mailbox::{BroadcastChannel, MailboxRef};
26
27const DEFAULT_SUBJECT: &str = "Invalidate table";
28
29pub struct MetasrvCacheInvalidator {
30 mailbox: MailboxRef,
31 info: MetasrvInfo,
33}
34
35impl MetasrvCacheInvalidator {
36 pub fn new(mailbox: MailboxRef, info: MetasrvInfo) -> Self {
37 Self { mailbox, info }
38 }
39}
40
41impl MetasrvCacheInvalidator {
42 async fn broadcast(&self, ctx: &Context, instruction: Instruction) -> MetaResult<()> {
43 let subject = &ctx
44 .subject
45 .clone()
46 .unwrap_or_else(|| DEFAULT_SUBJECT.to_string());
47
48 let tracing_ctx = TracingContext::from_current_span();
49 let mut msg = MailboxMessage::json_message(
50 subject,
51 &format!("Metasrv@{}", self.info.server_addr),
52 "Frontend broadcast",
53 common_time::util::current_time_millis(),
54 &instruction,
55 Some(tracing_ctx.to_w3c()),
56 )
57 .with_context(|_| meta_error::SerdeJsonSnafu)?;
58
59 self.mailbox
60 .broadcast(&BroadcastChannel::Frontend, &msg)
61 .await
62 .map_err(BoxedError::new)
63 .context(meta_error::ExternalSnafu)?;
64
65 msg.to = "Datanode broadcast".to_string();
66 self.mailbox
67 .broadcast(&BroadcastChannel::Datanode, &msg)
68 .await
69 .map_err(BoxedError::new)
70 .context(meta_error::ExternalSnafu)?;
71
72 msg.to = "Flownode broadcast".to_string();
73 self.mailbox
74 .broadcast(&BroadcastChannel::Flownode, &msg)
75 .await
76 .map_err(BoxedError::new)
77 .context(meta_error::ExternalSnafu)
78 }
79}
80
81#[async_trait]
82impl CacheInvalidator for MetasrvCacheInvalidator {
83 async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> MetaResult<()> {
84 let instruction = Instruction::InvalidateCaches(caches.to_vec());
85 self.broadcast(ctx, instruction).await
86 }
87}