1mod handle_alter;
18mod handle_apply_staging;
19mod handle_bulk_insert;
20mod handle_catchup;
21mod handle_close;
22mod handle_compaction;
23mod handle_copy_region;
24mod handle_create;
25mod handle_drop;
26mod handle_enter_staging;
27mod handle_flush;
28mod handle_manifest;
29mod handle_open;
30mod handle_rebuild_index;
31mod handle_remap;
32mod handle_truncate;
33mod handle_write;
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use common_base::Plugins;
42use common_error::ext::BoxedError;
43use common_meta::key::SchemaMetadataManagerRef;
44use common_runtime::JoinHandle;
45use common_stat::get_total_memory_bytes;
46use common_telemetry::{error, info, warn};
47use futures::future::try_join_all;
48use object_store::manager::ObjectStoreManagerRef;
49use prometheus::{Histogram, IntGauge};
50use rand::{Rng, rng};
51use snafu::{ResultExt, ensure};
52use store_api::logstore::LogStore;
53use store_api::region_engine::{
54 SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
55};
56use store_api::storage::{FileId, RegionId};
57use tokio::sync::mpsc::{Receiver, Sender};
58use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
59
60use crate::cache::write_cache::{WriteCache, WriteCacheRef};
61use crate::cache::{CacheManager, CacheManagerRef};
62use crate::compaction::CompactionScheduler;
63use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
64use crate::config::MitoConfig;
65use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
66use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
67use crate::gc::{GcLimiter, GcLimiterRef};
68use crate::memtable::MemtableBuilderProvider;
69use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
70use crate::region::opener::PartitionExprFetcherRef;
71use crate::region::{
72 CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
73 RegionMapRef,
74};
75use crate::request::{
76 BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
77 WorkerRequest, WorkerRequestWithTime,
78};
79use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
80use crate::sst::file::RegionFileId;
81use crate::sst::file_ref::FileReferenceManagerRef;
82use crate::sst::index::IndexBuildScheduler;
83use crate::sst::index::intermediate::IntermediateManager;
84use crate::sst::index::puffin_manager::PuffinManagerFactory;
85use crate::time_provider::{StdTimeProvider, TimeProviderRef};
86use crate::wal::Wal;
87use crate::worker::handle_manifest::RegionEditQueues;
88
/// Identifier of a region worker; workers are created with ids `0..num_workers`.
pub(crate) type WorkerId = u32;

/// Name of the marker file that tags a region as being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval between two periodical checks in the worker loop.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Exclusive upper bound (in seconds) of the random delay applied before a
/// worker's first periodical check, so workers don't all check at once.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
98
/// A group of region workers.
///
/// Requests are routed to a worker by region id (see [`WorkerGroup::worker`]);
/// background job pools and caches are shared by all workers of the group.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct WorkerGroup {
    /// Workers of the group, indexed by [`region_id_to_index`].
    workers: Vec<RegionWorker>,
    /// Shared scheduler for background flush jobs.
    flush_job_pool: SchedulerRef,
    /// Shared scheduler for background compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Shared scheduler for background index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Shared scheduler for background purge jobs.
    purge_scheduler: SchedulerRef,
    /// Cache of the engine (SST meta / vector / page / write cache, ...).
    cache_manager: CacheManagerRef,
    /// Tracks references to SST files.
    file_ref_manager: FileReferenceManagerRef,
    /// Limits the number of concurrent GC jobs.
    gc_limiter: GcLimiterRef,
    /// Manager of object stores used by regions.
    object_store_manager: ObjectStoreManagerRef,
    /// Factory for puffin (index) managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager of intermediate index files.
    intermediate_manager: IntermediateManager,
    /// Access to schema metadata.
    schema_metadata_manager: SchemaMetadataManagerRef,
}
162
impl WorkerGroup {
    /// Starts a worker group with `config.num_workers` workers.
    ///
    /// Shared resources (write buffer manager, job pools, caches and memory
    /// limiters) are created once here and handed to every worker.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel used by the write buffer manager to notify workers to flush.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        // Background job pools shared by all workers; each is bounded by its
        // own `max_background_*` knob from the config.
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        // `None` when the write cache is disabled in the config.
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        // `get_total_memory_bytes` is signed; clamp non-positive (unknown)
        // values to 0 before the unsigned cast.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        // Start one worker per id; any failure aborts the whole group start.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }

    /// Stops the worker group: background job pools first, then every worker.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // Stop background job pools before workers so no new background jobs
        // are accepted while workers shut down.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits `request` to the worker that owns `region_id`.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the region exists in its owning worker.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the region is currently being opened.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns true if the region is currently catching up.
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

