mod handle_alter;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;
/// Identifier of a region worker inside a [`WorkerGroup`].
pub(crate) type WorkerId = u32;
/// Name of the marker file used while a region is being dropped
/// (presumably created/checked by the drop handling in `handle_drop` — confirm there).
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";
/// Interval between periodical region checks in the worker loop.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Upper bound (in seconds, exclusive) for the random delay applied before a
/// worker performs its first periodical check; spreads checks across workers.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A group of region workers sharing the background job pools and caches.
///
/// Requests are routed to a fixed worker by region id (see
/// [`WorkerGroup::worker`]), so a region is always handled by the same worker.
pub(crate) struct WorkerGroup {
    /// Workers of the group; a region maps to exactly one worker.
    workers: Vec<RegionWorker>,
    /// Scheduler for background flush jobs, shared by all workers.
    flush_job_pool: SchedulerRef,
    /// Scheduler for background compaction jobs, shared by all workers.
    compact_job_pool: SchedulerRef,
    /// Scheduler for background purge jobs, shared by all workers.
    purge_scheduler: SchedulerRef,
    /// Shared cache manager (SST meta, vector, page, index caches, write cache).
    cache_manager: CacheManagerRef,
}
impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// Builds the shared components (write buffer manager wired to a flush
    /// watch channel, puffin/intermediate index managers, job schedulers and
    /// the cache manager) and spawns `config.num_workers` region workers that
    /// share them.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel used by the write buffer manager to notify workers
        // that a flush is needed; every worker keeps a receiver clone.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        // Background job pools, each bounded by its own config knob.
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        // Optional write cache (None when disabled in the config).
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        // Spawn one worker per configured slot; each worker gets clones of the
        // shared components.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();
        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }
    /// Stops the worker group: shuts down the background job pools first, then
    /// stops every worker concurrently.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;
        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
        Ok(())
    }
    /// Submits `request` to the worker responsible for `region_id`.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }
    /// Returns true if the region exists in its worker's region map.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }
    /// Returns true if the region is currently being opened.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }
    /// Returns the region if its worker knows it.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }
    /// Returns the shared cache manager.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }
    /// Returns the worker that owns `region_id`; the mapping is deterministic
    /// so all requests for a region reach the same worker.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());
        &self.workers[index]
    }
}
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group for tests.
    ///
    /// Mirrors [`WorkerGroup::start`] but lets tests inject a custom write
    /// buffer manager, an event listener and a time provider. The cache
    /// manager is built with fewer caches than the production path.
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        // Watch channel that propagates flush notifications to the workers.
        let (flush_sender, flush_receiver) = watch::channel(());
        // Fall back to a write buffer manager wired to the flush channel when
        // the test does not provide one.
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        // Fix: size the purge scheduler from `max_background_purges`, matching
        // the production path in `WorkerGroup::start` (it previously reused
        // `max_background_flushes` here by mistake).
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        // Spawn one worker per configured slot; each worker gets clones of the
        // shared components and the injected test listener.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();
        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }
    /// Returns the purge scheduler (test helper).
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
/// Maps a region id to a worker index in `[0, num_workers)`.
///
/// Combines the table id and region number so regions of the same table are
/// spread across workers; each component is reduced modulo `num_workers`
/// first so the sum cannot overflow.
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    let table_part = id.table_id() as usize % num_workers;
    let region_part = id.region_number() as usize % num_workers;
    (table_part + region_part) % num_workers
}
/// Builds the optional write cache described by `config`.
///
/// Returns `Ok(None)` when the write cache is disabled; otherwise ensures the
/// cache directory exists and constructs a file-system backed [`WriteCache`].
async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }
    let cache_path = &config.write_cache_path;
    // Create the cache directory up front so cache initialization cannot
    // fail on a missing path.
    tokio::fs::create_dir_all(Path::new(cache_path))
        .await
        .context(CreateDirSnafu { dir: cache_path })?;
    let cache = WriteCache::new_fs(
        cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await
    .map(Arc::new)?;
    Ok(Some(cache))
}
/// Returns a random delay in `[0, MAX_INITIAL_CHECK_DELAY_SECS)` seconds used
/// before a worker's first periodical check, so workers don't all check at once.
pub(crate) fn worker_init_check_delay() -> Duration {
    Duration::from_secs(rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS))
}
/// Bag of everything a region worker needs; consumed by [`WorkerStarter::start`]
/// to spawn one [`RegionWorker`] and its background loop.
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    config: Arc<MitoConfig>,
    /// Log store backing the WAL.
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    /// Global write buffer manager shared across workers.
    write_buffer_manager: WriteBufferManagerRef,
    /// Shared compaction job pool.
    compact_job_pool: SchedulerRef,
    /// Shared flush job pool.
    flush_job_pool: SchedulerRef,
    /// Shared purge scheduler.
    purge_scheduler: SchedulerRef,
    /// Listener for worker events (no-op outside tests).
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Sender half of the group-wide flush watch channel.
    flush_sender: watch::Sender<()>,
    /// Receiver half of the group-wide flush watch channel.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
}
impl<S: LogStore> WorkerStarter<S> {
    /// Consumes the starter, spawns the worker loop on the global runtime and
    /// returns the handle used to talk to it.
    fn start(self) -> RegionWorker {
        // Maps shared between the worker handle and the loop task.
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        // Bounded request channel; the sender is also cloned into the loop so
        // background jobs can notify the worker.
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            // Per-worker metrics labeled by worker id.
            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });
        RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        }
    }
}
/// Handle to a spawned region worker loop; used to submit requests and stop it.
pub(crate) struct RegionWorker {
    /// Id of this worker.
    id: WorkerId,
    /// Regions currently owned by the worker (shared with the loop task).
    regions: RegionMapRef,
    /// Regions currently being opened (shared with the loop task).
    opening_regions: OpeningRegionsRef,
    /// Request channel into the worker loop.
    sender: Sender<WorkerRequest>,
    /// Join handle of the loop task; taken once by `stop()`.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Running flag shared with the loop; cleared on stop/exit.
    running: Arc<AtomicBool>,
}
impl RegionWorker {
    /// Sends `request` to the worker loop.
    ///
    /// Fails with `WorkerStopped` if the worker is not running or its
    /// receiver is already gone (in which case the running flag is also
    /// cleared so later callers fail fast).
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        if self.sender.send(request).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Reconcile the flag with reality: the loop task has exited.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }
        Ok(())
    }
    /// Stops the worker and waits for its loop task to finish.
    ///
    /// Idempotent: the join handle is taken under a lock, so only the first
    /// caller performs the shutdown. The running flag is cleared before the
    /// `Stop` request is sent so the loop observes it on wakeup.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);
            self.set_running(false);
            if self.sender.send(WorkerRequest::Stop).await.is_err() {
                warn!("Worker {} is already exited before stop", self.id);
            }
            handle.await.context(JoinSnafu)?;
        }
        Ok(())
    }
    /// Returns true if the worker is still accepting requests.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }
    /// Sets the running flag shared with the loop task.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }
    /// Returns true if this worker owns the region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }
    /// Returns true if the region is being opened by this worker.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }
    /// Returns the region if this worker owns it.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }
    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}
