1mod handle_alter;
18mod handle_apply_staging;
19mod handle_bulk_insert;
20mod handle_catchup;
21mod handle_close;
22mod handle_compaction;
23mod handle_copy_region;
24mod handle_create;
25mod handle_drop;
26mod handle_enter_staging;
27mod handle_flush;
28mod handle_manifest;
29mod handle_open;
30mod handle_rebuild_index;
31mod handle_remap;
32mod handle_truncate;
33mod handle_write;
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use common_base::Plugins;
42use common_error::ext::BoxedError;
43use common_meta::key::SchemaMetadataManagerRef;
44use common_runtime::JoinHandle;
45use common_stat::get_total_memory_bytes;
46use common_telemetry::{error, info, warn};
47use futures::future::try_join_all;
48use object_store::manager::ObjectStoreManagerRef;
49use prometheus::{Histogram, IntGauge};
50use rand::{Rng, rng};
51use snafu::{ResultExt, ensure};
52use store_api::logstore::LogStore;
53use store_api::region_engine::{
54 SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
55};
56use store_api::storage::{FileId, RegionId};
57use tokio::sync::mpsc::{Receiver, Sender};
58use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
59
60use crate::cache::write_cache::{WriteCache, WriteCacheRef};
61use crate::cache::{CacheManager, CacheManagerRef};
62use crate::compaction::CompactionScheduler;
63use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
64use crate::config::MitoConfig;
65use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
66use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
67use crate::gc::{GcLimiter, GcLimiterRef};
68use crate::memtable::MemtableBuilderProvider;
69use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
70use crate::region::opener::PartitionExprFetcherRef;
71use crate::region::{
72 CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
73 RegionMapRef,
74};
75use crate::request::{
76 BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
77 WorkerRequest, WorkerRequestWithTime,
78};
79use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
80use crate::sst::file::RegionFileId;
81use crate::sst::file_ref::FileReferenceManagerRef;
82use crate::sst::index::IndexBuildScheduler;
83use crate::sst::index::intermediate::IntermediateManager;
84use crate::sst::index::puffin_manager::PuffinManagerFactory;
85use crate::time_provider::{StdTimeProvider, TimeProviderRef};
86use crate::wal::Wal;
87use crate::worker::handle_manifest::RegionEditQueues;
88
/// Identifier for a worker within a [WorkerGroup].
pub(crate) type WorkerId = u32;

/// Name of the marker file created in a region directory while the region is being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval between periodical checks (e.g. periodical flush) run by each worker.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Upper bound (in seconds, exclusive) of the random delay applied before a
/// worker's first periodical check, so workers don't all check at once.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
98
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed-size group of [RegionWorker]s.
///
/// Each region is assigned to exactly one worker (see `region_id_to_index`),
/// while the background job pools below are shared by all workers in the group.
pub(crate) struct WorkerGroup {
    /// Workers of the group; index is derived from the region id.
    workers: Vec<RegionWorker>,
    /// Shared scheduler for flush jobs.
    flush_job_pool: SchedulerRef,
    /// Shared scheduler for compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Shared scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Shared scheduler for file purge jobs.
    purge_scheduler: SchedulerRef,
    /// Cache shared by all workers.
    cache_manager: CacheManagerRef,
    /// Tracks SST file references.
    file_ref_manager: FileReferenceManagerRef,
    /// Limits the number of concurrent GC jobs.
    gc_limiter: GcLimiterRef,
}
154
impl WorkerGroup {
    /// Starts a worker group backed by `log_store` and `object_store_manager`.
    ///
    /// Builds the shared infrastructure (write buffer manager, job pools,
    /// caches, memory managers) and then spawns `config.num_workers` workers
    /// that all share it.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel used to notify workers when memory pressure requires a flush.
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        // Background job pools, sized from the config limits.
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        // `get_total_memory_bytes` may return a non-positive value when the
        // total memory cannot be detected; treat that as 0 (no limit basis).
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        // Spawn one worker per id; every worker shares the pools/caches above.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Stops the worker group: stops the shared job pools first, then stops
    /// every worker and waits for their loops to exit.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // Stops the background job pools before stopping workers so that no
        // new background jobs are scheduled while workers are shutting down.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits `request` to the worker that owns `region_id`.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the region is opened by its worker.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the region is currently being opened.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns true if the region is currently catching up.
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