    /// Returns the region if it exists in its owning worker.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the shared cache manager.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the shared file reference manager.
    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Returns the shared GC limiter.
    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

    /// Returns the worker that owns `region_id` (deterministic routing).
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    /// Iterates over all regions held by all workers of the group.
    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }

    /// Returns the object store manager.
    pub(crate) fn object_store_manager(&self) -> &ObjectStoreManagerRef {
        &self.object_store_manager
    }

    /// Returns the puffin manager factory.
    pub(crate) fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        &self.puffin_manager_factory
    }

    /// Returns the intermediate (index) manager.
    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Returns the schema metadata manager.
    pub(crate) fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
        &self.schema_metadata_manager
    }
}
370
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group for tests.
    ///
    /// Unlike [`WorkerGroup::start`], callers may inject a write buffer
    /// manager, an event listener and a time provider; plugins are empty.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        // Watch channel used by the write buffer manager to notify workers to flush.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // Use `max_background_purges` for the purge scheduler, consistent with
        // `WorkerGroup::start` (previously this mistakenly reused
        // `max_background_flushes`).
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        // Test cache manager intentionally configures fewer caches than the
        // production path (no index caches).
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        // `get_total_memory_bytes` is signed; clamp non-positive (unknown)
        // values to 0 before the unsigned cast.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }

    /// Returns the purge scheduler (test only).
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
490
491fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
492 ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
493 % num_workers
494}
495
496pub async fn write_cache_from_config(
497 config: &MitoConfig,
498 puffin_manager_factory: PuffinManagerFactory,
499 intermediate_manager: IntermediateManager,
500) -> Result<Option<WriteCacheRef>> {
501 if !config.enable_write_cache {
502 return Ok(None);
503 }
504
505 tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
506 .await
507 .context(CreateDirSnafu {
508 dir: &config.write_cache_path,
509 })?;
510
511 let cache = WriteCache::new_fs(
512 &config.write_cache_path,
513 config.write_cache_size,
514 config.write_cache_ttl,
515 Some(config.index_cache_percent),
516 config.enable_refill_cache_on_read,
517 puffin_manager_factory,
518 intermediate_manager,
519 config.manifest_cache_size,
520 )
521 .await?;
522 Ok(Some(Arc::new(cache)))
523}
524
525pub(crate) fn worker_init_check_delay() -> Duration {
527 let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
528 Duration::from_secs(init_check_delay)
529}
530
/// Per-worker builder that wires the group's shared resources into a new
/// [`RegionWorker`].
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    /// Engine configuration shared by all workers.
    config: Arc<MitoConfig>,
    /// Log store backing the WAL.
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    /// Global write buffer manager; its notifier is `flush_sender`.
    write_buffer_manager: WriteBufferManagerRef,
    /// Shared scheduler for compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Shared scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Shared scheduler for flush jobs.
    flush_job_pool: SchedulerRef,
    /// Shared scheduler for purge jobs.
    purge_scheduler: SchedulerRef,
    /// Event listener (no-op by default; injectable in tests).
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    /// Limits memory used by concurrent compactions.
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Sender side of the flush notification watch channel.
    flush_sender: watch::Sender<()>,
    /// Receiver side of the flush notification watch channel.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Bounds the number of concurrent flush tasks across workers.
    flush_semaphore: Arc<Semaphore>,
}
558
impl<S: LogStore> WorkerStarter<S> {
    /// Builds the worker loop, spawns it on the global runtime and returns the
    /// [`RegionWorker`] handle used to talk to it.
    fn start(self) -> Result<RegionWorker> {
        // Region maps are shared between the loop and the returned handle.
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        // Worker id is the label value of the per-worker metrics below.
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            // The compaction scheduler can send requests back to this worker
            // through the cloned `sender`.
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}
636
/// Handle to a region worker: routes requests to the worker loop and controls
/// its lifecycle.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker (shared with the worker loop).
    regions: RegionMapRef,
    /// Regions that are being opened (shared with the worker loop).
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up (shared with the worker loop).
    catchup_regions: CatchupRegionsRef,
    /// Request sender to the worker loop.
    sender: Sender<WorkerRequestWithTime>,
    /// Join handle of the spawned worker loop task; taken once on stop.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether the worker loop is still running (shared with the loop).
    running: Arc<AtomicBool>,
}
654
655impl RegionWorker {
656 async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
658 ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
659 let request_with_time = WorkerRequestWithTime::new(request);
660 if self.sender.send(request_with_time).await.is_err() {
661 warn!(
662 "Worker {} is already exited but the running flag is still true",
663 self.id
664 );
665 self.set_running(false);
667 return WorkerStoppedSnafu { id: self.id }.fail();
668 }
669
670 Ok(())
671 }
672
673 async fn stop(&self) -> Result<()> {
677 let handle = self.handle.lock().await.take();
678 if let Some(handle) = handle {
679 info!("Stop region worker {}", self.id);
680
681 self.set_running(false);
682 if self
683 .sender
684 .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
685 .await
686 .is_err()
687 {
688 warn!("Worker {} is already exited before stop", self.id);
689 }
690
691 handle.await.context(JoinSnafu)?;
692 }
693
694 Ok(())
695 }
696
697 fn is_running(&self) -> bool {
699 self.running.load(Ordering::Relaxed)
700 }
701
702 fn set_running(&self, value: bool) {
704 self.running.store(value, Ordering::Relaxed)
705 }
706
707 fn is_region_exists(&self, region_id: RegionId) -> bool {
709 self.regions.is_region_exists(region_id)
710 }
711
712 fn is_region_opening(&self, region_id: RegionId) -> bool {
714 self.opening_regions.is_region_exists(region_id)
715 }
716
717 fn is_region_catching_up(&self, region_id: RegionId) -> bool {
719 self.catchup_regions.is_region_exists(region_id)
720 }
721
722 fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
724 self.regions.get_region(region_id)
725 }
726
727 #[cfg(test)]
728 pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
730 &self.opening_regions
731 }
732
733 #[cfg(test)]
734 pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
736 &self.catchup_regions
737 }
738}
739
impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            // Clear the flag so the worker loop can observe the handle is gone
            // and exit; nothing else to clean up here.
            self.set_running(false);
        }
    }
}

