1mod handle_alter;
18mod handle_bulk_insert;
19mod handle_catchup;
20mod handle_close;
21mod handle_compaction;
22mod handle_create;
23mod handle_drop;
24mod handle_enter_staging;
25mod handle_flush;
26mod handle_manifest;
27mod handle_open;
28mod handle_rebuild_index;
29mod handle_remap;
30mod handle_truncate;
31mod handle_write;
32
33use std::collections::HashMap;
34use std::path::Path;
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::Duration;
38
39use common_base::Plugins;
40use common_base::readable_size::ReadableSize;
41use common_error::ext::BoxedError;
42use common_meta::key::SchemaMetadataManagerRef;
43use common_runtime::JoinHandle;
44use common_stat::get_total_memory_bytes;
45use common_telemetry::{error, info, warn};
46use futures::future::try_join_all;
47use object_store::manager::ObjectStoreManagerRef;
48use prometheus::{Histogram, IntGauge};
49use rand::{Rng, rng};
50use snafu::{ResultExt, ensure};
51use store_api::logstore::LogStore;
52use store_api::region_engine::{
53 SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
54};
55use store_api::storage::{FileId, RegionId};
56use tokio::sync::mpsc::{Receiver, Sender};
57use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
58
59use crate::cache::write_cache::{WriteCache, WriteCacheRef};
60use crate::cache::{CacheManager, CacheManagerRef};
61use crate::compaction::CompactionScheduler;
62use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
63use crate::config::MitoConfig;
64use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
65use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
66use crate::gc::{GcLimiter, GcLimiterRef};
67use crate::memtable::MemtableBuilderProvider;
68use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
69use crate::region::opener::PartitionExprFetcherRef;
70use crate::region::{
71 CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
72 RegionMapRef,
73};
74use crate::request::{
75 BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
76 WorkerRequest, WorkerRequestWithTime,
77};
78use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
79use crate::sst::file::RegionFileId;
80use crate::sst::file_ref::FileReferenceManagerRef;
81use crate::sst::index::IndexBuildScheduler;
82use crate::sst::index::intermediate::IntermediateManager;
83use crate::sst::index::puffin_manager::PuffinManagerFactory;
84use crate::time_provider::{StdTimeProvider, TimeProviderRef};
85use crate::wal::Wal;
86use crate::worker::handle_manifest::RegionEditQueues;
87
/// Identifier of a region worker within a [`WorkerGroup`].
pub(crate) type WorkerId = u32;
90
/// File name of the marker that indicates a region is being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval between two periodical checks of regions in a worker loop.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Exclusive upper bound (in seconds) of the random delay applied before a
/// worker's first periodical check, so workers don't all check at the same time.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
97
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A group of region workers that partitions regions among workers by region id.
///
/// Shared resources (job pools, caches, the file reference manager and GC limiter)
/// are owned here and cloned into each worker at startup.
pub(crate) struct WorkerGroup {
    /// Workers of the group; a request is routed to `workers[region_id_to_index(..)]`.
    workers: Vec<RegionWorker>,
    /// Scheduler for background flush jobs, shared by all workers.
    flush_job_pool: SchedulerRef,
    /// Scheduler for background compaction jobs, shared by all workers.
    compact_job_pool: SchedulerRef,
    /// Scheduler for background index build jobs, shared by all workers.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for background purge jobs, shared by all workers.
    purge_scheduler: SchedulerRef,
    /// Cache for SST metadata, pages, vectors and index data.
    cache_manager: CacheManagerRef,
    /// Tracks references to SST files.
    file_ref_manager: FileReferenceManagerRef,
    /// Limits the number of concurrent GC jobs.
    gc_limiter: GcLimiterRef,
}
153
impl WorkerGroup {
    /// Starts a group of region workers.
    ///
    /// Builds the shared resources (flush notification channel, write buffer manager,
    /// index managers, job pools, caches, compaction memory manager and GC limiter),
    /// then starts `config.num_workers` workers that each receive clones of them.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel used to broadcast flush notifications to every worker.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        // Background job pools shared by all workers of the group.
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        // A non-positive value means the total memory is unavailable; fall back to 0.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Stops the job pools first, then stops every worker and waits for them to exit.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // Stops the background job pools before stopping workers so the workers
        // don't accept new background jobs while shutting down.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits `request` to the worker that owns `region_id`.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the region exists in its worker's region map.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the region is currently being opened.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns true if the region is currently catching up.
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

