1mod handle_alter;
18mod handle_apply_staging;
19mod handle_bulk_insert;
20mod handle_catchup;
21mod handle_close;
22mod handle_compaction;
23mod handle_copy_region;
24mod handle_create;
25mod handle_drop;
26mod handle_enter_staging;
27mod handle_flush;
28mod handle_manifest;
29mod handle_open;
30mod handle_rebuild_index;
31mod handle_remap;
32mod handle_truncate;
33mod handle_write;
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use common_base::Plugins;
42use common_error::ext::BoxedError;
43use common_meta::key::SchemaMetadataManagerRef;
44use common_runtime::JoinHandle;
45use common_stat::get_total_memory_bytes;
46use common_telemetry::{error, info, warn};
47use futures::future::try_join_all;
48use object_store::manager::ObjectStoreManagerRef;
49use prometheus::{Histogram, IntGauge};
50use rand::{Rng, rng};
51use snafu::{ResultExt, ensure};
52use store_api::logstore::LogStore;
53use store_api::region_engine::{
54 SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
55};
56use store_api::storage::{FileId, RegionId};
57use tokio::sync::mpsc::{Receiver, Sender};
58use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
59
60use crate::cache::write_cache::{WriteCache, WriteCacheRef};
61use crate::cache::{CacheManager, CacheManagerRef};
62use crate::compaction::CompactionScheduler;
63use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
64use crate::config::MitoConfig;
65use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
66use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
67use crate::gc::{GcLimiter, GcLimiterRef};
68use crate::memtable::MemtableBuilderProvider;
69use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
70use crate::region::opener::PartitionExprFetcherRef;
71use crate::region::{
72 CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
73 RegionMapRef,
74};
75use crate::request::{
76 BackgroundNotify, BulkInsertRequest, DdlRequest, SenderBulkRequest, SenderDdlRequest,
77 SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
78};
79use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
80use crate::sst::file::RegionFileId;
81use crate::sst::file_ref::FileReferenceManagerRef;
82use crate::sst::index::IndexBuildScheduler;
83use crate::sst::index::intermediate::IntermediateManager;
84use crate::sst::index::puffin_manager::PuffinManagerFactory;
85use crate::time_provider::{StdTimeProvider, TimeProviderRef};
86use crate::wal::Wal;
87use crate::worker::handle_manifest::RegionEditQueues;
88
/// Identifier of a region worker, assigned from the worker's index within the group.
pub(crate) type WorkerId = u32;