/// Buffer reused by the worker loop to drain general requests in batches.
type RequestBuffer = Vec<WorkerRequest>;
750
/// Write and bulk-write requests that are stalled (e.g. while waiting for
/// write buffer memory to be freed), grouped by region.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Per-region stalled requests:
    /// (estimated size of that region's requests, write requests, bulk requests).
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated total size of all stalled requests across regions.
    pub(crate) estimated_size: usize,
}
767
768impl StalledRequests {
769 pub(crate) fn append(
771 &mut self,
772 requests: &mut Vec<SenderWriteRequest>,
773 bulk_requests: &mut Vec<SenderBulkRequest>,
774 ) {
775 for req in requests.drain(..) {
776 self.push(req);
777 }
778 for req in bulk_requests.drain(..) {
779 self.push_bulk(req);
780 }
781 }
782
783 pub(crate) fn push(&mut self, req: SenderWriteRequest) {
785 let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
786 let req_size = req.request.estimated_size();
787 *size += req_size;
788 self.estimated_size += req_size;
789 requests.push(req);
790 }
791
792 pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
793 let region_id = req.region_id;
794 let (size, _, requests) = self.requests.entry(region_id).or_default();
795 let req_size = req.request.estimated_size();
796 *size += req_size;
797 self.estimated_size += req_size;
798 requests.push(req);
799 }
800
801 pub(crate) fn remove(
803 &mut self,
804 region_id: &RegionId,
805 ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
806 if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
807 self.estimated_size -= size;
808 (write_reqs, bulk_reqs)
809 } else {
810 (vec![], vec![])
811 }
812 }
813
814 pub(crate) fn stalled_count(&self) -> usize {
816 self.requests
817 .values()
818 .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
819 .sum()
820 }
821}
822
/// State of the background loop task that owns a worker's regions and
/// processes its requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine configuration.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker (shared with the [`RegionWorker`] handle).
    regions: RegionMapRef,
    /// Regions that are being dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are being opened.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Sender to this loop's own channel (handed to background jobs so they
    /// can notify the loop).
    sender: Sender<WorkerRequestWithTime>,
    /// Receiver end of the request channel.
    receiver: Receiver<WorkerRequestWithTime>,
    /// Write-ahead log.
    wal: Wal<S>,
    object_store_manager: ObjectStoreManagerRef,
    /// Running flag shared with the [`RegionWorker`] handle.
    running: Arc<AtomicBool>,
    /// Builds memtables for new regions.
    memtable_builder_provider: MemtableBuilderProvider,
    purge_scheduler: SchedulerRef,
    /// Global write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    index_build_scheduler: IndexBuildScheduler,
    flush_scheduler: FlushScheduler,
    compaction_scheduler: CompactionScheduler,
    /// Write/bulk requests parked until memory is available again.
    stalled_requests: StalledRequests,
    /// Event listener (used by tests).
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Timestamp (ms) of the last periodical check; offset by a random delay
    /// at startup.
    last_periodical_check_millis: i64,
    /// Sender of the flush notification watch channel.
    flush_sender: watch::Sender<()>,
    /// Receiver of the flush notification watch channel.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of currently stalling write requests of this worker.
    stalling_count: IntGauge,
    /// Gauge of regions bound to this worker.
    region_count: IntGauge,
    /// Histogram of time requests spend waiting in the channel.
    request_wait_time: Histogram,
    /// Queues of pending region edit requests.
    region_edit_queues: RegionEditQueues,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Bounds the number of concurrent flush tasks.
    flush_semaphore: Arc<Semaphore>,
}
894
895impl<S: LogStore> RegionWorkerLoop<S> {
    /// Runs the worker loop until the running flag is cleared or the request
    /// channel is disconnected.
    async fn run(&mut self) {
        // Randomize the first periodical check so workers don't all fire at once.
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers reused across iterations to receive requests in batches.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear request buffers before the next batch.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            // Cap the wait so periodical tasks still run without traffic.
            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Record how long the request waited in the channel.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // All senders dropped: exit the loop.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // Flush sender dropped: exit the loop.
                        break;
                    } else {
                        // The write buffer manager asked for a flush; flush if
                        // needed, then retry requests stalled on memory.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timed out without requests: run periodical tasks only.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            // A flush notification may have arrived while receiving requests.
            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }

            // Drain more pending requests without blocking, up to the batch
            // size (one request was already received above, hence `1..`).
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // Channel empty (or disconnected): process what we have.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }
1011
    /// Dispatches one batch of requests: general requests first, then writes
    /// (including bulk), then DDL requests.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // Write/DDL requests are sorted into their own buffers by
                    // the caller; they should never appear here.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // Background notifications are sent by background jobs.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request);
                }
                WorkerRequest::Stop => {
                    // `run` only reaches here after the running flag was cleared.
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
                WorkerRequest::CopyRegionFrom(req) => {
                    self.handle_copy_region_from_request(req);
                }
            }
        }

        // Handle all write requests first so stalled writes are observed
        // before any DDL that may change region state.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }
1088
    /// Handles a batch of DDL requests.
    ///
    /// Handlers that take `ddl.sender` reply on their own and `continue`;
    /// the remaining handlers return a result that is sent at the bottom.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop(req) => {
                    self.handle_drop_request(ddl.region_id, req.partial_drop)
                        .await
                }
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => {
                    self.handle_close_request(ddl.region_id, ddl.sender).await;
                    continue;
                }
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, None, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_expr,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
                DdlRequest::ApplyStagingManifest(req) => {
                    self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }
1159
    /// Runs periodical tasks if at least [`CHECK_REGION_INTERVAL`] has elapsed
    /// since the last run; otherwise returns immediately.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }
1177
    /// Dispatches a notification sent back to this worker by a finished (or
    /// failed) background job to the matching result handler.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
            BackgroundNotify::CopyRegionFromFinished(req) => {
                self.handle_copy_region_from_finished(req)
            }
        }
    }
1209
1210 async fn set_role_state_gracefully(
1212 &mut self,
1213 region_id: RegionId,
1214 region_role_state: SettableRegionRoleState,
1215 sender: oneshot::Sender<SetRegionRoleStateResponse>,
1216 ) {
1217 if let Some(region) = self.regions.get_region(region_id) {
1218 common_runtime::spawn_global(async move {
1220 match region.set_role_state_gracefully(region_role_state).await {
1221 Ok(()) => {
1222 let last_entry_id = region.version_control.current().last_entry_id;
1223 let _ = sender.send(SetRegionRoleStateResponse::success(
1224 SetRegionRoleStateSuccess::mito(last_entry_id),
1225 ));
1226 }
1227 Err(e) => {
1228 error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
1229 let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
1230 BoxedError::new(e),
1231 ));
1232 }
1233 }
1234 });
1235 } else {
1236 let _ = sender.send(SetRegionRoleStateResponse::NotFound);
1237 }
1238 }
1239}
1240
1241impl<S> RegionWorkerLoop<S> {
1242 async fn clean(&self) {
1244 let regions = self.regions.list_regions();
1246 for region in regions {
1247 region.stop().await;
1248 }
1249
1250 self.regions.clear();
1251 }
1252
1253 fn notify_group(&mut self) {
1256 let _ = self.flush_sender.send(());
1258 self.flush_receiver.borrow_and_update();
1260 }
1261}
1262
/// Hook points for observing worker-internal events.
///
/// In production builds this is a zero-sized no-op; the listener field only
/// exists under the `test` cfg or the `test` feature.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    // Optional event listener injected by tests to observe worker internals.
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1269
1270impl WorkerListener {
1271 #[cfg(any(test, feature = "test"))]
1272 pub(crate) fn new(
1273 listener: Option<crate::engine::listener::EventListenerRef>,
1274 ) -> WorkerListener {
1275 WorkerListener { listener }
1276 }
1277
1278 pub(crate) fn on_flush_success(&self, region_id: RegionId) {
1280 #[cfg(any(test, feature = "test"))]
1281 if let Some(listener) = &self.listener {
1282 listener.on_flush_success(region_id);
1283 }
1284 let _ = region_id;
1286 }
1287
1288 pub(crate) fn on_write_stall(&self) {
1290 #[cfg(any(test, feature = "test"))]
1291 if let Some(listener) = &self.listener {
1292 listener.on_write_stall();
1293 }
1294 }
1295
1296 pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
1297 #[cfg(any(test, feature = "test"))]
1298 if let Some(listener) = &self.listener {
1299 listener.on_flush_begin(region_id).await;
1300 }
1301 let _ = region_id;
1303 }
1304
1305 pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
1306 #[cfg(any(test, feature = "test"))]
1307 if let Some(listener) = &self.listener {
1308 return listener.on_later_drop_begin(region_id);
1309 }
1310 let _ = region_id;
1312 None
1313 }
1314
1315 pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
1317 #[cfg(any(test, feature = "test"))]
1318 if let Some(listener) = &self.listener {
1319 listener.on_later_drop_end(region_id, removed);
1320 }
1321 let _ = region_id;
1323 let _ = removed;
1324 }
1325
1326 pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
1327 #[cfg(any(test, feature = "test"))]
1328 if let Some(listener) = &self.listener {
1329 listener.on_merge_ssts_finished(region_id).await;
1330 }
1331 let _ = region_id;
1333 }
1334
1335 pub(crate) fn on_recv_requests(&self, request_num: usize) {
1336 #[cfg(any(test, feature = "test"))]
1337 if let Some(listener) = &self.listener {
1338 listener.on_recv_requests(request_num);
1339 }
1340 let _ = request_num;
1342 }
1343
1344 pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
1345 #[cfg(any(test, feature = "test"))]
1346 if let Some(listener) = &self.listener {
1347 listener.on_file_cache_filled(_file_id);
1348 }
1349 }
1350
1351 pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
1352 #[cfg(any(test, feature = "test"))]
1353 if let Some(listener) = &self.listener {
1354 listener.on_compaction_scheduled(_region_id);
1355 }
1356 }
1357
1358 pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
1359 #[cfg(any(test, feature = "test"))]
1360 if let Some(listener) = &self.listener {
1361 listener
1362 .on_notify_region_change_result_begin(_region_id)
1363 .await;
1364 }
1365 }
1366
1367 pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
1368 #[cfg(any(test, feature = "test"))]
1369 if let Some(listener) = &self.listener {
1370 listener.on_enter_staging_result_begin(_region_id).await;
1371 }
1372 }
1373
1374 pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
1375 #[cfg(any(test, feature = "test"))]
1376 if let Some(listener) = &self.listener {
1377 listener.on_index_build_finish(_region_file_id).await;
1378 }
1379 }
1380
1381 pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
1382 #[cfg(any(test, feature = "test"))]
1383 if let Some(listener) = &self.listener {
1384 listener.on_index_build_begin(_region_file_id).await;
1385 }
1386 }
1387
1388 pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
1389 #[cfg(any(test, feature = "test"))]
1390 if let Some(listener) = &self.listener {
1391 listener.on_index_build_abort(_region_file_id).await;
1392 }
1393 }
1394}
1395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;
        // The worker index for a fixed (table id, region number) pair must be
        // stable across runs.
        let cases = [((1u32, 2u32), 3usize), ((2u32, 3u32), 1usize)];
        for ((table_id, region_number), expected) in cases {
            let region_id = RegionId::new(table_id, region_number);
            assert_eq!(region_id_to_index(region_id, num_workers), expected);
        }
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let config = MitoConfig {
            num_workers: 4,
            ..Default::default()
        };
        let group = env.create_worker_group(config).await;

        group.stop().await.unwrap();
    }
}