meta_srv/
cache_invalidator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::MailboxMessage;
16use async_trait::async_trait;
17use common_error::ext::BoxedError;
18use common_meta::cache_invalidator::{CacheInvalidator, Context};
19use common_meta::error::{self as meta_error, Result as MetaResult};
20use common_meta::instruction::{CacheIdent, Instruction};
21use common_telemetry::tracing_context::TracingContext;
22use snafu::ResultExt;
23
24use crate::metasrv::MetasrvInfo;
25use crate::service::mailbox::{BroadcastChannel, MailboxRef};
26
27const DEFAULT_SUBJECT: &str = "Invalidate table";
28
29pub struct MetasrvCacheInvalidator {
30    mailbox: MailboxRef,
31    // Metasrv infos
32    info: MetasrvInfo,
33}
34
35impl MetasrvCacheInvalidator {
36    pub fn new(mailbox: MailboxRef, info: MetasrvInfo) -> Self {
37        Self { mailbox, info }
38    }
39}
40
41impl MetasrvCacheInvalidator {
42    async fn broadcast(&self, ctx: &Context, instruction: Instruction) -> MetaResult<()> {
43        let subject = &ctx
44            .subject
45            .clone()
46            .unwrap_or_else(|| DEFAULT_SUBJECT.to_string());
47
48        let tracing_ctx = TracingContext::from_current_span();
49        let mut msg = MailboxMessage::json_message(
50            subject,
51            &format!("Metasrv@{}", self.info.server_addr),
52            "Frontend broadcast",
53            common_time::util::current_time_millis(),
54            &instruction,
55            Some(tracing_ctx.to_w3c()),
56        )
57        .with_context(|_| meta_error::SerdeJsonSnafu)?;
58
59        self.mailbox
60            .broadcast(&BroadcastChannel::Frontend, &msg)
61            .await
62            .map_err(BoxedError::new)
63            .context(meta_error::ExternalSnafu)?;
64
65        msg.to = "Datanode broadcast".to_string();
66        self.mailbox
67            .broadcast(&BroadcastChannel::Datanode, &msg)
68            .await
69            .map_err(BoxedError::new)
70            .context(meta_error::ExternalSnafu)?;
71
72        msg.to = "Flownode broadcast".to_string();
73        self.mailbox
74            .broadcast(&BroadcastChannel::Flownode, &msg)
75            .await
76            .map_err(BoxedError::new)
77            .context(meta_error::ExternalSnafu)
78    }
79}
80
81#[async_trait]
82impl CacheInvalidator for MetasrvCacheInvalidator {
83    async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> MetaResult<()> {
84        let instruction = Instruction::InvalidateCaches(caches.to_vec());
85        self.broadcast(ctx, instruction).await
86    }
87}