/// File name of the marker written into a region directory while the region is being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval between periodical region checks in the worker loop.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Exclusive upper bound (seconds) of the random delay applied before a worker's
/// first periodical check (see [worker_init_check_delay]).
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
98
/// A group of region workers.
///
/// Requests are routed to a fixed worker per region (see [region_id_to_index]),
/// while background jobs run on schedulers shared by all workers.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Scheduler for flush jobs, shared by all workers.
    flush_job_pool: SchedulerRef,
    /// Scheduler for compaction jobs, shared by all workers.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs, shared by all workers.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for purge jobs, shared by all workers.
    purge_scheduler: SchedulerRef,
    /// Engine-wide caches (SST meta, pages, vectors, index caches, write cache).
    cache_manager: CacheManagerRef,
    /// Manager of SST file references.
    file_ref_manager: FileReferenceManagerRef,
    /// Limiter for concurrent GC jobs (sized by `config.gc.max_concurrent_gc_job`).
    gc_limiter: GcLimiterRef,
    /// Manager of object stores used by regions.
    object_store_manager: ObjectStoreManagerRef,
    /// Factory for puffin (index file) managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager of intermediate index files under the index aux path.
    intermediate_manager: IntermediateManager,
    /// Accessor to schema metadata.
    schema_metadata_manager: SchemaMetadataManagerRef,
}
162
163impl WorkerGroup {
    /// Starts a group of `config.num_workers` region workers.
    ///
    /// Initializes the shared infrastructure (write buffer manager, index managers,
    /// job pools, caches, compaction memory manager) and then spawns every worker
    /// with clones of those shared handles.
    ///
    /// # Errors
    /// Returns an error if initializing the index managers, the write cache, or
    /// any worker fails.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel the write buffer manager uses to notify workers to flush.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        // Background job pools, each sized by its own config knob.
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        // The write cache is optional; `None` when disabled by config.
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .range_result_cache_size(config.range_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        // Clamp a non-positive (unknown) total memory to 0 before resolving the
        // compaction memory limit against it.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        // Spawn one worker per id; each worker gets clones of the shared handles.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }
278
    /// Stops the worker group.
    ///
    /// Stops the background job pools first, then stops all workers concurrently.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // Stop background job pools before the workers themselves.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;
        self.index_build_job_pool.stop(true).await?;

        // Stop every worker concurrently; fails fast on the first error.
        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }
297
298 pub(crate) async fn submit_to_worker(
300 &self,
301 region_id: RegionId,
302 request: WorkerRequest,
303 ) -> Result<()> {
304 self.worker(region_id).submit_request(request).await
305 }
306
307 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
309 self.worker(region_id).is_region_exists(region_id)
310 }
311
312 pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
314 self.worker(region_id).is_region_opening(region_id)
315 }
316
317 pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
319 self.worker(region_id).is_region_catching_up(region_id)
320 }
321
322 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
326 self.worker(region_id).get_region(region_id)
327 }
328
329 pub(crate) fn cache_manager(&self) -> CacheManagerRef {
331 self.cache_manager.clone()
332 }
333
334 pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
335 self.file_ref_manager.clone()
336 }
337
338 pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
339 self.gc_limiter.clone()
340 }
341
342 pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
344 let index = region_id_to_index(region_id, self.workers.len());
345
346 &self.workers[index]
347 }
348
349 pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
350 self.workers
351 .iter()
352 .flat_map(|worker| worker.regions.list_regions())
353 }
354
355 pub(crate) fn object_store_manager(&self) -> &ObjectStoreManagerRef {
356 &self.object_store_manager
357 }
358
359 pub(crate) fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
360 &self.puffin_manager_factory
361 }
362
363 pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
364 &self.intermediate_manager
365 }
366
367 pub(crate) fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
368 &self.schema_metadata_manager
369 }
370}
371
372#[cfg(any(test, feature = "test"))]
374impl WorkerGroup {
375 #[allow(clippy::too_many_arguments)]
379 pub(crate) async fn start_for_test<S: LogStore>(
380 config: Arc<MitoConfig>,
381 log_store: Arc<S>,
382 object_store_manager: ObjectStoreManagerRef,
383 write_buffer_manager: Option<WriteBufferManagerRef>,
384 listener: Option<crate::engine::listener::EventListenerRef>,
385 schema_metadata_manager: SchemaMetadataManagerRef,
386 file_ref_manager: FileReferenceManagerRef,
387 time_provider: TimeProviderRef,
388 partition_expr_fetcher: PartitionExprFetcherRef,
389 ) -> Result<WorkerGroup> {
390 let (flush_sender, flush_receiver) = watch::channel(());
391 let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
392 Arc::new(
393 WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
394 .with_notifier(flush_sender.clone()),
395 )
396 });
397 let index_build_job_pool =
398 Arc::new(LocalScheduler::new(config.max_background_index_builds));
399 let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
400 let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
401 let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
402 let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
403 let puffin_manager_factory = PuffinManagerFactory::new(
404 &config.index.aux_path,
405 config.index.staging_size.as_bytes(),
406 Some(config.index.write_buffer_size.as_bytes() as _),
407 config.index.staging_ttl,
408 )
409 .await?;
410 let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
411 .await?
412 .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
413 let write_cache = write_cache_from_config(
414 &config,
415 puffin_manager_factory.clone(),
416 intermediate_manager.clone(),
417 )
418 .await?;
419 let cache_manager = Arc::new(
420 CacheManager::builder()
421 .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
422 .vector_cache_size(config.vector_cache_size.as_bytes())
423 .page_cache_size(config.page_cache_size.as_bytes())
424 .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
425 .range_result_cache_size(config.range_result_cache_size.as_bytes())
426 .write_cache(write_cache)
427 .build(),
428 );
429 let total_memory = get_total_memory_bytes();
430 let total_memory = if total_memory > 0 {
431 total_memory as u64
432 } else {
433 0
434 };
435 let compaction_limit_bytes = config
436 .experimental_compaction_memory_limit
437 .resolve(total_memory);
438 let compaction_memory_manager =
439 Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
440 let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
441 let workers = (0..config.num_workers)
442 .map(|id| {
443 WorkerStarter {
444 id: id as WorkerId,
445 config: config.clone(),
446 log_store: log_store.clone(),
447 object_store_manager: object_store_manager.clone(),
448 write_buffer_manager: write_buffer_manager.clone(),
449 index_build_job_pool: index_build_job_pool.clone(),
450 flush_job_pool: flush_job_pool.clone(),
451 compact_job_pool: compact_job_pool.clone(),
452 purge_scheduler: purge_scheduler.clone(),
453 listener: WorkerListener::new(listener.clone()),
454 cache_manager: cache_manager.clone(),
455 compaction_memory_manager: compaction_memory_manager.clone(),
456 puffin_manager_factory: puffin_manager_factory.clone(),
457 intermediate_manager: intermediate_manager.clone(),
458 time_provider: time_provider.clone(),
459 flush_sender: flush_sender.clone(),
460 flush_receiver: flush_receiver.clone(),
461 plugins: Plugins::new(),
462 schema_metadata_manager: schema_metadata_manager.clone(),
463 file_ref_manager: file_ref_manager.clone(),
464 partition_expr_fetcher: partition_expr_fetcher.clone(),
465 flush_semaphore: flush_semaphore.clone(),
466 }
467 .start()
468 })
469 .collect::<Result<Vec<_>>>()?;
470
471 Ok(WorkerGroup {
472 workers,
473 flush_job_pool,
474 compact_job_pool,
475 index_build_job_pool,
476 purge_scheduler,
477 cache_manager,
478 file_ref_manager,
479 gc_limiter,
480 object_store_manager,
481 puffin_manager_factory,
482 intermediate_manager,
483 schema_metadata_manager,
484 })
485 }
486
    /// Returns the purge scheduler (test-only accessor).
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
491}
492
493fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
494 ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
495 % num_workers
496}
497
498pub async fn write_cache_from_config(
499 config: &MitoConfig,
500 puffin_manager_factory: PuffinManagerFactory,
501 intermediate_manager: IntermediateManager,
502) -> Result<Option<WriteCacheRef>> {
503 if !config.enable_write_cache {
504 return Ok(None);
505 }
506
507 tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
508 .await
509 .context(CreateDirSnafu {
510 dir: &config.write_cache_path,
511 })?;
512
513 let cache = WriteCache::new_fs(
514 &config.write_cache_path,
515 config.write_cache_size,
516 config.write_cache_ttl,
517 Some(config.index_cache_percent),
518 config.enable_refill_cache_on_read,
519 puffin_manager_factory,
520 intermediate_manager,
521 config.manifest_cache_size,
522 )
523 .await?;
524 Ok(Some(Arc::new(cache)))
525}
526
527pub(crate) fn worker_init_check_delay() -> Duration {
529 let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
530 Duration::from_secs(init_check_delay)
531}
532
/// Bundle of everything needed to start a single [RegionWorker].
///
/// Consumed by [WorkerStarter::start], which spawns the worker loop task.
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    /// Engine configuration.
    config: Arc<MitoConfig>,
    /// Log store backing the WAL.
    log_store: Arc<S>,
    /// Manager of object stores used by regions.
    object_store_manager: ObjectStoreManagerRef,
    /// Global write buffer manager shared across workers.
    write_buffer_manager: WriteBufferManagerRef,
    /// Shared scheduler for compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Shared scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Shared scheduler for flush jobs.
    flush_job_pool: SchedulerRef,
    /// Shared scheduler for purge jobs.
    purge_scheduler: SchedulerRef,
    /// Listener for worker events.
    listener: WorkerListener,
    /// Engine-wide caches.
    cache_manager: CacheManagerRef,
    /// Memory manager shared by compaction jobs.
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    /// Factory for puffin (index file) managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager of intermediate index files.
    intermediate_manager: IntermediateManager,
    /// Clock abstraction used by the worker loop.
    time_provider: TimeProviderRef,
    /// Sender side of the flush notification channel.
    flush_sender: watch::Sender<()>,
    /// Receiver side of the flush notification channel.
    flush_receiver: watch::Receiver<()>,
    /// Plugins passed to the compaction scheduler.
    plugins: Plugins,
    /// Accessor to schema metadata.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Manager of SST file references.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetcher of partition expressions used when opening regions.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore bounding concurrent flushes (sized by `max_background_flushes`).
    flush_semaphore: Arc<Semaphore>,
}
560
impl<S: LogStore> WorkerStarter<S> {
    /// Starts the worker: builds the [RegionWorkerLoop], spawns it on the global
    /// runtime, and returns the [RegionWorker] handle used to talk to it.
    fn start(self) -> Result<RegionWorker> {
        // Region maps shared between the handle and the worker loop.
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        // Request channel between the handle (sender) and the loop (receiver).
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        // Running flag: cleared to ask the loop to exit.
        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        // Worker id as string, used as metric label value.
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        // Run the worker loop as a detached task; the handle below keeps the
        // JoinHandle so `stop()` can await it.
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}
638
/// Handle to a region worker: submits requests to the worker loop task and
/// shares its region maps.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions registered in the worker (shared with the worker loop).
    regions: RegionMapRef,
    /// Regions that are being opened.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender to the worker loop.
    sender: Sender<WorkerRequestWithTime>,
    /// Join handle of the worker loop task; taken by `stop()`.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether the worker loop is still running; cleared to ask it to exit.
    running: Arc<AtomicBool>,
}
656
657impl RegionWorker {
658 async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
660 ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
661 let request_with_time = WorkerRequestWithTime::new(request);
662 if self.sender.send(request_with_time).await.is_err() {
663 warn!(
664 "Worker {} is already exited but the running flag is still true",
665 self.id
666 );
667 self.set_running(false);
669 return WorkerStoppedSnafu { id: self.id }.fail();
670 }
671
672 Ok(())
673 }
674
    /// Stops the worker and waits for the worker loop task to finish.
    ///
    /// Idempotent: the join handle is taken under the lock, so only the first
    /// caller actually stops and joins the loop.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            // Clear the running flag first, then send an explicit Stop request
            // to wake the loop up in case it is blocked on the channel.
            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }
698
    /// Returns true if the worker loop is still considered running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets the running flag observed by the worker loop.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the region is registered in this worker.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is currently being opened.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns true if the region is currently catching up.
    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.catchup_regions.is_region_exists(region_id)
    }

    /// Returns the region if it is registered in this worker.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    /// Returns the opening-region map (test-only).
    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }

    /// Returns the catchup-region map (test-only).
    #[cfg(test)]
    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
        &self.catchup_regions
    }
740}
741
impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            // Only clear the flag; the worker loop checks it and exits on its
            // own. Drop is synchronous, so the task cannot be joined here.
            self.set_running(false);
        }
    }
}
750
/// Buffer used to batch worker requests drained from the request channel.
type RequestBuffer = Vec<WorkerRequest>;
752
/// Write and bulk requests that are stalled (e.g. waiting for flushes to free
/// write buffer memory) and will be retried later.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests per region: (estimated size of that region's requests,
    /// stalled write requests, stalled bulk requests).
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated total size of all stalled requests.
    pub(crate) estimated_size: usize,
}
769
770impl StalledRequests {
771 pub(crate) fn append(
773 &mut self,
774 requests: &mut Vec<SenderWriteRequest>,
775 bulk_requests: &mut Vec<SenderBulkRequest>,
776 ) {
777 for req in requests.drain(..) {
778 self.push(req);
779 }
780 for req in bulk_requests.drain(..) {
781 self.push_bulk(req);
782 }
783 }
784
785 pub(crate) fn push(&mut self, req: SenderWriteRequest) {
787 let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
788 let req_size = req.request.estimated_size();
789 *size += req_size;
790 self.estimated_size += req_size;
791 requests.push(req);
792 }
793
794 pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
795 let region_id = req.region_id;
796 let (size, _, requests) = self.requests.entry(region_id).or_default();
797 let req_size = req.request.estimated_size();
798 *size += req_size;
799 self.estimated_size += req_size;
800 requests.push(req);
801 }
802
803 pub(crate) fn remove(
805 &mut self,
806 region_id: &RegionId,
807 ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
808 if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
809 self.estimated_size -= size;
810 (write_reqs, bulk_reqs)
811 } else {
812 (vec![], vec![])
813 }
814 }
815
816 pub(crate) fn stalled_count(&self) -> usize {
818 self.requests
819 .values()
820 .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
821 .sum()
822 }
823}
824
/// The event loop of a region worker, driven by [RegionWorkerLoop::run].
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine configuration.
    config: Arc<MitoConfig>,
    /// Regions handled by this worker (shared with [RegionWorker]).
    regions: RegionMapRef,
    /// Regions currently being dropped.
    dropping_regions: RegionMapRef,
    /// Regions currently being opened (shared with [RegionWorker]).
    opening_regions: OpeningRegionsRef,
    /// Regions currently catching up (shared with [RegionWorker]).
    catchup_regions: CatchupRegionsRef,
    /// Sender of the request channel (cloned into background jobs so they can
    /// notify the loop).
    sender: Sender<WorkerRequestWithTime>,
    /// Receiver of the request channel.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manager of object stores used by regions.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the loop should keep running; cleared by [RegionWorker::stop].
    running: Arc<AtomicBool>,
    /// Provider of memtable builders.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Scheduler for purge jobs.
    purge_scheduler: SchedulerRef,
    /// Global write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build jobs.
    index_build_scheduler: IndexBuildScheduler,
    /// Scheduler for flush jobs.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction jobs.
    compaction_scheduler: CompactionScheduler,
    /// Requests stalled while waiting for resources (e.g. flushes).
    stalled_requests: StalledRequests,
    /// Listener for worker events.
    listener: WorkerListener,
    /// Engine-wide caches.
    cache_manager: CacheManagerRef,
    /// Factory for puffin (index file) managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager of intermediate index files.
    intermediate_manager: IntermediateManager,
    /// Clock abstraction used for periodical checks.
    time_provider: TimeProviderRef,
    /// Timestamp (ms) of the last periodical check.
    last_periodical_check_millis: i64,
    /// Sender of the flush notification channel.
    flush_sender: watch::Sender<()>,
    /// Receiver of the flush notification channel.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of currently stalling write requests of this worker.
    stalling_count: IntGauge,
    /// Gauge of regions handled by this worker.
    region_count: IntGauge,
    /// Histogram of time requests wait in the channel before being handled.
    request_wait_time: Histogram,
    /// Per-region queues serializing region edits.
    region_edit_queues: RegionEditQueues,
    /// Accessor to schema metadata.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Manager of SST file references.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetcher of partition expressions used when opening regions.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore bounding concurrent flushes.
    flush_semaphore: Arc<Semaphore>,
}
896
897impl<S: LogStore> RegionWorkerLoop<S> {
    /// Main loop of the worker: receives requests, batches them, and dispatches
    /// them until the running flag is cleared or the request channel closes.
    async fn run(&mut self) {
        // Randomize the first periodical check so workers don't all check at once.
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Reusable buffers for one batch of requests, grouped by kind.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // NOTE(review): `bulk_req_buffer` is not cleared here like the other
            // buffers — presumably `handle_requests` drains it; confirm.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();
            let mut bulk_insert_req_num = 0;

            // Bound the wait so periodical tasks still run without traffic.
            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            // Wait for the first of: a request, a flush notification, or timeout.
            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Record how long the request waited in the channel.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                WorkerRequest::BulkInserts(bulk_insert) => {
                                    bulk_insert_req_num += 1;
                                    self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
                                        .await;
                                }
                                req => general_req_buffer.push(req),
                            }
                        },
                        // Channel closed: all senders are gone, exit the loop.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // Flush notifier dropped; exit the loop.
                        break;
                    } else {
                        // Flush notification: maybe flush this worker and retry
                        // stalled requests, then go back to waiting.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout: run periodical tasks and wait again.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            // A flush may have completed while we were receiving; retry stalled
            // requests before handling the new batch.
            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }

            // Drain up to `worker_request_batch_size - 1` more requests without
            // blocking (one request was already taken by the select above).
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            WorkerRequest::BulkInserts(bulk_insert) => {
                                bulk_insert_req_num += 1;
                                self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
                                    .await
                            }
                            req => general_req_buffer.push(req),
                        }
                    }
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len()
                    + ddl_req_buffer.len()
                    + general_req_buffer.len()
                    + bulk_insert_req_num,
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }
1027
1028 async fn buffer_bulk_insert_request(
1029 &mut self,
1030 bulk_insert: BulkInsertRequest,
1031 bulk_requests: &mut Vec<SenderBulkRequest>,
1032 ) {
1033 let BulkInsertRequest {
1034 metadata,
1035 request,
1036 sender,
1037 } = bulk_insert;
1038
1039 if let Some(region_metadata) = metadata {
1040 self.handle_bulk_insert_batch(region_metadata, request, bulk_requests, sender)
1041 .await;
1042 } else {
1043 error!("Cannot find region metadata for {}", request.region_id);
1044 sender.send(
1045 error::RegionNotFoundSnafu {
1046 region_id: request.region_id,
1047 }
1048 .fail(),
1049 );
1050 }
1051 }
1052
    /// Dispatches one batch of requests.
    ///
    /// General requests are handled first, then the write/bulk requests, then
    /// the DDL requests. All buffers are drained by this call.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // Write and DDL requests are collected into their own
                    // buffers before this call; nothing to do here.
                    continue;
                }
                WorkerRequest::BulkInserts(_) => unreachable!("bulk inserts are buffered"),
                WorkerRequest::Background { region_id, notify } => {
                    // For a region-edit result that updates the region state,
                    // flush this region's buffered write requests first so they
                    // observe the pre-edit state.
                    if matches!(
                        &notify,
                        BackgroundNotify::RegionEdit(edit_result)
                            if edit_result.update_region_state
                    ) {
                        self.handle_buffered_region_write_requests(
                            &region_id,
                            write_requests,
                            bulk_requests,
                        )
                        .await;
                    }
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request);
                }
                WorkerRequest::Stop => {
                    // `stop()` clears the running flag before sending Stop.
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
                WorkerRequest::CopyRegionFrom(req) => {
                    self.handle_copy_region_from_request(req);
                }
            }
        }

        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }
1127
    /// Handles all buffered DDL requests, draining the buffer.
    ///
    /// Most handlers take the request's sender and reply on their own (hence the
    /// `continue`); only handlers that return a result fall through to the
    /// `ddl.sender.send(res)` at the bottom.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop(req) => {
                    self.handle_drop_request(ddl.region_id, req.partial_drop)
                        .await
                }
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => {
                    self.handle_close_request(ddl.region_id, ddl.sender).await;
                    continue;
                }
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, None, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_directive,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
                DdlRequest::ApplyStagingManifest(req) => {
                    self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }
1198
1199 fn handle_periodical_tasks(&mut self) {
1201 let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
1202 if self
1203 .time_provider
1204 .elapsed_since(self.last_periodical_check_millis)
1205 < interval
1206 {
1207 return;
1208 }
1209
1210 self.last_periodical_check_millis = self.time_provider.current_time_millis();
1211
1212 if let Err(e) = self.flush_periodically() {
1213 error!(e; "Failed to flush regions periodically");
1214 }
1215 }
1216
    /// Dispatches a background-job notification for `region_id` to its handler.
    ///
    /// Background jobs (flush, compaction, index build, truncate, manifest and
    /// staging changes, region edit/copy) report their outcome back to the
    /// worker loop through [`BackgroundNotify`]; this routes each variant to
    /// the corresponding `handle_*` method.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionCancelled(req) => {
                self.handle_compaction_cancelled(region_id, req).await
            }
            // Note: these variants carry the region id inside `req`, so it is
            // not passed separately.
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
            BackgroundNotify::CopyRegionFromFinished(req) => {
                self.handle_copy_region_from_finished(req)
            }
        }
    }