    /// Returns the region if it exists in its worker's region map.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the shared cache manager of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the shared file reference manager of the group.
    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Returns the shared GC limiter of the group.
    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

    /// Returns the worker that `region_id` is routed to.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    /// Returns an iterator over all regions managed by all workers.
    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}
341
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group for tests, with an optional injected
    /// `write_buffer_manager` and event `listener`.
    ///
    /// Mirrors [`WorkerGroup::start`] but lets tests override the write buffer
    /// manager and observe worker events.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        // Watch channel used to broadcast flush notifications to every worker.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // Fix: size the purge scheduler by `max_background_purges`, consistent with
        // `WorkerGroup::start` (it previously reused `max_background_flushes`).
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        // Note: the test cache manager intentionally skips the index cache sizes
        // configured by the production `start`.
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        // A non-positive value means the total memory is unavailable; fall back to 0.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Returns the purge scheduler (for tests).
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
457
458fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
459 ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
460 % num_workers
461}
462
463pub async fn write_cache_from_config(
464 config: &MitoConfig,
465 puffin_manager_factory: PuffinManagerFactory,
466 intermediate_manager: IntermediateManager,
467) -> Result<Option<WriteCacheRef>> {
468 if !config.enable_write_cache {
469 return Ok(None);
470 }
471
472 tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
473 .await
474 .context(CreateDirSnafu {
475 dir: &config.write_cache_path,
476 })?;
477
478 let cache = WriteCache::new_fs(
479 &config.write_cache_path,
480 config.write_cache_size,
481 config.write_cache_ttl,
482 Some(config.index_cache_percent),
483 puffin_manager_factory,
484 intermediate_manager,
485 ReadableSize(0),
487 )
488 .await?;
489 Ok(Some(Arc::new(cache)))
490}
491
492pub(crate) fn worker_init_check_delay() -> Duration {
494 let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
495 Duration::from_secs(init_check_delay)
496}
497
/// Holds everything a single region worker needs; consumed by [`WorkerStarter::start`].
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    /// Engine configuration.
    config: Arc<MitoConfig>,
    /// Log store for the WAL.
    log_store: Arc<S>,
    /// Object store manager shared across workers.
    object_store_manager: ObjectStoreManagerRef,
    /// Global write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Shared pool for compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Shared pool for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Shared pool for flush jobs.
    flush_job_pool: SchedulerRef,
    /// Shared scheduler for purge jobs.
    purge_scheduler: SchedulerRef,
    /// Event listener (no-op outside test builds).
    listener: WorkerListener,
    /// Shared cache manager.
    cache_manager: CacheManagerRef,
    /// Memory manager limiting compaction memory usage.
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    /// Factory for puffin (index) managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager for intermediate index files.
    intermediate_manager: IntermediateManager,
    /// Time source for periodical checks.
    time_provider: TimeProviderRef,
    /// Sender side of the group-wide flush notification channel.
    flush_sender: watch::Sender<()>,
    /// Receiver side of the group-wide flush notification channel.
    flush_receiver: watch::Receiver<()>,
    /// Engine plugins.
    plugins: Plugins,
    /// Manager for schema metadata lookups.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Tracks references to SST files.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetches partition expressions for regions.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore bounding concurrent flushes.
    flush_semaphore: Arc<Semaphore>,
}
525
impl<S: LogStore> WorkerStarter<S> {
    /// Starts the worker loop on the global runtime and returns its handle.
    ///
    /// The returned [`RegionWorker`] shares the region maps, request sender and
    /// running flag with the spawned [`RegionWorkerLoop`].
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        // Bounded request channel between the worker handle and its loop.
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            // Per-worker metrics, labeled by the worker id.
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}
603
/// Handle to a region worker; forwards requests to the background worker loop.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions managed by this worker (shared with the worker loop).
    regions: RegionMapRef,
    /// Regions currently being opened (shared with the worker loop).
    opening_regions: OpeningRegionsRef,
    /// Regions currently catching up (shared with the worker loop).
    catchup_regions: CatchupRegionsRef,
    /// Sender of the worker's request channel.
    sender: Sender<WorkerRequestWithTime>,
    /// Join handle of the worker task; taken on stop.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether the worker loop is still running (shared with the worker loop).
    running: Arc<AtomicBool>,
}
621
622impl RegionWorker {
623 async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
625 ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
626 let request_with_time = WorkerRequestWithTime::new(request);
627 if self.sender.send(request_with_time).await.is_err() {
628 warn!(
629 "Worker {} is already exited but the running flag is still true",
630 self.id
631 );
632 self.set_running(false);
634 return WorkerStoppedSnafu { id: self.id }.fail();
635 }
636
637 Ok(())
638 }
639
640 async fn stop(&self) -> Result<()> {
644 let handle = self.handle.lock().await.take();
645 if let Some(handle) = handle {
646 info!("Stop region worker {}", self.id);
647
648 self.set_running(false);
649 if self
650 .sender
651 .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
652 .await
653 .is_err()
654 {
655 warn!("Worker {} is already exited before stop", self.id);
656 }
657
658 handle.await.context(JoinSnafu)?;
659 }
660
661 Ok(())
662 }
663
664 fn is_running(&self) -> bool {
666 self.running.load(Ordering::Relaxed)
667 }
668
669 fn set_running(&self, value: bool) {
671 self.running.store(value, Ordering::Relaxed)
672 }
673
674 fn is_region_exists(&self, region_id: RegionId) -> bool {
676 self.regions.is_region_exists(region_id)
677 }
678
679 fn is_region_opening(&self, region_id: RegionId) -> bool {
681 self.opening_regions.is_region_exists(region_id)
682 }
683
684 fn is_region_catching_up(&self, region_id: RegionId) -> bool {
686 self.catchup_regions.is_region_exists(region_id)
687 }
688
689 fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
691 self.regions.get_region(region_id)
692 }
693
694 #[cfg(test)]
695 pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
697 &self.opening_regions
698 }
699
700 #[cfg(test)]
701 pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
703 &self.catchup_regions
704 }
705}
706
impl Drop for RegionWorker {
    fn drop(&mut self) {
        // Clear the running flag when the handle is dropped so the loop can exit.
        if self.is_running() {
            self.set_running(false);
        }
    }
}
715
/// Buffer for batching worker requests received from the channel.
type RequestBuffer = Vec<WorkerRequest>;
717
/// Write requests stalled by a full write buffer, grouped by region.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Per-region stalled requests: (estimated size of this region's requests,
    /// stalled write requests, stalled bulk requests).
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Total estimated size of all stalled requests across regions.
    pub(crate) estimated_size: usize,
}
734
735impl StalledRequests {
736 pub(crate) fn append(
738 &mut self,
739 requests: &mut Vec<SenderWriteRequest>,
740 bulk_requests: &mut Vec<SenderBulkRequest>,
741 ) {
742 for req in requests.drain(..) {
743 self.push(req);
744 }
745 for req in bulk_requests.drain(..) {
746 self.push_bulk(req);
747 }
748 }
749
750 pub(crate) fn push(&mut self, req: SenderWriteRequest) {
752 let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
753 let req_size = req.request.estimated_size();
754 *size += req_size;
755 self.estimated_size += req_size;
756 requests.push(req);
757 }
758
759 pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
760 let region_id = req.region_id;
761 let (size, _, requests) = self.requests.entry(region_id).or_default();
762 let req_size = req.request.estimated_size();
763 *size += req_size;
764 self.estimated_size += req_size;
765 requests.push(req);
766 }
767
768 pub(crate) fn remove(
770 &mut self,
771 region_id: &RegionId,
772 ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
773 if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
774 self.estimated_size -= size;
775 (write_reqs, bulk_reqs)
776 } else {
777 (vec![], vec![])
778 }
779 }
780
781 pub(crate) fn stalled_count(&self) -> usize {
783 self.requests
784 .values()
785 .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
786 .sum()
787 }
788}
789
/// Background loop of a region worker; processes requests from the channel
/// until the running flag is cleared.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine configuration.
    config: Arc<MitoConfig>,
    /// Regions managed by this worker (shared with the [`RegionWorker`] handle).
    regions: RegionMapRef,
    /// Regions currently being dropped.
    dropping_regions: RegionMapRef,
    /// Regions currently being opened (shared with the handle).
    opening_regions: OpeningRegionsRef,
    /// Regions currently catching up (shared with the handle).
    catchup_regions: CatchupRegionsRef,
    /// Sender of the request channel (cloned into background jobs).
    sender: Sender<WorkerRequestWithTime>,
    /// Receiver of the request channel.
    receiver: Receiver<WorkerRequestWithTime>,
    /// Write-ahead log.
    wal: Wal<S>,
    /// Object store manager.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the loop should keep running (shared with the handle).
    running: Arc<AtomicBool>,
    /// Provides memtable builders for regions.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Scheduler for purge jobs.
    purge_scheduler: SchedulerRef,
    /// Global write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build jobs.
    index_build_scheduler: IndexBuildScheduler,
    /// Scheduler for flush jobs.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction jobs.
    compaction_scheduler: CompactionScheduler,
    /// Write requests stalled by a full write buffer.
    stalled_requests: StalledRequests,
    /// Event listener (no-op outside test builds).
    listener: WorkerListener,
    /// Shared cache manager.
    cache_manager: CacheManagerRef,
    /// Factory for puffin (index) managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager for intermediate index files.
    intermediate_manager: IntermediateManager,
    /// Time source for periodical checks.
    time_provider: TimeProviderRef,
    /// Timestamp (ms) of the last periodical check.
    last_periodical_check_millis: i64,
    /// Sender of the group-wide flush notification channel.
    flush_sender: watch::Sender<()>,
    /// Receiver of the group-wide flush notification channel.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalled write requests for this worker.
    stalling_count: IntGauge,
    /// Gauge of regions managed by this worker.
    region_count: IntGauge,
    /// Histogram of time requests spend waiting in the channel.
    request_wait_time: Histogram,
    /// Queues serializing region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Manager for schema metadata lookups.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Tracks references to SST files.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetches partition expressions for regions.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore bounding concurrent flushes.
    flush_semaphore: Arc<Semaphore>,
}
861
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Runs the worker loop until the running flag is cleared or the request
    /// channel is disconnected, then cleans up all regions.
    async fn run(&mut self) {
        // Randomly delay the first periodical check so workers don't all run it
        // at the same time.
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers for batching requests; reused across iterations.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear buffers before handling the next batch.
            // NOTE: `bulk_req_buffer` is not cleared here — presumably it is drained
            // by `handle_bulk_insert_batch`; confirm before changing.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Record how long the request waited in the channel.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            // Route write/DDL requests into their dedicated buffers.
                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected; exit the loop.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The flush notification channel is disconnected; exit.
                        break;
                    } else {
                        // A flush happened somewhere in the group: try to flush this
                        // worker and retry stalled requests, then wait again.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout: run periodical tasks and wait for requests again.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            // Handle a pending flush notification before processing the batch so
            // stalled requests get a chance to proceed first.
            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }

            // Try to drain more requests from the channel without waiting.
            // Starts at 1 because one request was already received above.
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // The channel is drained (or disconnected).
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches a batch of requests: general requests first, then writes
    /// (including bulk requests), then DDL requests.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // Write/DDL requests are routed to the dedicated buffers in
                    // `run()`, so this arm is unreachable in practice.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // Result of a background job; the worker sent this request itself.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
            }
        }

        // Handle writes before DDL requests so DDLs operate on the latest data.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all DDL requests.
    ///
    /// Handlers that receive `ddl.sender` themselves respond asynchronously and
    /// `continue`; the remaining handlers return a result that is sent here.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_expr,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }

    /// Runs periodical tasks (currently a periodical flush) when at least
    /// [`CHECK_REGION_INTERVAL`] has elapsed since the last check.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Dispatches the result notification of a finished/failed background job.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Gracefully changes the role state of a region and replies through `sender`.
    ///
    /// The state change runs on a spawned task so it doesn't block the worker loop;
    /// replies `NotFound` when the region isn't managed by this worker.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}
