1mod handle_alter;
18mod handle_bulk_insert;
19mod handle_catchup;
20mod handle_close;
21mod handle_compaction;
22mod handle_create;
23mod handle_drop;
24mod handle_flush;
25mod handle_manifest;
26mod handle_open;
27mod handle_truncate;
28mod handle_write;
29
30use std::collections::HashMap;
31use std::path::Path;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::Arc;
34use std::time::Duration;
35
36use common_base::Plugins;
37use common_meta::key::SchemaMetadataManagerRef;
38use common_runtime::JoinHandle;
39use common_telemetry::{error, info, warn};
40use futures::future::try_join_all;
41use object_store::manager::ObjectStoreManagerRef;
42use prometheus::{Histogram, IntGauge};
43use rand::{rng, Rng};
44use snafu::{ensure, ResultExt};
45use store_api::logstore::LogStore;
46use store_api::region_engine::{
47 SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
48};
49use store_api::storage::RegionId;
50use tokio::sync::mpsc::{Receiver, Sender};
51use tokio::sync::{mpsc, oneshot, watch, Mutex};
52
53use crate::cache::write_cache::{WriteCache, WriteCacheRef};
54use crate::cache::{CacheManager, CacheManagerRef};
55use crate::compaction::CompactionScheduler;
56use crate::config::MitoConfig;
57use crate::error;
58use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
59use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
60use crate::memtable::MemtableBuilderProvider;
61use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
62use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
63use crate::request::{
64 BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
65 WorkerRequest, WorkerRequestWithTime,
66};
67use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
68use crate::sst::file::FileId;
69use crate::sst::index::intermediate::IntermediateManager;
70use crate::sst::index::puffin_manager::PuffinManagerFactory;
71use crate::time_provider::{StdTimeProvider, TimeProviderRef};
72use crate::wal::Wal;
73use crate::worker::handle_manifest::RegionEditQueues;
74
/// Identifier of a region worker within a [WorkerGroup].
pub(crate) type WorkerId = u32;