impl Drop for RegionWorker {
    /// Clears the running flag if the handle is dropped without an explicit
    /// `stop()`, so pending submitters fail fast instead of sending into a
    /// worker that is going away.
    fn drop(&mut self) {
        if self.running.load(Ordering::Relaxed) {
            self.running.store(false, Ordering::Relaxed);
        }
    }
}
/// Buffer for general worker requests drained from the channel each iteration.
type RequestBuffer = Vec<WorkerRequest>;
/// Write requests that are stalled (e.g. by write-buffer pressure), grouped by
/// region, together with their estimated sizes.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Per-region stalled requests: region id -> (estimated bytes, requests).
    pub(crate) requests: HashMap<RegionId, (usize, Vec<SenderWriteRequest>)>,
    /// Total estimated bytes of all stalled requests across regions.
    pub(crate) estimated_size: usize,
}
impl StalledRequests {
    /// Moves every request out of `requests` into the stalled set.
    pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
        requests.drain(..).for_each(|req| self.push(req));
    }
    /// Adds one stalled request, updating both the per-region and the global
    /// estimated sizes.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let req_size = req.request.estimated_size();
        self.estimated_size += req_size;
        let entry = self.requests.entry(req.request.region_id).or_default();
        entry.0 += req_size;
        entry.1.push(req);
    }
    /// Removes and returns all stalled requests of `region_id`, subtracting
    /// their size from the global estimate. Returns an empty vec if none.
    pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
        match self.requests.remove(region_id) {
            Some((size, requests)) => {
                self.estimated_size -= size;
                requests
            }
            None => Vec::new(),
        }
    }
    /// Returns the total number of stalled requests across all regions.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests.values().map(|(_, reqs)| reqs.len()).sum()
    }
}
/// State of one region worker's event loop; owned by the spawned loop task.
struct RegionWorkerLoop<S> {
    /// Id of this worker (used in logs and metric labels).
    id: WorkerId,
    config: Arc<MitoConfig>,
    /// Regions owned by this worker (shared with the `RegionWorker` handle).
    regions: RegionMapRef,
    /// Regions currently being dropped.
    dropping_regions: RegionMapRef,
    /// Regions currently being opened (shared with the handle).
    opening_regions: OpeningRegionsRef,
    /// Sender to this worker's own channel; cloned into background jobs so
    /// they can send notifications back.
    sender: Sender<WorkerRequest>,
    /// Receiving end of the request channel.
    receiver: Receiver<WorkerRequest>,
    wal: Wal<S>,
    object_store_manager: ObjectStoreManagerRef,
    /// Running flag shared with the handle; loop exits when cleared.
    running: Arc<AtomicBool>,
    memtable_builder_provider: MemtableBuilderProvider,
    purge_scheduler: SchedulerRef,
    write_buffer_manager: WriteBufferManagerRef,
    flush_scheduler: FlushScheduler,
    compaction_scheduler: CompactionScheduler,
    /// Write requests stalled waiting for buffer capacity.
    stalled_requests: StalledRequests,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Timestamp (ms) of the last periodical check.
    last_periodical_check_millis: i64,
    /// Sender half of the group-wide flush watch channel.
    flush_sender: watch::Sender<()>,
    /// Receiver half of the group-wide flush watch channel.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalled write requests for this worker.
    stalled_count: IntGauge,
    /// Gauge of regions owned by this worker.
    region_count: IntGauge,
    region_edit_queues: RegionEditQueues,
    schema_metadata_manager: SchemaMetadataManagerRef,
}
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Runs the worker event loop until the running flag is cleared or the
    /// request channel closes.
    ///
    /// Each iteration waits on one of: a new request, a flush notification
    /// from the watch channel, or the periodical-check timer; then drains up
    /// to `worker_request_batch_size` requests and dispatches them.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        // Push the first periodical check into the future by a random delay so
        // workers don't all check at the same time.
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;
        // Buffers reused across iterations for the three request categories.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);
        while self.running.load(Ordering::Relaxed) {
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();
            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);
            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        // Sort the first request into its category buffer.
                        Some(request) => match request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            _ => general_req_buffer.push(request),
                        },
                        // Channel closed: all senders dropped, exit the loop.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // Watch sender dropped; the group is going away.
                        break;
                    } else {
                        // Flush notification: maybe flush this worker and retry
                        // stalled writes, then go back to waiting.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timer fired without any request: run periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }
            // A flush notification may have arrived while we were receiving;
            // handle stalled requests before processing the batch.
            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }
            // Opportunistically drain more queued requests into the batch.
            // Starts at 1 because one request was already taken above.
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(req) => match req {
                        WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                        WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                        _ => general_req_buffer.push(req),
                    },
                    Err(_) => break,
                }
            }
            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );
            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
            )
            .await;
            self.handle_periodical_tasks();
        }
        self.clean().await;
        info!("Exit region worker thread {}", self.id);
    }
    /// Dispatches one batch: general requests first, then writes, then DDL.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // Writes and DDLs were already sorted into their own
                    // buffers; nothing of these kinds should appear here.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    // Stop is only sent after the running flag is cleared.
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
            }
        }
        // Writes are handled before DDL requests.
        self.handle_write_requests(write_requests, true).await;
        self.handle_ddl_requests(ddl_requests).await;
    }
    /// Handles the DDL requests of a batch.
    ///
    /// Handlers that take `ddl.sender` reply on their own (the `continue`
    /// arms); for the others the result is sent back here.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }
        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(_) => {
                    self.handle_truncate_request(ddl.region_id, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };
            ddl.sender.send(res);
        }
    }
    /// Runs periodical tasks, throttled to at most once per
    /// [`CHECK_REGION_INTERVAL`].
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }
        self.last_periodical_check_millis = self.time_provider.current_time_millis();
        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }
    /// Dispatches a notification sent back by a background job.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }
    /// Changes a region's role state gracefully.
    ///
    /// The transition runs in a spawned task so the worker loop is not
    /// blocked; the response (with the region's last entry id) is sent through
    /// `sender` when done, or `NotFound` immediately if the region is unknown.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            common_runtime::spawn_global(async move {
                region.set_role_state_gracefully(region_role_state).await;
                let last_entry_id = region.version_control.current().last_entry_id;
                let _ = sender.send(SetRegionRoleStateResponse::success(Some(last_entry_id)));
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}
impl<S> RegionWorkerLoop<S> {
    /// Stops every region owned by this worker, then clears the region map.
    /// Called when the worker loop exits.
    async fn clean(&self) {
        for region in self.regions.list_regions() {
            region.stop().await;
        }
        self.regions.clear();
    }
    /// Notifies the whole worker group through the flush watch channel.
    fn notify_group(&mut self) {
        // Wake every worker waiting on the channel; an error here only means
        // there are no receivers left, which is fine to ignore.
        let _ = self.flush_sender.send(());
        // Mark the value as seen on our own receiver so this worker does not
        // wake up for its own notification.
        self.flush_receiver.borrow_and_update();
    }
}
/// Hook for observing worker events.
///
/// Carries an optional event listener only in test builds; in release builds
/// the struct is empty and every callback is a no-op.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    /// Optional event listener injected by tests.
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
impl WorkerListener {
    /// Creates a listener wrapping the optional test event listener.
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }
    /// Called when a flush of `region_id` succeeds.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Suppresses the unused-variable warning in non-test builds.
        let _ = region_id;
    }
    /// Called when write requests are stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }
    /// Called before a flush of `region_id` begins.
    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        let _ = region_id;
    }
    /// Called when a deferred drop of `region_id` starts; a test listener may
    /// return a delay to wait before retrying.
    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        let _ = region_id;
        None
    }
    /// Called when a deferred drop of `region_id` ends; `removed` tells
    /// whether the region data was actually removed.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        let _ = region_id;
        let _ = removed;
    }
    /// Called after SSTs of `region_id` are merged.
    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        let _ = region_id;
    }
    /// Called with the number of requests received in one loop iteration.
    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        let _ = request_num;
    }
    /// Called when a file is filled into the file cache.
    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }
    /// Called when a compaction of `_region_id` is scheduled.
    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }
    /// Called before notifying the result of a region change.
    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;
    /// Checks the region-to-worker mapping for a few concrete ids.
    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;
        // (1 % 4 + 2 % 4) % 4 == 3
        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);
        // (2 % 4 + 3 % 4) % 4 == 1
        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }
    /// Starts a worker group in a test env and stops it cleanly.
    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop");
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;
        group.stop().await.unwrap();
    }
}