mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

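/// Identifier for a region worker.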
pub(crate) type WorkerId = u32;

/// Name of the marker file that indicates a region is being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval between the periodical checks run by each region worker.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Upper bound (in seconds) of the random delay applied before a worker's first periodical check.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

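/// A fixed-size group of region workers.
///
/// Each region is dispatched to one worker of the group by its [RegionId], so requests
/// to the same region are always handled by the same worker.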
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Scheduler for background flush jobs.
    flush_job_pool: SchedulerRef,
    /// Scheduler for background compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Scheduler for file purge jobs.
    purge_scheduler: SchedulerRef,
    /// Cache shared by all workers.
    cache_manager: CacheManagerRef,
    /// Manager of SST file references.
    file_ref_manager: FileReferenceManagerRef,
}

impl WorkerGroup {
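    /// Starts a group of `config.num_workers` region workers that share the same
    /// job schedulers, write buffer manager and cache manager.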
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

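    /// Stops the worker group: the background job pools first, then every worker.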
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // Stops the background job pools before stopping the workers.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

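    /// Submits a request to the worker that owns the region.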
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is being opened.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns the region if it exists.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the cache manager shared by the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the file reference manager shared by the group.
    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Returns the worker that owns the region.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    /// Returns all regions of the group.
    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}

#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

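/// Computes the index of the worker that owns a region:
/// `(table_id % num_workers + region_number % num_workers) % num_workers`.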
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

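/// Builds the write cache from the config. Returns `None` if the write cache is disabled.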
async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

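/// Returns a random delay (up to [MAX_INITIAL_CHECK_DELAY_SECS] seconds) applied before a
/// worker's first periodical check, so workers do not all run their checks at the same time.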
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

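/// Wrapper that owns everything needed to start a [RegionWorker].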
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    flush_sender: watch::Sender<()>,
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
}

impl<S: LogStore> WorkerStarter<S> {
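    /// Starts the region worker loop on the global runtime and returns a handle to it.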
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}

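/// Handle to a region worker thread: sends requests to the worker loop and tracks the
/// regions assigned to the worker.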
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are being opened.
    opening_regions: OpeningRegionsRef,
    /// Request sender to the worker loop.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
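    /// Submits a request to the worker loop, failing with a worker-stopped error if the
    /// worker has already exited.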
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker: clears the running flag, sends a stop request and waits for
    /// the worker thread to exit.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets the running flag.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains the specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the specific region is being opened.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns the region if the worker contains it.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
        }
    }
}

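/// Buffer for requests received by the worker loop.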
type RequestBuffer = Vec<WorkerRequest>;

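/// Write and bulk requests stalled by the write buffer manager, grouped by region.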
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Maps a region id to the estimated size of its stalled requests and the stalled
    /// write and bulk requests themselves.
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    /// Appends the given write and bulk requests, draining both input buffers.
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

    /// Pushes a stalled write request and updates the estimated sizes.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Pushes a stalled bulk request and updates the estimated sizes.
    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes and returns all stalled requests of the region.
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

    /// Returns the total number of stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

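/// Background loop that handles the requests of the regions assigned to a worker.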
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are being dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are being opened.
    opening_regions: OpeningRegionsRef,
    /// Request sender, kept so background jobs can notify the worker.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver of the worker loop.
    receiver: Receiver<WorkerRequestWithTime>,
    wal: Wal<S>,
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker loop is still running.
    running: Arc<AtomicBool>,
    memtable_builder_provider: MemtableBuilderProvider,
    purge_scheduler: SchedulerRef,
    write_buffer_manager: WriteBufferManagerRef,
    flush_scheduler: FlushScheduler,
    compaction_scheduler: CompactionScheduler,
    /// Write requests stalled by the write buffer manager.
    stalled_requests: StalledRequests,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Timestamp (ms) of the last periodical check.
    last_periodical_check_millis: i64,
    /// Watch channel for flush notifications shared by all workers.
    flush_sender: watch::Sender<()>,
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalled write requests of this worker.
    stalling_count: IntGauge,
    /// Gauge of regions bound to this worker.
    region_count: IntGauge,
    /// Histogram of the time requests spend waiting in the channel.
    request_wait_time: Histogram,
    region_edit_queues: RegionEditQueues,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
}

impl<S: LogStore> RegionWorkerLoop<S> {
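    /// Runs the worker loop: receives requests from the channel in batches and handles
    /// them until the worker is stopped.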
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers to retrieve requests from the channel in batches.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clears the buffers before handling more requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The flush notifier is dropped.
                        break;
                    } else {
                        // Notified to check the write buffer and the stalled requests.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout, runs periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Handles stalled requests if the flush watch channel has pending notifications.
                self.handle_stalled_requests().await;
            }

            // Tries to collect more requests from the channel (one is already received above).
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // The channel is empty or disconnected.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes the batched requests: general requests first, then
    /// write requests, and finally DDL requests.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are already collected into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handles write requests before DDL requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all DDL requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

    /// Handles periodical tasks such as flushing regions periodically. Skips the work
    /// if the last check ran less than [CHECK_REGION_INTERVAL] ago.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles a notification from a background job.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Handles a request to gracefully set the role state of a region.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // Runs the state change in a background task so it doesn't block the worker loop.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker when it exits: stops and clears all regions.
    async fn clean(&self) {
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the worker group through the shared flush watch channel and marks the
    /// notification as seen for this worker.
    fn notify_group(&mut self) {
        let _ = self.flush_sender.send(());
        self.flush_receiver.borrow_and_update();
    }
}

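/// Wrapper around an optional event listener that is only active in tests (or with the
/// `test` feature).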
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        let _ = region_id;
    }

    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        let _ = region_id;
        None
    }

    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}