/// Name of the marker file created in a region directory while the region is being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval between periodical checks (see `handle_periodical_tasks`) in the worker loop.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Upper bound, in seconds, of the random delay applied before a worker's first
/// periodical check so workers don't all check at the same time.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
84
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A group of region workers. Regions are sharded across workers by region id
/// (see `region_id_to_index`), so all requests for one region are handled by
/// the same worker.
pub(crate) struct WorkerGroup {
    /// Worker threads of this group; a region always maps to the same worker.
    workers: Vec<RegionWorker>,
    /// Scheduler for background flush jobs, shared by all workers.
    flush_job_pool: SchedulerRef,
    /// Scheduler for background compaction jobs, shared by all workers.
    compact_job_pool: SchedulerRef,
    /// Scheduler for purge jobs, shared by all workers.
    purge_scheduler: SchedulerRef,
    /// Caches shared by all workers.
    cache_manager: CacheManagerRef,
}
134
impl WorkerGroup {
    /// Starts a worker group with `config.num_workers` workers.
    ///
    /// Builds the shared infrastructure once (write buffer manager, index
    /// managers, job schedulers, cache manager) and hands clones of it to
    /// every [WorkerStarter].
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel used to notify workers when the global write buffer
        // requests a flush.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        // One scheduler per kind of background job, each bounded by its own
        // config knob.
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        // `None` when the write cache is disabled in the config.
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        // Spawn one worker loop per worker id; each starter consumes clones of
        // the shared components.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Stops the worker group: stops the shared job schedulers first, then
    /// stops all workers concurrently.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // `stop(true)` presumably waits for queued jobs to finish — confirm
        // against the scheduler implementation.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits `request` to the worker that owns `region_id`.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the region is registered in its worker.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the region is currently being opened by its worker.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns the region if it is registered in its worker.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the shared cache manager.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the worker responsible for `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }
}
276
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group for tests.
    ///
    /// Mirrors [WorkerGroup::start] but lets tests inject a write buffer
    /// manager, an event listener and a time provider, and configures a
    /// smaller cache manager (no index caches).
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        // Watch channel used to notify workers when a flush is requested.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        // Size the purge scheduler from `max_background_purges` to stay
        // consistent with `WorkerGroup::start`; the previous code reused
        // `max_background_flushes` here, which looks like a copy-paste slip.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        // Spawn one worker loop per worker id, wiring in the injected
        // listener and time provider.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Returns the purge scheduler (test helper).
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
366
367fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
368 ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
369 % num_workers
370}
371
372async fn write_cache_from_config(
373 config: &MitoConfig,
374 puffin_manager_factory: PuffinManagerFactory,
375 intermediate_manager: IntermediateManager,
376) -> Result<Option<WriteCacheRef>> {
377 if !config.enable_write_cache {
378 return Ok(None);
379 }
380
381 tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
382 .await
383 .context(CreateDirSnafu {
384 dir: &config.write_cache_path,
385 })?;
386
387 let cache = WriteCache::new_fs(
388 &config.write_cache_path,
389 config.write_cache_size,
390 config.write_cache_ttl,
391 puffin_manager_factory,
392 intermediate_manager,
393 )
394 .await?;
395 Ok(Some(Arc::new(cache)))
396}
397
398pub(crate) fn worker_init_check_delay() -> Duration {
400 let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
401 Duration::from_secs(init_check_delay)
402}
403
/// All components needed to start one [RegionWorker]; consumed by
/// [WorkerStarter::start].
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for compaction jobs (shared across workers).
    compact_job_pool: SchedulerRef,
    /// Scheduler for flush jobs (shared across workers).
    flush_job_pool: SchedulerRef,
    /// Scheduler for purge jobs (shared across workers).
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Sender side of the group-wide flush notification channel.
    flush_sender: watch::Sender<()>,
    /// Receiver side of the group-wide flush notification channel.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
}
426
impl<S: LogStore> WorkerStarter<S> {
    /// Starts a worker loop on the global runtime and returns the
    /// [RegionWorker] handle used to communicate with it.
    fn start(self) -> RegionWorker {
        // Region maps are shared between the worker handle and the loop.
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        // Shared running flag; setting it to false asks the loop to exit.
        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        // Worker id as a string, used as metric label value.
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
        };
        // Run the loop as a detached task; `handle` is kept so `stop()` can
        // join it.
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        }
    }
}
492
/// Handle to a region worker thread; used to submit requests to the worker
/// loop and to stop it.
pub(crate) struct RegionWorker {
    /// Id of this worker.
    id: WorkerId,
    /// Regions currently managed by the worker (shared with the loop).
    regions: RegionMapRef,
    /// Regions currently being opened by the worker (shared with the loop).
    opening_regions: OpeningRegionsRef,
    /// Request channel to the worker loop.
    sender: Sender<WorkerRequestWithTime>,
    /// Join handle of the loop task; taken by `stop()`.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether the worker loop is still running (shared with the loop).
    running: Arc<AtomicBool>,
}
508
impl RegionWorker {
    /// Submits `request` to the worker loop.
    ///
    /// Returns [WorkerStoppedSnafu] if the worker is not running, or if the
    /// channel send fails because the loop has already exited (in which case
    /// the running flag is also cleared).
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Reconcile the flag with reality: the loop is gone.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker: clears the running flag, sends a `Stop` request so
    /// the loop wakes up, then joins the loop task. Idempotent — the handle
    /// is taken, so a second call is a no-op.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            // The send may fail if the loop already exited; that's fine.
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker loop is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets the running flag shared with the worker loop.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the region is registered in this worker.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is currently being opened by this worker.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns the region if it is registered in this worker.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}
582
impl Drop for RegionWorker {
    fn drop(&mut self) {
        // Clear the running flag so the loop task can notice and exit even
        // if `stop()` was never called.
        if self.is_running() {
            self.set_running(false);
        }
    }
}
591
/// Buffer for batching [WorkerRequest]s drained from the worker channel.
type RequestBuffer = Vec<WorkerRequest>;
593
/// Write requests that are stalled (e.g. while the write buffer is full) and
/// kept until their region can accept writes again.
#[derive(Default)]
pub(crate) struct StalledRequests {
    // Per-region stalled requests: region id -> (estimated size of that
    // region's stalled requests, stalled write requests, stalled bulk requests).
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    // Estimated total size of all stalled requests across regions.
    pub(crate) estimated_size: usize,
}
610
611impl StalledRequests {
612 pub(crate) fn append(
614 &mut self,
615 requests: &mut Vec<SenderWriteRequest>,
616 bulk_requests: &mut Vec<SenderBulkRequest>,
617 ) {
618 for req in requests.drain(..) {
619 self.push(req);
620 }
621 for req in bulk_requests.drain(..) {
622 self.push_bulk(req);
623 }
624 }
625
626 pub(crate) fn push(&mut self, req: SenderWriteRequest) {
628 let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
629 let req_size = req.request.estimated_size();
630 *size += req_size;
631 self.estimated_size += req_size;
632 requests.push(req);
633 }
634
635 pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
636 let region_id = req.region_id;
637 let (size, _, requests) = self.requests.entry(region_id).or_default();
638 let req_size = req.request.estimated_size();
639 *size += req_size;
640 self.estimated_size += req_size;
641 requests.push(req);
642 }
643
644 pub(crate) fn remove(
646 &mut self,
647 region_id: &RegionId,
648 ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
649 if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
650 self.estimated_size -= size;
651 (write_reqs, bulk_reqs)
652 } else {
653 (vec![], vec![])
654 }
655 }
656
657 pub(crate) fn stalled_count(&self) -> usize {
659 self.requests
660 .values()
661 .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
662 .sum()
663 }
664}
665
/// The event loop state of one region worker. Owns the receiving side of the
/// worker channel and all per-worker schedulers, caches and metrics.
struct RegionWorkerLoop<S> {
    /// Id of this worker.
    id: WorkerId,
    config: Arc<MitoConfig>,
    /// Regions managed by this worker (shared with the [RegionWorker] handle).
    regions: RegionMapRef,
    /// Regions currently being dropped.
    dropping_regions: RegionMapRef,
    /// Regions currently being opened (shared with the handle).
    opening_regions: OpeningRegionsRef,
    /// Sender side of the worker channel, cloned into background jobs so they
    /// can notify the loop.
    sender: Sender<WorkerRequestWithTime>,
    /// Receiver side of the worker channel.
    receiver: Receiver<WorkerRequestWithTime>,
    wal: Wal<S>,
    object_store_manager: ObjectStoreManagerRef,
    /// Running flag shared with the handle; the loop exits when it is false.
    running: Arc<AtomicBool>,
    memtable_builder_provider: MemtableBuilderProvider,
    purge_scheduler: SchedulerRef,
    write_buffer_manager: WriteBufferManagerRef,
    flush_scheduler: FlushScheduler,
    compaction_scheduler: CompactionScheduler,
    /// Write requests waiting for the region/write buffer to accept writes.
    stalled_requests: StalledRequests,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Timestamp (ms) of the last periodical check, used to throttle
    /// `handle_periodical_tasks`.
    last_periodical_check_millis: i64,
    /// Sender of the group-wide flush notification channel.
    flush_sender: watch::Sender<()>,
    /// Receiver of the group-wide flush notification channel.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of currently stalled write requests for this worker.
    stalling_count: IntGauge,
    /// Gauge of regions managed by this worker.
    region_count: IntGauge,
    /// Histogram of time requests spend queued in the worker channel.
    request_wait_time: Histogram,
    region_edit_queues: RegionEditQueues,
    schema_metadata_manager: SchemaMetadataManagerRef,
}
727
impl<S: LogStore> RegionWorkerLoop<S> {
    /// The worker event loop: receives requests from the channel, batches
    /// them, dispatches them to the handlers, and runs periodical tasks.
    /// Exits when the running flag is cleared or the channel closes, then
    /// cleans up all regions.
    async fn run(&mut self) {
        // Randomized initial delay so workers don't all run their first
        // periodical check at the same time.
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Reusable buffers for one batch of requests, partitioned by kind.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Start each iteration with clean buffers. `bulk_req_buffer` is
            // not cleared here; it is presumably drained inside
            // `handle_requests` — confirm against `handle_write_requests`.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            // Cap the wait so periodical tasks still run when the channel is
            // idle.
            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Record how long the request waited in the channel.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            // Partition by request kind for batched handling.
                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // Channel closed: all senders are gone, exit the loop.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // Flush notification channel closed, exit the loop.
                        break;
                    } else {
                        // A flush was requested somewhere in the group: maybe
                        // flush this worker's regions and retry stalled
                        // requests, then go back to waiting.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Wait timed out, run periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            // If a flush notification arrived while we were receiving,
            // retry stalled requests before handling the batch.
            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }

            // Drain up to batch_size - 1 more requests without blocking
            // (one request was already received above).
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // Channel is empty (or closed); handle what we have.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Handles one batch of requests: general requests first, then write
    /// (and bulk) requests, then DDL requests. All buffers are drained.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // Write/DDL requests were already routed to their own
                    // buffers in `run`; nothing should reach here.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // Results of background jobs (flush/compaction/...).
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    // `stop()` clears the running flag before sending Stop.
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        // No metadata means the region is unknown; report it
                        // back to the caller.
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handle all write requests (including bulk) before DDL so DDL sees
        // the latest data.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Handles and drains all DDL requests in the batch sequentially.
    ///
    /// Handlers that send their own response (open/alter/flush/compact/
    /// truncate) `continue`; the others return a result that is sent here.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    // Open replies through `ddl.sender` itself.
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(_) => {
                    self.handle_truncate_request(ddl.region_id, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

    /// Runs periodical tasks (currently periodical flush), throttled to at
    /// most once per [CHECK_REGION_INTERVAL].
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Dispatches a background-job notification to the matching handler.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Changes the region's role state gracefully and replies with the
    /// region's last entry id; replies `NotFound` if the region is not in
    /// this worker.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // Run in a separate task so a slow role change doesn't block the
            // worker loop.
            common_runtime::spawn_global(async move {
                region.set_role_state_gracefully(region_role_state).await;

                let last_entry_id = region.version_control.current().last_entry_id;
                let _ = sender.send(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(last_entry_id),
                ));
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}
1018
impl<S> RegionWorkerLoop<S> {
    /// Stops all regions of this worker and clears the region map; called
    /// when the loop exits.
    async fn clean(&self) {
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies all workers in the group through the flush watch channel,
    /// then marks our own receiver as seen so this worker doesn't react to
    /// its own notification.
    fn notify_group(&mut self) {
        let _ = self.flush_sender.send(());
        self.flush_receiver.borrow_and_update();
    }
}
1040
/// Hook points for tests to observe worker events. In non-test builds this is
/// a zero-sized no-op.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1047
impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Called when a region flush succeeds.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Suppress the unused-variable warning in non-test builds.
        let _ = region_id;
    }

    /// Called when writes start to stall.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    /// Called before a region flush job starts; may await the listener.
    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        let _ = region_id;
    }

    /// Called before a deferred region drop starts; the listener may return
    /// an override for the drop delay.
    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        let _ = region_id;
        None
    }

    /// Called when a deferred region drop finishes; `removed` tells whether
    /// the region data was actually removed.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        let _ = region_id;
        let _ = removed;
    }

    /// Called after SST merge (compaction output) finishes for a region.
    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        let _ = region_id;
    }

    /// Called with the number of requests a worker received in one batch.
    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        let _ = request_num;
    }

    /// Called when a file is filled into the file cache.
    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    /// Called when a compaction is scheduled for a region.
    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    /// Called before a region-change result is processed.
    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}
1145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    // Verifies the region-to-worker sharding formula for a few ids.
    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        // (1 % 4 + 2 % 4) % 4 == 3
        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        // (2 % 4 + 3 % 4) % 4 == 1
        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    // Starts a 4-worker group and checks it stops cleanly.
    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}
1176}