1190
1191impl<S> RegionWorkerLoop<S> {
1192 async fn clean(&self) {
1194 let regions = self.regions.list_regions();
1196 for region in regions {
1197 region.stop().await;
1198 }
1199
1200 self.regions.clear();
1201 }
1202
1203 fn notify_group(&mut self) {
1206 let _ = self.flush_sender.send(());
1208 self.flush_receiver.borrow_and_update();
1210 }
1211}
1212
/// Hook wrapper that forwards worker internal events to an optional listener.
///
/// The inner listener only exists in test builds (`test` or the `test`
/// feature); in other builds this type carries no data and all hooks
/// compile down to no-ops.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    // Optional event listener used by tests to observe worker behavior.
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1219
1220impl WorkerListener {
1221 #[cfg(any(test, feature = "test"))]
1222 pub(crate) fn new(
1223 listener: Option<crate::engine::listener::EventListenerRef>,
1224 ) -> WorkerListener {
1225 WorkerListener { listener }
1226 }
1227
1228 pub(crate) fn on_flush_success(&self, region_id: RegionId) {
1230 #[cfg(any(test, feature = "test"))]
1231 if let Some(listener) = &self.listener {
1232 listener.on_flush_success(region_id);
1233 }
1234 let _ = region_id;
1236 }
1237
1238 pub(crate) fn on_write_stall(&self) {
1240 #[cfg(any(test, feature = "test"))]
1241 if let Some(listener) = &self.listener {
1242 listener.on_write_stall();
1243 }
1244 }
1245
1246 pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
1247 #[cfg(any(test, feature = "test"))]
1248 if let Some(listener) = &self.listener {
1249 listener.on_flush_begin(region_id).await;
1250 }
1251 let _ = region_id;
1253 }
1254
1255 pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
1256 #[cfg(any(test, feature = "test"))]
1257 if let Some(listener) = &self.listener {
1258 return listener.on_later_drop_begin(region_id);
1259 }
1260 let _ = region_id;
1262 None
1263 }
1264
1265 pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
1267 #[cfg(any(test, feature = "test"))]
1268 if let Some(listener) = &self.listener {
1269 listener.on_later_drop_end(region_id, removed);
1270 }
1271 let _ = region_id;
1273 let _ = removed;
1274 }
1275
1276 pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
1277 #[cfg(any(test, feature = "test"))]
1278 if let Some(listener) = &self.listener {
1279 listener.on_merge_ssts_finished(region_id).await;
1280 }
1281 let _ = region_id;
1283 }
1284
1285 pub(crate) fn on_recv_requests(&self, request_num: usize) {
1286 #[cfg(any(test, feature = "test"))]
1287 if let Some(listener) = &self.listener {
1288 listener.on_recv_requests(request_num);
1289 }
1290 let _ = request_num;
1292 }
1293
1294 pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
1295 #[cfg(any(test, feature = "test"))]
1296 if let Some(listener) = &self.listener {
1297 listener.on_file_cache_filled(_file_id);
1298 }
1299 }
1300
1301 pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
1302 #[cfg(any(test, feature = "test"))]
1303 if let Some(listener) = &self.listener {
1304 listener.on_compaction_scheduled(_region_id);
1305 }
1306 }
1307
1308 pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
1309 #[cfg(any(test, feature = "test"))]
1310 if let Some(listener) = &self.listener {
1311 listener
1312 .on_notify_region_change_result_begin(_region_id)
1313 .await;
1314 }
1315 }
1316
1317 pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
1318 #[cfg(any(test, feature = "test"))]
1319 if let Some(listener) = &self.listener {
1320 listener.on_enter_staging_result_begin(_region_id).await;
1321 }
1322 }
1323
1324 pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
1325 #[cfg(any(test, feature = "test"))]
1326 if let Some(listener) = &self.listener {
1327 listener.on_index_build_finish(_region_file_id).await;
1328 }
1329 }
1330
1331 pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
1332 #[cfg(any(test, feature = "test"))]
1333 if let Some(listener) = &self.listener {
1334 listener.on_index_build_begin(_region_file_id).await;
1335 }
1336 }
1337
1338 pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
1339 #[cfg(any(test, feature = "test"))]
1340 if let Some(listener) = &self.listener {
1341 listener.on_index_build_abort(_region_file_id).await;
1342 }
1343 }
1344}
1345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        // Regions should map deterministically onto worker indices.
        assert_eq!(region_id_to_index(RegionId::new(1, 2), num_workers), 3);
        assert_eq!(region_id_to_index(RegionId::new(2, 3), num_workers), 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let config = MitoConfig {
            num_workers: 4,
            ..Default::default()
        };
        let group = env.create_worker_group(config).await;

        group.stop().await.unwrap();
    }
}