    /// Returns the region with the given id, or `None` if it is not opened.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the shared cache manager.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the shared file reference manager.
    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Returns the shared GC limiter.
    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

    /// Returns the worker that owns the region. The mapping is deterministic,
    /// so the same region always routes to the same worker.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    /// Iterates over every region opened by any worker in the group.
    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}
342
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group for tests.
    ///
    /// Like [WorkerGroup::start] but allows injecting the write buffer
    /// manager, event listener and time provider, and skips the index-cache
    /// configuration of the cache manager.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // Size the purge scheduler from `max_background_purges` for
        // consistency with `WorkerGroup::start` (previously this used
        // `max_background_flushes`, which looks like a copy-paste slip).
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        // Treat undetectable (non-positive) total memory as 0.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Returns the shared purge scheduler (test-only accessor).
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
458
459fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
460 ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
461 % num_workers
462}
463
464pub async fn write_cache_from_config(
465 config: &MitoConfig,
466 puffin_manager_factory: PuffinManagerFactory,
467 intermediate_manager: IntermediateManager,
468) -> Result<Option<WriteCacheRef>> {
469 if !config.enable_write_cache {
470 return Ok(None);
471 }
472
473 tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
474 .await
475 .context(CreateDirSnafu {
476 dir: &config.write_cache_path,
477 })?;
478
479 let cache = WriteCache::new_fs(
480 &config.write_cache_path,
481 config.write_cache_size,
482 config.write_cache_ttl,
483 Some(config.index_cache_percent),
484 config.enable_refill_cache_on_read,
485 puffin_manager_factory,
486 intermediate_manager,
487 config.manifest_cache_size,
488 )
489 .await?;
490 Ok(Some(Arc::new(cache)))
491}
492
493pub(crate) fn worker_init_check_delay() -> Duration {
495 let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
496 Duration::from_secs(init_check_delay)
497}
498
/// Bundle of everything needed to start a single [RegionWorker].
///
/// Built per worker by [WorkerGroup::start]; shared resources (pools, caches,
/// channels) are cloned handles, per-worker state is created in `start()`.
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    index_build_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Sender side of the group-wide flush notification watch channel.
    flush_sender: watch::Sender<()>,
    /// Receiver side of the group-wide flush notification watch channel.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore limiting concurrent flush jobs across workers.
    flush_semaphore: Arc<Semaphore>,
}
526
impl<S: LogStore> WorkerStarter<S> {
    /// Starts the worker loop on the global runtime and returns the
    /// [RegionWorker] handle used to communicate with it.
    fn start(self) -> Result<RegionWorker> {
        // Per-worker region maps, shared between the loop and the handle.
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            // Per-worker metrics, labeled with the worker id.
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        // Run the loop as a detached task; `RegionWorker::stop` joins it.
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}
604
/// Handle to a worker thread that processes requests for its regions.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions opened by this worker, shared with the worker loop.
    regions: RegionMapRef,
    /// Regions currently being opened, shared with the worker loop.
    opening_regions: OpeningRegionsRef,
    /// Regions currently catching up, shared with the worker loop.
    catchup_regions: CatchupRegionsRef,
    /// Sender of the worker's request channel.
    sender: Sender<WorkerRequestWithTime>,
    /// Join handle of the worker task; taken (set to `None`) by `stop()`.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Shared running flag; cleared to ask the loop to exit.
    running: Arc<AtomicBool>,
}
622
623impl RegionWorker {
624 async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
626 ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
627 let request_with_time = WorkerRequestWithTime::new(request);
628 if self.sender.send(request_with_time).await.is_err() {
629 warn!(
630 "Worker {} is already exited but the running flag is still true",
631 self.id
632 );
633 self.set_running(false);
635 return WorkerStoppedSnafu { id: self.id }.fail();
636 }
637
638 Ok(())
639 }
640
641 async fn stop(&self) -> Result<()> {
645 let handle = self.handle.lock().await.take();
646 if let Some(handle) = handle {
647 info!("Stop region worker {}", self.id);
648
649 self.set_running(false);
650 if self
651 .sender
652 .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
653 .await
654 .is_err()
655 {
656 warn!("Worker {} is already exited before stop", self.id);
657 }
658
659 handle.await.context(JoinSnafu)?;
660 }
661
662 Ok(())
663 }
664
665 fn is_running(&self) -> bool {
667 self.running.load(Ordering::Relaxed)
668 }
669
670 fn set_running(&self, value: bool) {
672 self.running.store(value, Ordering::Relaxed)
673 }
674
675 fn is_region_exists(&self, region_id: RegionId) -> bool {
677 self.regions.is_region_exists(region_id)
678 }
679
680 fn is_region_opening(&self, region_id: RegionId) -> bool {
682 self.opening_regions.is_region_exists(region_id)
683 }
684
685 fn is_region_catching_up(&self, region_id: RegionId) -> bool {
687 self.catchup_regions.is_region_exists(region_id)
688 }
689
690 fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
692 self.regions.get_region(region_id)
693 }
694
695 #[cfg(test)]
696 pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
698 &self.opening_regions
699 }
700
701 #[cfg(test)]
702 pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
704 &self.catchup_regions
705 }
706}
707
708impl Drop for RegionWorker {
709 fn drop(&mut self) {
710 if self.is_running() {
711 self.set_running(false);
712 }
714 }
715}
716
717type RequestBuffer = Vec<WorkerRequest>;
718
/// Write requests that are stalled (e.g. waiting for memory to be freed)
/// and buffered until they can be retried.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests per region: the tuple holds the estimated size of the
    /// region's stalled requests, its write requests and its bulk requests.
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests across regions.
    pub(crate) estimated_size: usize,
}
735
736impl StalledRequests {
737 pub(crate) fn append(
739 &mut self,
740 requests: &mut Vec<SenderWriteRequest>,
741 bulk_requests: &mut Vec<SenderBulkRequest>,
742 ) {
743 for req in requests.drain(..) {
744 self.push(req);
745 }
746 for req in bulk_requests.drain(..) {
747 self.push_bulk(req);
748 }
749 }
750
751 pub(crate) fn push(&mut self, req: SenderWriteRequest) {
753 let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
754 let req_size = req.request.estimated_size();
755 *size += req_size;
756 self.estimated_size += req_size;
757 requests.push(req);
758 }
759
760 pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
761 let region_id = req.region_id;
762 let (size, _, requests) = self.requests.entry(region_id).or_default();
763 let req_size = req.request.estimated_size();
764 *size += req_size;
765 self.estimated_size += req_size;
766 requests.push(req);
767 }
768
769 pub(crate) fn remove(
771 &mut self,
772 region_id: &RegionId,
773 ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
774 if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
775 self.estimated_size -= size;
776 (write_reqs, bulk_reqs)
777 } else {
778 (vec![], vec![])
779 }
780 }
781
782 pub(crate) fn stalled_count(&self) -> usize {
784 self.requests
785 .values()
786 .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
787 .sum()
788 }
789}
790
/// Background loop that owns a worker's regions and processes its requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions opened by this worker.
    regions: RegionMapRef,
    /// Regions that are being dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are being opened.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Sender of the request channel (cloned into background jobs so they can
    /// send notifications back to this loop).
    sender: Sender<WorkerRequestWithTime>,
    /// Receiver of the request channel.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores used by the engine.
    object_store_manager: ObjectStoreManagerRef,
    /// Shared running flag; the loop exits when it becomes false.
    running: Arc<AtomicBool>,
    /// Provides memtable builders for regions.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Scheduler for file purge jobs.
    purge_scheduler: SchedulerRef,
    /// Tracks global write buffer usage.
    write_buffer_manager: WriteBufferManagerRef,
    /// Schedules index build jobs.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules flush jobs.
    flush_scheduler: FlushScheduler,
    /// Schedules compaction jobs.
    compaction_scheduler: CompactionScheduler,
    /// Write requests stalled until memory is available again.
    stalled_requests: StalledRequests,
    /// Event listener (only forwards events in test builds).
    listener: WorkerListener,
    /// Shared cache manager.
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    /// Provider of current time, abstracted for tests.
    time_provider: TimeProviderRef,
    /// Timestamp (ms) of the last periodical check.
    last_periodical_check_millis: i64,
    /// Sender of the group-wide flush notification watch channel.
    flush_sender: watch::Sender<()>,
    /// Receiver of the group-wide flush notification watch channel.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalled write requests for this worker.
    stalling_count: IntGauge,
    /// Gauge of regions owned by this worker.
    region_count: IntGauge,
    /// Histogram of request wait time in the channel.
    request_wait_time: Histogram,
    /// Queues of pending region edit requests.
    region_edit_queues: RegionEditQueues,
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Tracks SST file references.
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore limiting concurrent flush jobs.
    flush_semaphore: Arc<Semaphore>,
}
862
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop and runs until the `running` flag is cleared or
    /// the request channel is disconnected.
    ///
    /// Each iteration waits for a request, a flush notification or a
    /// periodical-check timeout, then drains up to
    /// `worker_request_batch_size` additional requests and handles them as a
    /// batch.
    async fn run(&mut self) {
        // Randomize the first periodical check so workers don't all check at once.
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers reused across iterations to batch requests by kind.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffers before handling the next batch of requests.
            // NOTE(review): `bulk_req_buffer` is intentionally not cleared
            // here; it appears to be drained inside `handle_requests` —
            // confirm.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Record how long the request waited in the channel.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The watch channel is disconnected.
                        break;
                    } else {
                        // A flush notification arrived: maybe flush this worker
                        // and retry stalled requests now that memory may be free.
                        self.maybe_flush_worker();
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timed out waiting for requests; run periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            // Handle pending flush notifications before processing the batch.
            if self.flush_receiver.has_changed().unwrap_or(false) {
                self.handle_stalled_requests().await;
            }

            // Try to drain more requests from the channel (up to the batch
            // size, counting the one received above).
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // The channel is empty (or disconnected).
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes one batch of requests.
    ///
    /// General requests are handled first, then write/bulk requests, then DDL
    /// requests; all buffers are drained.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These variants are routed into the dedicated buffers
                    // before this method is called; they should not occur here.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // Background notifications are sent by the worker itself.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request);
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
                WorkerRequest::CopyRegionFrom(req) => {
                    self.handle_copy_region_from_request(req);
                }
            }
        }

        // Handles all write/bulk requests first so the DDL requests below can
        // observe the resulting state.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Drains and dispatches DDL requests.
    ///
    /// Handlers that take `ddl.sender` themselves respond asynchronously and
    /// `continue`; the remaining handlers return a result that is sent back
    /// here.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_expr,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
                DdlRequest::ApplyStagingManifest(req) => {
                    self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }

    /// Runs periodical tasks (e.g. periodical flush) if at least
    /// [CHECK_REGION_INTERVAL] has elapsed since the last run.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Dispatches a background-job completion/failure notification to the
    /// matching handler.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
            BackgroundNotify::CopyRegionFromFinished(req) => {
                self.handle_copy_region_from_finished(req)
            }
        }
    }

    /// Handles a graceful role-state change request for a region.
    ///
    /// The change runs on a spawned task so the worker loop is not blocked;
    /// the response is sent through `sender` when it completes. Responds with
    /// `NotFound` if the region is not opened by this worker.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}