1251
1252 async fn set_role_state_gracefully(
1254 &mut self,
1255 region_id: RegionId,
1256 region_role_state: SettableRegionRoleState,
1257 sender: oneshot::Sender<SetRegionRoleStateResponse>,
1258 ) {
1259 if let Some(region) = self.regions.get_region(region_id) {
1260 common_runtime::spawn_global(async move {
1262 match region.set_role_state_gracefully(region_role_state).await {
1263 Ok(()) => {
1264 let last_entry_id = region.version_control.current().last_entry_id;
1265 let _ = sender.send(SetRegionRoleStateResponse::success(
1266 SetRegionRoleStateSuccess::mito(last_entry_id),
1267 ));
1268 }
1269 Err(e) => {
1270 error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
1271 let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
1272 BoxedError::new(e),
1273 ));
1274 }
1275 }
1276 });
1277 } else {
1278 let _ = sender.send(SetRegionRoleStateResponse::NotFound);
1279 }
1280 }
1281}
1282
1283impl<S> RegionWorkerLoop<S> {
1284 async fn clean(&self) {
1286 let regions = self.regions.list_regions();
1288 for region in regions {
1289 region.stop().await;
1290 }
1291
1292 self.regions.clear();
1293 }
1294
1295 fn notify_group(&mut self) {
1298 let _ = self.flush_sender.send(());
1300 self.flush_receiver.borrow_and_update();
1302 }
1303}
1304
/// Hooks into worker lifecycle events, used by tests to observe and
/// coordinate with the worker's internal state.
///
/// Outside test builds this struct carries no fields and its callbacks
/// compile down to no-ops.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    // Optional event listener; present only in test builds.
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1311
1312impl WorkerListener {
1313 #[cfg(any(test, feature = "test"))]
1314 pub(crate) fn new(
1315 listener: Option<crate::engine::listener::EventListenerRef>,
1316 ) -> WorkerListener {
1317 WorkerListener { listener }
1318 }
1319
1320 pub(crate) fn on_flush_success(&self, region_id: RegionId) {
1322 #[cfg(any(test, feature = "test"))]
1323 if let Some(listener) = &self.listener {
1324 listener.on_flush_success(region_id);
1325 }
1326 let _ = region_id;
1328 }
1329
1330 pub(crate) fn on_write_stall(&self) {
1332 #[cfg(any(test, feature = "test"))]
1333 if let Some(listener) = &self.listener {
1334 listener.on_write_stall();
1335 }
1336 }
1337
1338 pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
1339 #[cfg(any(test, feature = "test"))]
1340 if let Some(listener) = &self.listener {
1341 listener.on_flush_begin(region_id).await;
1342 }
1343 let _ = region_id;
1345 }
1346
1347 pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
1348 #[cfg(any(test, feature = "test"))]
1349 if let Some(listener) = &self.listener {
1350 return listener.on_later_drop_begin(region_id);
1351 }
1352 let _ = region_id;
1354 None
1355 }
1356
1357 pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
1359 #[cfg(any(test, feature = "test"))]
1360 if let Some(listener) = &self.listener {
1361 listener.on_later_drop_end(region_id, removed);
1362 }
1363 let _ = region_id;
1365 let _ = removed;
1366 }
1367
1368 pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
1369 #[cfg(any(test, feature = "test"))]
1370 if let Some(listener) = &self.listener {
1371 listener.on_merge_ssts_finished(region_id).await;
1372 }
1373 let _ = region_id;
1375 }
1376
1377 pub(crate) fn on_recv_requests(&self, request_num: usize) {
1378 #[cfg(any(test, feature = "test"))]
1379 if let Some(listener) = &self.listener {
1380 listener.on_recv_requests(request_num);
1381 }
1382 let _ = request_num;
1384 }
1385
1386 pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
1387 #[cfg(any(test, feature = "test"))]
1388 if let Some(listener) = &self.listener {
1389 listener.on_file_cache_filled(_file_id);
1390 }
1391 }
1392
1393 pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
1394 #[cfg(any(test, feature = "test"))]
1395 if let Some(listener) = &self.listener {
1396 listener.on_compaction_scheduled(_region_id);
1397 }
1398 }
1399
1400 pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
1401 #[cfg(any(test, feature = "test"))]
1402 if let Some(listener) = &self.listener {
1403 listener
1404 .on_notify_region_change_result_begin(_region_id)
1405 .await;
1406 }
1407 }
1408
1409 pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
1410 #[cfg(any(test, feature = "test"))]
1411 if let Some(listener) = &self.listener {
1412 listener.on_enter_staging_result_begin(_region_id).await;
1413 }
1414 }
1415
1416 pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
1417 #[cfg(any(test, feature = "test"))]
1418 if let Some(listener) = &self.listener {
1419 listener.on_index_build_finish(_region_file_id).await;
1420 }
1421 }
1422
1423 pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
1424 #[cfg(any(test, feature = "test"))]
1425 if let Some(listener) = &self.listener {
1426 listener.on_index_build_begin(_region_file_id).await;
1427 }
1428 }
1429
1430 pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
1431 #[cfg(any(test, feature = "test"))]
1432 if let Some(listener) = &self.listener {
1433 listener.on_index_build_abort(_region_file_id).await;
1434 }
1435 }
1436}
1437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    /// Region ids must map deterministically onto worker indexes.
    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        assert_eq!(region_id_to_index(RegionId::new(1, 2), num_workers), 3);
        assert_eq!(region_id_to_index(RegionId::new(2, 3), num_workers), 1);
    }

    /// A worker group should start and then stop cleanly.
    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let config = MitoConfig {
            num_workers: 4,
            ..Default::default()
        };
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env.create_worker_group(config).await;

        group.stop().await.unwrap();
    }
}