mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_rebuild_index;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{Rng, rng};
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::gc::{GcLimiter, GcLimiterRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::opener::PartitionExprFetcherRef;
use crate::region::{
    CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
    RegionMapRef,
};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::RegionFileId;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

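/// Identifier for a region worker.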
pub(crate) type WorkerId = u32;

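/// Name of the marker file that indicates a region is being dropped.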
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

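/// Interval between periodical checks of the regions in a worker.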
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
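/// Maximum delay in seconds before a worker runs its first periodical check.
/// The actual delay is randomized so workers don't all check their regions at
/// the same time.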
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
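/// A fixed size group of [RegionWorker]s.
///
/// A region belongs to exactly one worker: requests for a region are always
/// routed to the same worker, chosen from the region id (see
/// [region_id_to_index]), so a region is only mutated by a single worker
/// thread.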
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Background flush job pool.
    flush_job_pool: SchedulerRef,
    /// Background compaction job pool.
    compact_job_pool: SchedulerRef,
    /// Background index build job pool.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for background purge jobs.
    purge_scheduler: SchedulerRef,
    /// Cache shared by all workers.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Limiter for concurrent GC jobs.
    gc_limiter: GcLimiterRef,
}

impl WorkerGroup {
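    /// Starts a group of region workers.
    ///
    /// The number of workers is determined by `config.num_workers`.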
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

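    /// Stops the worker group.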
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // Stops the background job pools gracefully before stopping the workers.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

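    /// Submits a request to a worker in the group, routed by region id.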
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

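    /// Returns true if the specific region exists.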
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

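    /// Returns true if the specific region is opening.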
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

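    /// Returns true if the specific region is catching up.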
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

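    /// Returns the region with the given `region_id`, if it exists.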
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

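    /// Returns the cache manager of the group.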
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

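    /// Returns the file reference manager of the group.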
    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

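    /// Returns the GC job limiter of the group.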
    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

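    /// Returns the worker that owns the region.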
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

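    /// Returns an iterator over all regions in the group.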
    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}

#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    #[allow(clippy::too_many_arguments)]
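    /// Starts a worker group for tests, with an optional `write_buffer_manager`
    /// and event `listener`.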
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // Size the purge scheduler from `max_background_purges` for consistency
        // with `start`.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

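    /// Returns the purge scheduler of the group.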
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

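/// Computes the index of the worker that handles the region.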
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

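/// Creates a write cache from the config, or returns `None` if the write cache
/// is disabled.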
pub async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        Some(config.index_cache_percent),
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

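/// Returns a random delay in `[0, MAX_INITIAL_CHECK_DELAY_SECS)` seconds for a
/// worker's first periodical check.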
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

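/// Arguments to start a [RegionWorker].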
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    index_build_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush jobs.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> WorkerStarter<S> {
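    /// Starts a region worker and its background worker thread.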
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        // Runs the worker loop on the global runtime.
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}

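/// Worker to handle the requests for a set of regions.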
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender to the worker thread.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
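    /// Submits a request to the worker thread.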
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Marks the worker as stopped so later requests fail fast.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

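    /// Stops the worker.
    ///
    /// This method waits until the worker thread exits.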
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

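    /// Returns true if the worker is still running.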
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

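    /// Sets whether the worker is running.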
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

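    /// Returns true if the region exists in the worker.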
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

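    /// Returns true if the region is opening in the worker.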
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

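    /// Returns true if the region is catching up in the worker.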
    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.catchup_regions.is_region_exists(region_id)
    }

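    /// Returns the region if the worker contains it.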
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }

    #[cfg(test)]
    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
        &self.catchup_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
        }
        // The worker loop observes the running flag on its next iteration and
        // then exits.
    }
}

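/// Buffer for worker requests taken from the channel.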
type RequestBuffer = Vec<WorkerRequest>;

#[derive(Default)]
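/// Buffer for stalled write requests.
///
/// Tracks the stalled requests per region together with their estimated size.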
pub(crate) struct StalledRequests {
    /// Maps a region id to the estimated size of its stalled requests and the
    /// stalled write and bulk requests themselves.
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
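    /// Appends stalled requests, draining the given buffers.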
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

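    /// Pushes a stalled write request to the buffer.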
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

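    /// Pushes a stalled bulk request to the buffer.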
    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

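    /// Removes and returns all stalled requests of the region.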
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

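    /// Returns the total number of stalled requests.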
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

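/// Background worker loop to handle requests.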
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for regions.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build tasks.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache of the engine.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for index.
    intermediate_manager: IntermediateManager,
    /// Provider to get the current time.
    time_provider: TimeProviderRef,
    /// Last time the worker ran its periodical check, in milliseconds.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush jobs.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request waiting time.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetcher for partition expressions of regions.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore to limit concurrent flush jobs.
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> RegionWorkerLoop<S> {
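    /// Starts the worker loop.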
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers to retrieve requests from the receiver in batches.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clears the buffers before handling the next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observes how long the request waited in the channel.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The watch channel is disconnected.
                        break;
                    } else {
                        // A flush job finished: maybe flush this worker and
                        // retry its stalled requests.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout: runs the periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // A flush job finished while we were receiving requests, so
                // handles stalled requests first.
                self.handle_stalled_requests().await;
            }

            // Tries to receive more requests from the channel. We already have
            // one request in the buffers, so we fetch at most batch size - 1 more.
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // All pending requests have been received.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

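    /// Dispatches and processes requests.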
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // Write and ddl requests are batched into their own buffers.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // Background notifications are handled directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handles all write requests first so we can alter regions without
        // considering pending write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

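    /// Takes and handles all ddl requests.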
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }

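    /// Handles region periodical tasks such as periodical flushes.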
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

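    /// Handles notifications from background jobs.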
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

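    /// Handles a request to set the region role state gracefully.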
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // Runs the state change in the background to avoid blocking the
            // worker loop.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
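    /// Cleans up the worker when the worker loop exits.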
    async fn clean(&self) {
        // Closes all regions that are still bound to the worker.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

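    /// Notifies the whole worker group that a flush job finished so workers can
    /// handle their stalled requests.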
    fn notify_group(&mut self) {
        // Notifies all workers in the group.
        let _ = self.flush_sender.send(());
        // Marks the latest value as seen so this worker doesn't handle its own
        // notification again.
        self.flush_receiver.borrow_and_update();
    }
}

#[derive(Default, Clone)]
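/// Wrapper that only invokes the event listener in tests.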
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

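    /// Called when a flush finishes successfully.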
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoids an unused variable warning in non-test builds.
        let _ = region_id;
    }

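    /// Called when the engine stalls write requests.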
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        let _ = region_id;
        None
    }

    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }

    pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_finish(_region_file_id).await;
        }
    }

    pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_begin(_region_file_id).await;
        }
    }

    pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_abort(_region_file_id).await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}