1202
1203impl<S> RegionWorkerLoop<S> {
1204 async fn clean(&self) {
1206 let regions = self.regions.list_regions();
1208 for region in regions {
1209 region.stop().await;
1210 }
1211
1212 self.regions.clear();
1213 }
1214
1215 fn notify_group(&mut self) {
1218 let _ = self.flush_sender.send(());
1220 self.flush_receiver.borrow_and_update();
1222 }
1223}
1224
/// Hook that forwards worker events to an optional test listener.
///
/// In non-test builds this is an empty struct and every callback is a no-op.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    // Present only in test builds; `None` disables event forwarding.
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1231
1232impl WorkerListener {
1233 #[cfg(any(test, feature = "test"))]
1234 pub(crate) fn new(
1235 listener: Option<crate::engine::listener::EventListenerRef>,
1236 ) -> WorkerListener {
1237 WorkerListener { listener }
1238 }
1239
1240 pub(crate) fn on_flush_success(&self, region_id: RegionId) {
1242 #[cfg(any(test, feature = "test"))]
1243 if let Some(listener) = &self.listener {
1244 listener.on_flush_success(region_id);
1245 }
1246 let _ = region_id;
1248 }
1249
1250 pub(crate) fn on_write_stall(&self) {
1252 #[cfg(any(test, feature = "test"))]
1253 if let Some(listener) = &self.listener {
1254 listener.on_write_stall();
1255 }
1256 }
1257
1258 pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
1259 #[cfg(any(test, feature = "test"))]
1260 if let Some(listener) = &self.listener {
1261 listener.on_flush_begin(region_id).await;
1262 }
1263 let _ = region_id;
1265 }
1266
1267 pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
1268 #[cfg(any(test, feature = "test"))]
1269 if let Some(listener) = &self.listener {
1270 return listener.on_later_drop_begin(region_id);
1271 }
1272 let _ = region_id;
1274 None
1275 }
1276
1277 pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
1279 #[cfg(any(test, feature = "test"))]
1280 if let Some(listener) = &self.listener {
1281 listener.on_later_drop_end(region_id, removed);
1282 }
1283 let _ = region_id;
1285 let _ = removed;
1286 }
1287
1288 pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
1289 #[cfg(any(test, feature = "test"))]
1290 if let Some(listener) = &self.listener {
1291 listener.on_merge_ssts_finished(region_id).await;
1292 }
1293 let _ = region_id;
1295 }
1296
1297 pub(crate) fn on_recv_requests(&self, request_num: usize) {
1298 #[cfg(any(test, feature = "test"))]
1299 if let Some(listener) = &self.listener {
1300 listener.on_recv_requests(request_num);
1301 }
1302 let _ = request_num;
1304 }
1305
1306 pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
1307 #[cfg(any(test, feature = "test"))]
1308 if let Some(listener) = &self.listener {
1309 listener.on_file_cache_filled(_file_id);
1310 }
1311 }
1312
1313 pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
1314 #[cfg(any(test, feature = "test"))]
1315 if let Some(listener) = &self.listener {
1316 listener.on_compaction_scheduled(_region_id);
1317 }
1318 }
1319
1320 pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
1321 #[cfg(any(test, feature = "test"))]
1322 if let Some(listener) = &self.listener {
1323 listener
1324 .on_notify_region_change_result_begin(_region_id)
1325 .await;
1326 }
1327 }
1328
1329 pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
1330 #[cfg(any(test, feature = "test"))]
1331 if let Some(listener) = &self.listener {
1332 listener.on_enter_staging_result_begin(_region_id).await;
1333 }
1334 }
1335
1336 pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
1337 #[cfg(any(test, feature = "test"))]
1338 if let Some(listener) = &self.listener {
1339 listener.on_index_build_finish(_region_file_id).await;
1340 }
1341 }
1342
1343 pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
1344 #[cfg(any(test, feature = "test"))]
1345 if let Some(listener) = &self.listener {
1346 listener.on_index_build_begin(_region_file_id).await;
1347 }
1348 }
1349
1350 pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
1351 #[cfg(any(test, feature = "test"))]
1352 if let Some(listener) = &self.listener {
1353 listener.on_index_build_abort(_region_file_id).await;
1354 }
1355 }
1356}
1357
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    // Verifies the region-to-worker-index mapping for a fixed worker count.
    // NOTE(review): the expected indices depend on the hashing scheme inside
    // `region_id_to_index`; update them if that scheme changes.
    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    // Ensures a worker group with multiple workers can be created and then
    // stopped cleanly without errors.
    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}