mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a region worker.
pub(crate) type WorkerId = u32;

/// Name of the marker file that indicates a region is being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval between periodical checks of a worker's regions.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Maximum delay (in seconds) before a worker runs its first periodical check.
/// The actual delay is randomized per worker, see [worker_init_check_delay].
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

/// A fixed-size group of [RegionWorker]s.
///
/// Each region is bound to exactly one worker in the group, and requests for a
/// region are always routed to the same worker.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Scheduler for background flush jobs.
    flush_job_pool: SchedulerRef,
    /// Scheduler for background compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Scheduler for background file purge jobs.
    purge_scheduler: SchedulerRef,
    /// Cache shared by all workers.
    cache_manager: CacheManagerRef,
}

impl WorkerGroup {
    /// Starts a worker group with `config.num_workers` workers.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Stops the worker group.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // Stops the job pools gracefully so they wait for running jobs.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to the worker that hosts the region.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns the region if it exists.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the cache manager shared by the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the worker that hosts the specific region.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }
}

#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `config.num_workers` workers for tests.
    ///
    /// Tests can pass their own `write_buffer_manager` and `listener` to
    /// observe and control worker behavior.
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

/// Computes a deterministic worker index for a region, so that requests for
/// the same region always go to the same worker. A range check for this
/// mapping lives in `mod tests` below.
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

/// Builds a write cache from the config, or returns `None` if the write cache
/// is disabled.
async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

/// Returns a random delay, below [MAX_INITIAL_CHECK_DELAY_SECS], for a
/// worker's first periodical check, so workers don't all run their checks at
/// the same time. A sanity test for the bound lives in `mod tests` below.
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

/// Worker start config.
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for a flush to finish.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background worker thread.
    fn start(self) -> RegionWorker {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        }
    }
}

/// Worker to handle region requests.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are being opened.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequest>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
    /// Submits a request to the worker.
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        if self.sender.send(request).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Sets the running flag to false so we won't warn again for this worker.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker and waits for the worker thread to exit.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self.sender.send(WorkerRequest::Stop).await.is_err() {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets whether the worker is still running.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains the specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns the region if the worker contains it.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
        }
    }
}

/// Buffer for requests.
type RequestBuffer = Vec<WorkerRequest>;

/// Tracks write requests that are stalled (e.g. because the write buffer is
/// full) until the worker can retry them.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests per region, together with the estimated size of the
    /// requests for that region.
    pub(crate) requests: HashMap<RegionId, (usize, Vec<SenderWriteRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    /// Appends stalled requests, draining the input vector.
    pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
        for req in requests.drain(..) {
            self.push(req);
        }
    }

    /// Pushes a stalled request and updates the size bookkeeping.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes and returns all stalled requests of the specific region.
    pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
        if let Some((size, requests)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            requests
        } else {
            vec![]
        }
    }

    /// Returns the total number of stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests.values().map(|reqs| reqs.1.len()).sum()
    }
}

/// Background loop of a region worker.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are being dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are being opened.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequest>,
    /// Request receiver.
    receiver: Receiver<WorkerRequest>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifests and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for regions.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background file purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Schedules background compaction requests.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for index.
    intermediate_manager: IntermediateManager,
    /// Provider to get the current time.
    time_provider: TimeProviderRef,
    /// Last time the worker ran periodical tasks, in milliseconds.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for a flush to finish.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of the stalled request count.
    stalled_count: IntGauge,
    /// Gauge of the region count in the worker.
    region_count: IntGauge,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers for batching requests.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clears the buffers before handling the next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request) => match request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            _ => general_req_buffer.push(request),
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The flush sender is dropped.
                        break;
                    } else {
                        // A flush job is finished, so we may be able to flush the
                        // worker and retry stalled requests.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout, runs periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always checks whether we can process stalled requests so a
                // request doesn't hang for too long.
                self.handle_stalled_requests().await;
            }

            // Tries to receive more requests from the channel. We already got one
            // request, so the loop starts from 1.
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(req) => match req {
                        WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                        WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                        _ => general_req_buffer.push(req),
                    },
                    // The channel is empty; handle the requests we already have.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes requests.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // Write and ddl requests are already routed to their own buffers.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // Handles background notifications directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    // The stop flag is set before the stop request is sent.
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_inserts(request, region_metadata, write_requests, sender)
                            .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handles all write requests first so we can alter regions without
        // worrying about pending writes.
        self.handle_write_requests(write_requests, true).await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all ddl requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(_) => {
                    self.handle_truncate_request(ddl.region_id, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

    /// Runs periodical tasks such as periodical flushes, if the check
    /// interval has elapsed.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles a background notify.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Handles a `SetRegionRoleStateGracefully` request.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // Runs in the background so the worker loop isn't blocked.
            common_runtime::spawn_global(async move {
                region.set_role_state_gracefully(region_role_state).await;

                let last_entry_id = region.version_control.current().last_entry_id;
                let _ = sender.send(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(last_entry_id),
                ));
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker when the loop exits.
    async fn clean(&self) {
        // Stops remaining regions.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the whole group that a flush job has finished so other workers
    /// can retry their stalled requests.
    fn notify_group(&mut self) {
        // Notifies all receivers.
        let _ = self.flush_sender.send(());
        // Marks the current worker's receiver as seen so its loop isn't woken
        // up by this notification.
        self.flush_receiver.borrow_and_update();
    }
}

/// Wrapper that forwards events to an event listener in tests and is a no-op
/// in release builds.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// A flush has finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoids an unused variable warning in non-test builds.
        let _ = region_id;
    }

    /// The engine is stalling writes.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoids an unused variable warning in non-test builds.
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoids an unused variable warning in non-test builds.
        let _ = region_id;
        None
    }

    /// A later-drop task has finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoids unused variable warnings in non-test builds.
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoids an unused variable warning in non-test builds.
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoids an unused variable warning in non-test builds.
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }
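
    // A minimal property-style sanity check, assuming only the
    // `region_id_to_index` helper above: whatever the table id and region
    // number are, the computed index must be a valid worker slot.
    #[test]
    fn test_region_id_to_index_in_range() {
        let num_workers = 8;
        for table_id in 0..32u32 {
            for region_number in 0..32u32 {
                let index =
                    region_id_to_index(RegionId::new(table_id, region_number), num_workers);
                assert!(index < num_workers);
            }
        }
    }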

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop");
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
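
    // A minimal sanity sketch, assuming only `worker_init_check_delay` and
    // `MAX_INITIAL_CHECK_DELAY_SECS` above: the randomized initial check delay
    // must always stay below the configured maximum.
    #[test]
    fn test_worker_init_check_delay_within_bound() {
        for _ in 0..100 {
            let delay = worker_init_check_delay();
            assert!(delay < Duration::from_secs(MAX_INITIAL_CHECK_DELAY_SECS));
        }
    }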
}