mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

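/// Identifier of a region worker.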
pub(crate) type WorkerId = u32;

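/// Marker file under a region directory that indicates the region is being dropped.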
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

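/// Interval of a region worker's periodical background checks (such as periodical flushes).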
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
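/// Max delay in seconds before a worker runs its first periodical check.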
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
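/// A fixed size group of [RegionWorker]s.
///
/// The group binds each region to a specific [RegionWorker] and routes requests
/// to the region's dedicated worker.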
pub(crate) struct WorkerGroup {
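    /// Workers of the group.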
    workers: Vec<RegionWorker>,
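    /// Scheduler for flush jobs.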
    flush_job_pool: SchedulerRef,
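    /// Scheduler for compaction jobs.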
    compact_job_pool: SchedulerRef,
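    /// Scheduler for file purge jobs.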
    purge_scheduler: SchedulerRef,
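    /// Cache manager shared by all workers.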
    cache_manager: CacheManagerRef,
}

impl WorkerGroup {
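    /// Starts a worker group.
    ///
    /// The number of workers is determined by the config.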
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

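    /// Stops the worker group.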
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

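        // Stops the background job pools before stopping the workers.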
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

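    /// Submits a request to a worker in the group, routed by the region id.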
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

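    /// Returns true if the specific region exists.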
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

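    /// Returns true if the specific region is opening.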
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

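    /// Returns the region if it exists.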
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

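    /// Returns the cache manager of the group.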
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

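    /// Returns the worker that the region belongs to.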
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }
}

#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
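    /// Starts a worker group for tests, with optional mocked
    /// `write_buffer_manager` and `listener`.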
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

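/// Computes the index of the worker that a region is routed to.
///
/// For example, with 4 workers, `RegionId::new(1, 2)` is routed to worker
/// `(1 % 4 + 2 % 4) % 4 == 3` (see `test_region_id_to_index` below).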
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

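/// Creates a write cache from the config if the write cache is enabled,
/// creating the cache directory first.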
async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

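/// Returns a random delay in `[0, MAX_INITIAL_CHECK_DELAY_SECS)` seconds for a
/// worker's first periodical check, so that workers don't run their checks at
/// the same time.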
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

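/// Dependencies and config to start a [RegionWorker].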
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    flush_sender: watch::Sender<()>,
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> WorkerStarter<S> {
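    /// Starts a region worker and its background worker thread.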
    fn start(self) -> RegionWorker {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        }
    }
}

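/// Worker to handle requests for a set of regions.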
pub(crate) struct RegionWorker {
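    /// Id of the worker.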
    id: WorkerId,
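    /// Regions bound to the worker.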
    regions: RegionMapRef,
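    /// Regions that are opening.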
    opening_regions: OpeningRegionsRef,
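    /// Request sender to the worker thread.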
    sender: Sender<WorkerRequest>,
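    /// Handle to the worker thread.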
    handle: Mutex<Option<JoinHandle<()>>>,
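    /// Whether the worker thread is still running.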
    running: Arc<AtomicBool>,
}

impl RegionWorker {
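    /// Submits a request to the worker, failing if the worker has stopped.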
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        if self.sender.send(request).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

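    /// Stops the worker and waits for the worker thread to exit.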
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self.sender.send(WorkerRequest::Stop).await.is_err() {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
        }
    }
}

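/// Buffer to batch requests received by a worker.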
type RequestBuffer = Vec<WorkerRequest>;

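/// Buffer for stalled write requests.
///
/// Requests are buffered here when the worker has to stall writes (e.g. while
/// waiting for flushes to release memory) and are retried later.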
#[derive(Default)]
pub(crate) struct StalledRequests {
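    /// Stalled requests of each region.
    ///
    /// Each entry holds the estimated size of the region's stalled requests and
    /// the pending write and bulk requests.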
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
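    /// Estimated size of all stalled requests.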
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
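    /// Appends the given requests, draining both input buffers.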
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

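    /// Pushes a stalled write request and updates the estimated sizes.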
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

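    /// Pushes a stalled bulk request and updates the estimated sizes.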
    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

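    /// Removes and returns all stalled requests of the given region.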
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

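    /// Returns the total number of stalled requests.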
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

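/// Background loop of a region worker.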
struct RegionWorkerLoop<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    regions: RegionMapRef,
    dropping_regions: RegionMapRef,
    opening_regions: OpeningRegionsRef,
    sender: Sender<WorkerRequest>,
    receiver: Receiver<WorkerRequest>,
    wal: Wal<S>,
    object_store_manager: ObjectStoreManagerRef,
    running: Arc<AtomicBool>,
    memtable_builder_provider: MemtableBuilderProvider,
    purge_scheduler: SchedulerRef,
    write_buffer_manager: WriteBufferManagerRef,
    flush_scheduler: FlushScheduler,
    compaction_scheduler: CompactionScheduler,
    stalled_requests: StalledRequests,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    last_periodical_check_millis: i64,
    flush_sender: watch::Sender<()>,
    flush_receiver: watch::Receiver<()>,
    stalled_count: IntGauge,
    region_count: IntGauge,
    region_edit_queues: RegionEditQueues,
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> RegionWorkerLoop<S> {
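    /// Runs the worker loop until the worker is stopped or its request channel
    /// is closed.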
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request) => match request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            _ => general_req_buffer.push(request),
                        },
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        break;
                    } else {
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }

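            // One request was already received in the select above; try to fetch
            // more pending requests without blocking so they can be handled in batch.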
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(req) => match req {
                        WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                        WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                        _ => general_req_buffer.push(req),
                    },
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

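    /// Dispatches and processes the buffered requests.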
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
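                // Write and DDL requests are batched into dedicated buffers and
                // handled after this loop.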
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

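    /// Takes and handles all DDL requests.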
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(_) => {
                    self.handle_truncate_request(ddl.region_id, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

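    /// Runs periodical tasks (currently periodical flushes) if the check
    /// interval has elapsed.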
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

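    /// Handles results of background jobs.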
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

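    /// Sets the role state of the region gracefully and replies with the
    /// region's last entry id, or `NotFound` if the region doesn't exist.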
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            common_runtime::spawn_global(async move {
                region.set_role_state_gracefully(region_role_state).await;

                let last_entry_id = region.version_control.current().last_entry_id;
                let _ = sender.send(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(last_entry_id),
                ));
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
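    /// Stops and clears all regions when the worker loop exits.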
    async fn clean(&self) {
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

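    /// Notifies all workers in the group about a finished flush and marks the
    /// notification as seen by this worker.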
    fn notify_group(&mut self) {
        let _ = self.flush_sender.send(());
        self.flush_receiver.borrow_and_update();
    }
}

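/// Listener that only forwards events to the inner event listener in tests.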
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        let _ = region_id;
    }

    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        let _ = region_id;
        None
    }

    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop");
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}
1155}