datanode/
event_listener.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::error;
16use store_api::storage::RegionId;
17use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
18
19pub enum RegionServerEvent {
20    Registered(RegionId),
21    Deregistered(RegionId),
22}
23
24pub trait RegionServerEventListener: Sync + Send {
25    /// Called *after* a new region was created/opened.
26    fn on_region_registered(&self, _region_id: RegionId) {}
27
28    /// Called *after* a region was closed.
29    fn on_region_deregistered(&self, _region_id: RegionId) {}
30}
31
32pub type RegionServerEventListenerRef = Box<dyn RegionServerEventListener>;
33
34pub struct NoopRegionServerEventListener;
35
36impl RegionServerEventListener for NoopRegionServerEventListener {}
37
38#[derive(Debug, Clone)]
39pub struct RegionServerEventSender(pub(crate) UnboundedSender<RegionServerEvent>);
40
41impl RegionServerEventListener for RegionServerEventSender {
42    fn on_region_registered(&self, region_id: RegionId) {
43        if let Err(e) = self.0.send(RegionServerEvent::Registered(region_id)) {
44            error!(e; "Failed to send registering region: {region_id} event");
45        }
46    }
47
48    fn on_region_deregistered(&self, region_id: RegionId) {
49        if let Err(e) = self.0.send(RegionServerEvent::Deregistered(region_id)) {
50            error!(e; "Failed to send deregistering region: {region_id} event");
51        }
52    }
53}
54
55pub struct RegionServerEventReceiver(pub(crate) UnboundedReceiver<RegionServerEvent>);
56
57pub fn new_region_server_event_channel() -> (RegionServerEventSender, RegionServerEventReceiver) {
58    let (tx, rx) = mpsc::unbounded_channel();
59
60    (RegionServerEventSender(tx), RegionServerEventReceiver(rx))
61}