mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_rebuild_index;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{Rng, rng};
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::opener::PartitionExprFetcherRef;
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

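/// Identifier for a region worker.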
pub(crate) type WorkerId = u32;

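/// Name of the marker file that indicates a region is being dropped.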
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

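/// Interval between periodical region checks performed by a worker.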
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
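/// Maximum delay in seconds before a worker runs its first periodical check.
/// See [worker_init_check_delay].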
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

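/// A group of [RegionWorker]s.
///
/// Each region is bound to a single [RegionWorker] chosen by [region_id_to_index],
/// and all requests for that region are routed to its worker.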
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct WorkerGroup {
    workers: Vec<RegionWorker>,
    flush_job_pool: SchedulerRef,
    compact_job_pool: SchedulerRef,
    index_build_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    cache_manager: CacheManagerRef,
    file_ref_manager: FileReferenceManagerRef,
}

impl WorkerGroup {
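    /// Starts a worker group.
    ///
    /// The number of workers is determined by `config.num_workers`. All workers share
    /// the same cache manager, write buffer manager and background job pools.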
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

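    /// Stops the worker group.
    ///
    /// Background job pools are stopped first, then all workers are stopped concurrently.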
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

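    /// Submits `request` to the worker that owns `region_id`.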
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

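    /// Returns the worker that owns `region_id`.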
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}

#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

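/// Maps a region id to the index of the worker that owns it.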
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

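/// Builds the write cache from the config, returning `None` if the write cache is disabled.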
async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

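/// Returns a random delay, up to [MAX_INITIAL_CHECK_DELAY_SECS] seconds, applied before a
/// worker's first periodical check so that workers don't run their checks at the same time.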
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

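/// Holds everything needed to start a [RegionWorker].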
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    index_build_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    flush_sender: watch::Sender<()>,
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> WorkerStarter<S> {
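    /// Spawns the worker loop on the global runtime and returns the [RegionWorker] handle.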
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(self.index_build_job_pool),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}

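/// Handle to a worker that processes requests for a set of regions.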
pub(crate) struct RegionWorker {
    id: WorkerId,
    regions: RegionMapRef,
    opening_regions: OpeningRegionsRef,
    sender: Sender<WorkerRequestWithTime>,
    handle: Mutex<Option<JoinHandle<()>>>,
    running: Arc<AtomicBool>,
}

impl RegionWorker {
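    /// Submits a request to this worker.
    ///
    /// Fails with a worker stopped error if the worker has already exited.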
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

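    /// Stops the worker: clears the running flag, sends a stop request, and waits for
    /// the worker loop to exit.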
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
        }
    }
}

type RequestBuffer = Vec<WorkerRequest>;

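/// Buffer of stalled write and bulk requests, grouped by region and tracked by estimated size.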
#[derive(Default)]
pub(crate) struct StalledRequests {
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

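/// Background loop that owns the regions assigned to a worker and processes their requests.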
struct RegionWorkerLoop<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    regions: RegionMapRef,
    dropping_regions: RegionMapRef,
    opening_regions: OpeningRegionsRef,
    sender: Sender<WorkerRequestWithTime>,
    receiver: Receiver<WorkerRequestWithTime>,
    wal: Wal<S>,
    object_store_manager: ObjectStoreManagerRef,
    running: Arc<AtomicBool>,
    memtable_builder_provider: MemtableBuilderProvider,
    purge_scheduler: SchedulerRef,
    write_buffer_manager: WriteBufferManagerRef,
    index_build_scheduler: IndexBuildScheduler,
    flush_scheduler: FlushScheduler,
    compaction_scheduler: CompactionScheduler,
    stalled_requests: StalledRequests,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    last_periodical_check_millis: i64,
    flush_sender: watch::Sender<()>,
    flush_receiver: watch::Receiver<()>,
    stalling_count: IntGauge,
    region_count: IntGauge,
    request_wait_time: Histogram,
    region_edit_queues: RegionEditQueues,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> RegionWorkerLoop<S> {
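    /// Runs the worker loop: receives and batches requests until the worker is stopped.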
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        break;
                    } else {
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }

            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

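    /// Dispatches buffered requests: general requests first, then write requests, then DDL requests.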
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::BuildIndexRegion(request) => {
                    self.handle_rebuild_index(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

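    /// Runs periodical tasks if at least [CHECK_REGION_INTERVAL] has passed since the last check.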
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
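    /// Stops all regions owned by this worker and clears the region map on exit.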
    async fn clean(&self) {
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

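    /// Notifies all workers in the group through the shared flush watch channel and marks
    /// the current value as seen for this worker.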
    fn notify_group(&mut self) {
        let _ = self.flush_sender.send(());
        self.flush_receiver.borrow_and_update();
    }
}

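/// Wrapper around an optional test event listener; all hooks are no-ops unless a listener
/// is set under `test` builds.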
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        let _ = region_id;
    }

    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        let _ = region_id;
        None
    }

    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}