Skip to main content

mito2/
worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs and utilities for writing regions.
16
17mod handle_alter;
18mod handle_apply_staging;
19mod handle_bulk_insert;
20mod handle_catchup;
21mod handle_close;
22mod handle_compaction;
23mod handle_copy_region;
24mod handle_create;
25mod handle_drop;
26mod handle_enter_staging;
27mod handle_flush;
28mod handle_manifest;
29mod handle_open;
30mod handle_rebuild_index;
31mod handle_remap;
32mod handle_truncate;
33mod handle_write;
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use common_base::Plugins;
42use common_error::ext::BoxedError;
43use common_meta::key::SchemaMetadataManagerRef;
44use common_runtime::JoinHandle;
45use common_stat::get_total_memory_bytes;
46use common_telemetry::{error, info, warn};
47use futures::future::try_join_all;
48use object_store::manager::ObjectStoreManagerRef;
49use prometheus::{Histogram, IntGauge};
50use rand::{Rng, rng};
51use snafu::{ResultExt, ensure};
52use store_api::logstore::LogStore;
53use store_api::region_engine::{
54    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
55};
56use store_api::storage::{FileId, RegionId};
57use tokio::sync::mpsc::{Receiver, Sender};
58use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
59
60use crate::cache::write_cache::{WriteCache, WriteCacheRef};
61use crate::cache::{CacheManager, CacheManagerRef};
62use crate::compaction::CompactionScheduler;
63use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
64use crate::config::MitoConfig;
65use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
66use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
67use crate::gc::{GcLimiter, GcLimiterRef};
68use crate::memtable::MemtableBuilderProvider;
69use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
70use crate::region::opener::PartitionExprFetcherRef;
71use crate::region::{
72    CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
73    RegionMapRef,
74};
75use crate::request::{
76    BackgroundNotify, BulkInsertRequest, DdlRequest, SenderBulkRequest, SenderDdlRequest,
77    SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
78};
79use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
80use crate::sst::file::RegionFileId;
81use crate::sst::file_ref::FileReferenceManagerRef;
82use crate::sst::index::IndexBuildScheduler;
83use crate::sst::index::intermediate::IntermediateManager;
84use crate::sst::index::puffin_manager::PuffinManagerFactory;
85use crate::time_provider::{StdTimeProvider, TimeProviderRef};
86use crate::wal::Wal;
87use crate::worker::handle_manifest::RegionEditQueues;
88
/// Identifier for a worker.
///
/// The worker group assigns ids `0..num_workers` in the order the workers are started.
pub(crate) type WorkerId = u32;

/// File name of the dropping marker.
// NOTE(review): presumably placed in a region's directory while the region is being
// dropped; the code that reads/writes it is outside this file — confirm.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max delay to check region periodical tasks.
///
/// In seconds; used as the exclusive upper bound of the random delay returned by
/// `worker_init_check_delay()`.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
98
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
///
/// Besides the workers, the group keeps handles to the components it created for
/// them (job pools, cache, managers) so they can be stopped or handed out later.
pub(crate) struct WorkerGroup {
    /// Workers of the group, stored in the order they were started.
    workers: Vec<RegionWorker>,
    /// Flush background job pool, shared by all workers.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool, shared by all workers.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs, shared by all workers.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache manager shared by all workers.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Gc limiter to limit concurrent gc jobs.
    gc_limiter: GcLimiterRef,
    /// Object store manager.
    object_store_manager: ObjectStoreManagerRef,
    /// Puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager.
    intermediate_manager: IntermediateManager,
    /// Schema metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
}
162
163impl WorkerGroup {
164    /// Starts a worker group.
165    ///
166    /// The number of workers should be power of two.
167    pub(crate) async fn start<S: LogStore>(
168        config: Arc<MitoConfig>,
169        log_store: Arc<S>,
170        object_store_manager: ObjectStoreManagerRef,
171        schema_metadata_manager: SchemaMetadataManagerRef,
172        file_ref_manager: FileReferenceManagerRef,
173        partition_expr_fetcher: PartitionExprFetcherRef,
174        plugins: Plugins,
175    ) -> Result<WorkerGroup> {
176        let (flush_sender, flush_receiver) = watch::channel(());
177        let write_buffer_manager = Arc::new(
178            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
179                .with_notifier(flush_sender.clone()),
180        );
181        let puffin_manager_factory = PuffinManagerFactory::new(
182            &config.index.aux_path,
183            config.index.staging_size.as_bytes(),
184            Some(config.index.write_buffer_size.as_bytes() as _),
185            config.index.staging_ttl,
186        )
187        .await?;
188        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
189            .await?
190            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
191        let index_build_job_pool =
192            Arc::new(LocalScheduler::new(config.max_background_index_builds));
193        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
194        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
195        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
196        // We use another scheduler to avoid purge jobs blocking other jobs.
197        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
198        let write_cache = write_cache_from_config(
199            &config,
200            puffin_manager_factory.clone(),
201            intermediate_manager.clone(),
202        )
203        .await?;
204        let cache_manager = Arc::new(
205            CacheManager::builder()
206                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
207                .vector_cache_size(config.vector_cache_size.as_bytes())
208                .page_cache_size(config.page_cache_size.as_bytes())
209                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
210                .range_result_cache_size(config.range_result_cache_size.as_bytes())
211                .prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes())
212                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
213                .index_content_size(config.index.content_cache_size.as_bytes())
214                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
215                .index_result_cache_size(config.index.result_cache_size.as_bytes())
216                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
217                .write_cache(write_cache)
218                .build(),
219        );
220        let time_provider = Arc::new(StdTimeProvider);
221        let total_memory = get_total_memory_bytes();
222        let total_memory = if total_memory > 0 {
223            total_memory as u64
224        } else {
225            0
226        };
227        let compaction_limit_bytes = config
228            .experimental_compaction_memory_limit
229            .resolve(total_memory);
230        let compaction_memory_manager =
231            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
232        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
233
234        let workers = (0..config.num_workers)
235            .map(|id| {
236                WorkerStarter {
237                    id: id as WorkerId,
238                    config: config.clone(),
239                    log_store: log_store.clone(),
240                    object_store_manager: object_store_manager.clone(),
241                    write_buffer_manager: write_buffer_manager.clone(),
242                    index_build_job_pool: index_build_job_pool.clone(),
243                    flush_job_pool: flush_job_pool.clone(),
244                    compact_job_pool: compact_job_pool.clone(),
245                    purge_scheduler: purge_scheduler.clone(),
246                    listener: WorkerListener::default(),
247                    cache_manager: cache_manager.clone(),
248                    compaction_memory_manager: compaction_memory_manager.clone(),
249                    puffin_manager_factory: puffin_manager_factory.clone(),
250                    intermediate_manager: intermediate_manager.clone(),
251                    time_provider: time_provider.clone(),
252                    flush_sender: flush_sender.clone(),
253                    flush_receiver: flush_receiver.clone(),
254                    plugins: plugins.clone(),
255                    schema_metadata_manager: schema_metadata_manager.clone(),
256                    file_ref_manager: file_ref_manager.clone(),
257                    partition_expr_fetcher: partition_expr_fetcher.clone(),
258                    flush_semaphore: flush_semaphore.clone(),
259                }
260                .start()
261            })
262            .collect::<Result<Vec<_>>>()?;
263
264        Ok(WorkerGroup {
265            workers,
266            flush_job_pool,
267            compact_job_pool,
268            index_build_job_pool,
269            purge_scheduler,
270            cache_manager,
271            file_ref_manager,
272            gc_limiter,
273            object_store_manager,
274            puffin_manager_factory,
275            intermediate_manager,
276            schema_metadata_manager,
277        })
278    }
279
280    /// Stops the worker group.
281    pub(crate) async fn stop(&self) -> Result<()> {
282        info!("Stop region worker group");
283
284        // TODO(yingwen): Do we need to stop gracefully?
285        // Stops the scheduler gracefully.
286        self.compact_job_pool.stop(true).await?;
287        // Stops the scheduler gracefully.
288        self.flush_job_pool.stop(true).await?;
289        // Stops the purge scheduler gracefully.
290        self.purge_scheduler.stop(true).await?;
291        // Stops the index build job pool gracefully.
292        self.index_build_job_pool.stop(true).await?;
293
294        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
295
296        Ok(())
297    }
298
299    /// Submits a request to a worker in the group.
300    pub(crate) async fn submit_to_worker(
301        &self,
302        region_id: RegionId,
303        request: WorkerRequest,
304    ) -> Result<()> {
305        self.worker(region_id).submit_request(request).await
306    }
307
308    /// Returns true if the specific region exists.
309    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
310        self.worker(region_id).is_region_exists(region_id)
311    }
312
313    /// Returns true if the specific region is opening.
314    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
315        self.worker(region_id).is_region_opening(region_id)
316    }
317
318    /// Returns true if the specific region is catching up.
319    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
320        self.worker(region_id).is_region_catching_up(region_id)
321    }
322
323    /// Returns region of specific `region_id`.
324    ///
325    /// This method should not be public.
326    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
327        self.worker(region_id).get_region(region_id)
328    }
329
330    /// Returns cache of the group.
331    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
332        self.cache_manager.clone()
333    }
334
335    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
336        self.file_ref_manager.clone()
337    }
338
339    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
340        self.gc_limiter.clone()
341    }
342
343    /// Get worker for specific `region_id`.
344    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
345        let index = region_id_to_index(region_id, self.workers.len());
346
347        &self.workers[index]
348    }
349
350    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
351        self.workers
352            .iter()
353            .flat_map(|worker| worker.regions.list_regions())
354    }
355
356    pub(crate) fn object_store_manager(&self) -> &ObjectStoreManagerRef {
357        &self.object_store_manager
358    }
359
360    pub(crate) fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
361        &self.puffin_manager_factory
362    }
363
364    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
365        &self.intermediate_manager
366    }
367
368    pub(crate) fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
369        &self.schema_metadata_manager
370    }
371}
372
// Tests methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// When `write_buffer_manager` is `None`, a default one sized by
    /// `config.global_write_buffer_size` is created. `listener` is wrapped into the
    /// [WorkerListener] of every worker and `time_provider` replaces the default
    /// time provider. Workers are started with an empty [Plugins] set.
    ///
    /// The number of workers should be power of two.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        // Only build the default write buffer manager if the test did not inject one.
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // Sized by the purge limit (not the flush limit) so the test group matches
        // what `WorkerGroup::start()` builds.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        // Unlike `WorkerGroup::start()`, the index related cache sizes are left at
        // the builder's defaults here.
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .range_result_cache_size(config.range_result_cache_size.as_bytes())
                .prefilter_result_cache_size(config.prefilter_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        // A non-positive total memory reading is passed on as 0.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
494
495fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
496    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
497        % num_workers
498}
499
500pub async fn write_cache_from_config(
501    config: &MitoConfig,
502    puffin_manager_factory: PuffinManagerFactory,
503    intermediate_manager: IntermediateManager,
504) -> Result<Option<WriteCacheRef>> {
505    if !config.enable_write_cache {
506        return Ok(None);
507    }
508
509    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
510        .await
511        .context(CreateDirSnafu {
512            dir: &config.write_cache_path,
513        })?;
514
515    let cache = WriteCache::new_fs(
516        &config.write_cache_path,
517        config.write_cache_size,
518        config.write_cache_ttl,
519        Some(config.index_cache_percent),
520        config.enable_refill_cache_on_read,
521        puffin_manager_factory,
522        intermediate_manager,
523        config.manifest_cache_size,
524    )
525    .await?;
526    Ok(Some(Arc::new(cache)))
527}
528
529/// Computes a initial check delay for a worker.
530pub(crate) fn worker_init_check_delay() -> Duration {
531    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
532    Duration::from_secs(init_check_delay)
533}
534
/// Worker start config.
///
/// Collects everything needed to build one worker loop; consumed by `start()`.
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Log store that the worker wraps into its `Wal`.
    log_store: Arc<S>,
    /// Object store manager.
    object_store_manager: ObjectStoreManagerRef,
    /// Write buffer manager, also passed to the worker's memtable builder provider.
    write_buffer_manager: WriteBufferManagerRef,
    /// Job pool backing the worker's compaction scheduler.
    compact_job_pool: SchedulerRef,
    /// Job pool backing the worker's index build scheduler.
    index_build_job_pool: SchedulerRef,
    /// Job pool backing the worker's flush scheduler.
    flush_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Event listener of the worker.
    listener: WorkerListener,
    /// Cache manager.
    cache_manager: CacheManagerRef,
    /// Memory manager passed to the worker's compaction scheduler.
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    /// Puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager.
    intermediate_manager: IntermediateManager,
    /// Time provider; also used to initialize the worker's last periodical check time.
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Plugins, also passed to the worker's compaction scheduler.
    plugins: Plugins,
    /// Schema metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expression fetcher.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore created with `max_background_flushes` permits.
    flush_semaphore: Arc<Semaphore>,
}
562
563impl<S: LogStore> WorkerStarter<S> {
564    /// Starts a region worker and its background thread.
565    fn start(self) -> Result<RegionWorker> {
566        let regions = Arc::new(RegionMap::default());
567        let opening_regions = Arc::new(OpeningRegions::default());
568        let catchup_regions = Arc::new(CatchupRegions::default());
569        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
570
571        let running = Arc::new(AtomicBool::new(true));
572        let now = self.time_provider.current_time_millis();
573        let id_string = self.id.to_string();
574        let mut worker_thread = RegionWorkerLoop {
575            id: self.id,
576            config: self.config.clone(),
577            regions: regions.clone(),
578            catchup_regions: catchup_regions.clone(),
579            dropping_regions: Arc::new(RegionMap::default()),
580            opening_regions: opening_regions.clone(),
581            sender: sender.clone(),
582            receiver,
583            wal: Wal::new(self.log_store),
584            object_store_manager: self.object_store_manager.clone(),
585            running: running.clone(),
586            memtable_builder_provider: MemtableBuilderProvider::new(
587                Some(self.write_buffer_manager.clone()),
588                self.config.clone(),
589            ),
590            purge_scheduler: self.purge_scheduler.clone(),
591            write_buffer_manager: self.write_buffer_manager,
592            index_build_scheduler: IndexBuildScheduler::new(
593                self.index_build_job_pool,
594                self.config.max_background_index_builds,
595            ),
596            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
597            compaction_scheduler: CompactionScheduler::new(
598                self.compact_job_pool,
599                sender.clone(),
600                self.cache_manager.clone(),
601                self.config.clone(),
602                self.listener.clone(),
603                self.plugins.clone(),
604                self.compaction_memory_manager.clone(),
605                self.config.experimental_compaction_on_exhausted,
606            ),
607            stalled_requests: StalledRequests::default(),
608            listener: self.listener,
609            cache_manager: self.cache_manager,
610            puffin_manager_factory: self.puffin_manager_factory,
611            intermediate_manager: self.intermediate_manager,
612            time_provider: self.time_provider,
613            last_periodical_check_millis: now,
614            flush_sender: self.flush_sender,
615            flush_receiver: self.flush_receiver,
616            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
617            region_count: REGION_COUNT.with_label_values(&[&id_string]),
618            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
619            region_edit_queues: RegionEditQueues::default(),
620            schema_metadata_manager: self.schema_metadata_manager,
621            file_ref_manager: self.file_ref_manager.clone(),
622            partition_expr_fetcher: self.partition_expr_fetcher,
623            flush_semaphore: self.flush_semaphore,
624            plugins: self.plugins,
625        };
626        let handle = common_runtime::spawn_global(async move {
627            worker_thread.run().await;
628        });
629
630        Ok(RegionWorker {
631            id: self.id,
632            regions,
633            opening_regions,
634            catchup_regions,
635            sender,
636            handle: Mutex::new(Some(handle)),
637            running,
638        })
639    }
640}
641
/// Worker to write and alter regions bound to it.
///
/// This is the handle side of a worker: requests submitted here are sent through
/// `sender` to the background worker loop.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// The catching up regions.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread. `None` once `stop()` has taken it.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}
659
660impl RegionWorker {
661    /// Submits request to background worker thread.
662    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
663        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
664        let request_with_time = WorkerRequestWithTime::new(request);
665        if self.sender.send(request_with_time).await.is_err() {
666            warn!(
667                "Worker {} is already exited but the running flag is still true",
668                self.id
669            );
670            // Manually set the running flag to false to avoid printing more warning logs.
671            self.set_running(false);
672            return WorkerStoppedSnafu { id: self.id }.fail();
673        }
674
675        Ok(())
676    }
677
678    /// Stop the worker.
679    ///
680    /// This method waits until the worker thread exists.
681    async fn stop(&self) -> Result<()> {
682        let handle = self.handle.lock().await.take();
683        if let Some(handle) = handle {
684            info!("Stop region worker {}", self.id);
685
686            self.set_running(false);
687            if self
688                .sender
689                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
690                .await
691                .is_err()
692            {
693                warn!("Worker {} is already exited before stop", self.id);
694            }
695
696            handle.await.context(JoinSnafu)?;
697        }
698
699        Ok(())
700    }
701
702    /// Returns true if the worker is still running.
703    fn is_running(&self) -> bool {
704        self.running.load(Ordering::Relaxed)
705    }
706
707    /// Sets whether the worker is still running.
708    fn set_running(&self, value: bool) {
709        self.running.store(value, Ordering::Relaxed)
710    }
711
712    /// Returns true if the worker contains specific region.
713    fn is_region_exists(&self, region_id: RegionId) -> bool {
714        self.regions.is_region_exists(region_id)
715    }
716
717    /// Returns true if the region is opening.
718    fn is_region_opening(&self, region_id: RegionId) -> bool {
719        self.opening_regions.is_region_exists(region_id)
720    }
721
722    /// Returns true if the region is catching up.
723    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
724        self.catchup_regions.is_region_exists(region_id)
725    }
726
727    /// Returns region of specific `region_id`.
728    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
729        self.regions.get_region(region_id)
730    }
731
732    #[cfg(test)]
733    /// Returns the [OpeningRegionsRef].
734    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
735        &self.opening_regions
736    }
737
738    #[cfg(test)]
739    /// Returns the [CatchupRegionsRef].
740    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
741        &self.catchup_regions
742    }
743}
744
745impl Drop for RegionWorker {
746    fn drop(&mut self) {
747        if self.is_running() {
748            self.set_running(false);
749            // Once we drop the sender, the worker thread will receive a disconnected error.
750        }
751    }
752}
753
/// Buffer of worker requests.
type RequestBuffer = Vec<WorkerRequest>;
755
/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size of the region's stalled requests, stalled write requests,
    /// stalled bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}
772
773impl StalledRequests {
774    /// Appends stalled requests.
775    pub(crate) fn append(
776        &mut self,
777        requests: &mut Vec<SenderWriteRequest>,
778        bulk_requests: &mut Vec<SenderBulkRequest>,
779    ) {
780        for req in requests.drain(..) {
781            self.push(req);
782        }
783        for req in bulk_requests.drain(..) {
784            self.push_bulk(req);
785        }
786    }
787
788    /// Pushes a stalled request to the buffer.
789    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
790        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
791        let req_size = req.request.estimated_size();
792        *size += req_size;
793        self.estimated_size += req_size;
794        requests.push(req);
795    }
796
797    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
798        let region_id = req.region_id;
799        let (size, _, requests) = self.requests.entry(region_id).or_default();
800        let req_size = req.request.estimated_size();
801        *size += req_size;
802        self.estimated_size += req_size;
803        requests.push(req);
804    }
805
806    /// Removes stalled requests of specific region.
807    pub(crate) fn remove(
808        &mut self,
809        region_id: &RegionId,
810    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
811        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
812            self.estimated_size -= size;
813            (write_reqs, bulk_reqs)
814        } else {
815            (vec![], vec![])
816        }
817    }
818
819    /// Returns the total number of all stalled requests.
820    pub(crate) fn stalled_count(&self) -> usize {
821        self.requests
822            .values()
823            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
824            .sum()
825    }
826}
827
/// Background worker loop to handle requests.
///
/// The type parameter `S` is the log store type used by the engine WAL (`Wal<S>`).
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver. The loop exits once this channel is disconnected.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running. The loop checks this flag
    /// before each iteration.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build task.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time (in milliseconds) the periodical region check ran.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Datanode level file references manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expr fetcher used to backfill partition expr on open for compatibility.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore to control flush concurrency.
    flush_semaphore: Arc<Semaphore>,
    /// Plugins for flush hooks.
    plugins: Plugins,
}
901
902impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    ///
    /// Each iteration waits for one of three events:
    /// - a request from the request channel, which is then batched with up to
    ///   `worker_request_batch_size - 1` more requests that are already queued,
    /// - a flush notification from the worker group, or
    /// - a timeout, which triggers the periodical tasks.
    ///
    /// The loop exits when the `running` flag is cleared or when either channel is
    /// disconnected. Remaining regions are cleaned up before returning.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        // Shifts the last check time forward by the initial delay, which postpones the first
        // periodical check.
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffer to retrieve requests from receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffer before handling next batch of requests.
            // NOTE(review): `bulk_req_buffer` is not cleared here; presumably
            // `handle_requests` always drains it — confirm.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();
            // Number of bulk insert requests received in this batch, only used to report
            // the request count to the listener.
            let mut bulk_insert_req_num = 0;

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observe the wait time
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            // Categorizes the request into the matching buffer.
                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                WorkerRequest::BulkInserts(bulk_insert) => {
                                    bulk_insert_req_num += 1;
                                    self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
                                        .await;
                                }
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker if other workers trigger flush as this worker may have
                        // a large memtable to flush. We may not have chance to flush that memtable if we
                        // never write to this worker. So only flushing other workers may not release enough
                        // memory.
                        self.maybe_flush_worker();
                        // A flush job is finished, handles stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            // Only reachable when the first branch received a request; the other branches
            // either `break` or `continue`.
            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always checks whether we could process stalled requests to avoid a request
                // hangs too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have received one request so we start from 1.
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observe the wait time
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        // Same categorization as for the first request of the batch.
                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            WorkerRequest::BulkInserts(bulk_insert) => {
                                bulk_insert_req_num += 1;
                                self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
                                    .await
                            }
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            // Reports the number of requests received in this batch to the listener.
            self.listener.on_recv_requests(
                write_req_buffer.len()
                    + ddl_req_buffer.len()
                    + general_req_buffer.len()
                    + bulk_insert_req_num,
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }
1032
1033    async fn buffer_bulk_insert_request(
1034        &mut self,
1035        bulk_insert: BulkInsertRequest,
1036        bulk_requests: &mut Vec<SenderBulkRequest>,
1037    ) {
1038        let BulkInsertRequest {
1039            metadata,
1040            request,
1041            sender,
1042        } = bulk_insert;
1043
1044        if let Some(region_metadata) = metadata {
1045            self.handle_bulk_insert_batch(region_metadata, request, bulk_requests, sender)
1046                .await;
1047        } else {
1048            error!("Cannot find region metadata for {}", request.region_id);
1049            sender.send(
1050                error::RegionNotFoundSnafu {
1051                    region_id: request.region_id,
1052                }
1053                .fail(),
1054            );
1055        }
1056    }
1057
    /// Dispatches and processes requests.
    ///
    /// `general_requests` should not contain categorized write, ddl, or bulk insert requests.
    ///
    /// Processing order:
    /// 1. general requests, in the order they were received,
    /// 2. write and bulk write requests,
    /// 3. ddl requests.
    ///
    /// `general_requests` is drained here. The other buffers are passed on to their handlers.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized before dispatching general requests.
                    continue;
                }
                WorkerRequest::BulkInserts(_) => unreachable!("bulk inserts are buffered"),
                WorkerRequest::Background { region_id, notify } => {
                    if matches!(
                        &notify,
                        BackgroundNotify::RegionEdit(edit_result)
                            if edit_result.update_region_state
                    ) {
                        // The region state must be Editing when we reach here.
                        // This call only moves write/bulk write requests into the stall queue. When the
                        // region edit result is processed inside handle_background_notify and the region
                        // state is switched back to Writable, stalled requests will be processed
                        // immediately in handle_region_edit_result, before the next region edit is
                        // dequeued from RegionEditQueue. This not only ensures pending writes are
                        // processed in time, but also prevents them from starvation.
                        // TODO(hl): maybe we need to merge those queues for pending requests like pending_ddl,
                        // region edits and stalled request, so we can simplify the coordination between these queues.
                        self.handle_buffered_region_write_requests(
                            &region_id,
                            write_requests,
                            bulk_requests,
                        )
                        .await;
                    }
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request);
                }
                WorkerRequest::Stop => {
                    // A stop request is only expected after the running flag has been cleared.
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
                WorkerRequest::CopyRegionFrom(req) => {
                    self.handle_copy_region_from_request(req);
                }
            }
        }

        // Handles all write requests first. So we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }
1132
    /// Takes and handles all ddl requests.
    ///
    /// `ddl_requests` is drained. Only `Create` and `Drop` produce their result here, which
    /// is sent back through `ddl.sender` at the end of the loop body. Every other variant
    /// passes `ddl.sender` to its handler and skips that final send with `continue`.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop(req) => {
                    self.handle_drop_request(ddl.region_id, req.partial_drop)
                        .await
                }
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => {
                    self.handle_close_request(ddl.region_id, ddl.sender).await;
                    continue;
                }
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_directive,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
                DdlRequest::ApplyStagingManifest(req) => {
                    self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
            };

            // Only reached by the variants that return their result above.
            ddl.sender.send(res);
        }
    }
1203
1204    /// Handle periodical tasks such as region auto flush.
1205    fn handle_periodical_tasks(&mut self) {
1206        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
1207        if self
1208            .time_provider
1209            .elapsed_since(self.last_periodical_check_millis)
1210            < interval
1211        {
1212            return;
1213        }
1214
1215        self.last_periodical_check_millis = self.time_provider.current_time_millis();
1216
1217        if let Err(e) = self.flush_periodically() {
1218            error!(e; "Failed to flush regions periodically");
1219        }
1220    }
1221
    /// Handles a background notification for a region.
    ///
    /// Dispatches each [BackgroundNotify] variant to its dedicated handler. `region_id`
    /// is only passed to the handlers that take it; the others receive just the request
    /// payload.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionCancelled(req) => {
                self.handle_compaction_cancelled(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
            BackgroundNotify::CopyRegionFromFinished(req) => {
                self.handle_copy_region_from_finished(req)
            }
        }
    }
1256
1257    /// Handles `set_region_role_gracefully`.
1258    async fn set_role_state_gracefully(
1259        &mut self,
1260        region_id: RegionId,
1261        region_role_state: SettableRegionRoleState,
1262        sender: oneshot::Sender<SetRegionRoleStateResponse>,
1263    ) {
1264        if let Some(region) = self.regions.get_region(region_id) {
1265            // We need to do this in background as we need the manifest lock.
1266            common_runtime::spawn_global(async move {
1267                match region.set_role_state_gracefully(region_role_state).await {
1268                    Ok(()) => {
1269                        let last_entry_id = region.version_control.current().last_entry_id;
1270                        let _ = sender.send(SetRegionRoleStateResponse::success(
1271                            SetRegionRoleStateSuccess::mito(last_entry_id),
1272                        ));
1273                    }
1274                    Err(e) => {
1275                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
1276                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
1277                            BoxedError::new(e),
1278                        ));
1279                    }
1280                }
1281            });
1282        } else {
1283            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
1284        }
1285    }
1286}
1287
1288impl<S> RegionWorkerLoop<S> {
1289    /// Cleans up the worker.
1290    async fn clean(&self) {
1291        // Closes remaining regions.
1292        let regions = self.regions.list_regions();
1293        for region in regions {
1294            region.stop().await;
1295        }
1296
1297        self.regions.clear();
1298    }
1299
1300    /// Notifies the whole group that a flush job is finished so other
1301    /// workers can handle stalled requests.
1302    fn notify_group(&mut self) {
1303        // Notifies all receivers.
1304        let _ = self.flush_sender.send(());
1305        // Marks the receiver in current worker as seen so the loop won't be waked up immediately.
1306        self.flush_receiver.borrow_and_update();
1307    }
1308}
1309
/// Wrapper that only calls event listener in tests.
///
/// The inner listener field only exists when compiled for tests or with the `test`
/// feature; otherwise the struct has no fields.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    /// Optional event listener to forward events to.
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1316
1317impl WorkerListener {
1318    #[cfg(any(test, feature = "test"))]
1319    pub(crate) fn new(
1320        listener: Option<crate::engine::listener::EventListenerRef>,
1321    ) -> WorkerListener {
1322        WorkerListener { listener }
1323    }
1324
1325    /// Flush is finished successfully.
1326    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
1327        #[cfg(any(test, feature = "test"))]
1328        if let Some(listener) = &self.listener {
1329            listener.on_flush_success(region_id);
1330        }
1331        // Avoid compiler warning.
1332        let _ = region_id;
1333    }
1334
1335    /// Engine is stalled.
1336    pub(crate) fn on_write_stall(&self) {
1337        #[cfg(any(test, feature = "test"))]
1338        if let Some(listener) = &self.listener {
1339            listener.on_write_stall();
1340        }
1341    }
1342
1343    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
1344        #[cfg(any(test, feature = "test"))]
1345        if let Some(listener) = &self.listener {
1346            listener.on_flush_begin(region_id).await;
1347        }
1348        // Avoid compiler warning.
1349        let _ = region_id;
1350    }
1351
1352    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
1353        #[cfg(any(test, feature = "test"))]
1354        if let Some(listener) = &self.listener {
1355            return listener.on_later_drop_begin(region_id);
1356        }
1357        // Avoid compiler warning.
1358        let _ = region_id;
1359        None
1360    }
1361
1362    /// On later drop task is finished.
1363    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
1364        #[cfg(any(test, feature = "test"))]
1365        if let Some(listener) = &self.listener {
1366            listener.on_later_drop_end(region_id, removed);
1367        }
1368        // Avoid compiler warning.
1369        let _ = region_id;
1370        let _ = removed;
1371    }
1372
1373    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
1374        #[cfg(any(test, feature = "test"))]
1375        if let Some(listener) = &self.listener {
1376            listener.on_merge_ssts_finished(region_id).await;
1377        }
1378        // Avoid compiler warning.
1379        let _ = region_id;
1380    }
1381
1382    pub(crate) fn on_recv_requests(&self, request_num: usize) {
1383        #[cfg(any(test, feature = "test"))]
1384        if let Some(listener) = &self.listener {
1385            listener.on_recv_requests(request_num);
1386        }
1387        // Avoid compiler warning.
1388        let _ = request_num;
1389    }
1390
1391    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
1392        #[cfg(any(test, feature = "test"))]
1393        if let Some(listener) = &self.listener {
1394            listener.on_file_cache_filled(_file_id);
1395        }
1396    }
1397
1398    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
1399        #[cfg(any(test, feature = "test"))]
1400        if let Some(listener) = &self.listener {
1401            listener.on_compaction_scheduled(_region_id);
1402        }
1403    }
1404
1405    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
1406        #[cfg(any(test, feature = "test"))]
1407        if let Some(listener) = &self.listener {
1408            listener
1409                .on_notify_region_change_result_begin(_region_id)
1410                .await;
1411        }
1412    }
1413
1414    pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
1415        #[cfg(any(test, feature = "test"))]
1416        if let Some(listener) = &self.listener {
1417            listener.on_enter_staging_result_begin(_region_id).await;
1418        }
1419    }
1420
1421    pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
1422        #[cfg(any(test, feature = "test"))]
1423        if let Some(listener) = &self.listener {
1424            listener.on_index_build_finish(_region_file_id).await;
1425        }
1426    }
1427
1428    pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
1429        #[cfg(any(test, feature = "test"))]
1430        if let Some(listener) = &self.listener {
1431            listener.on_index_build_begin(_region_file_id).await;
1432        }
1433    }
1434
1435    pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
1436        #[cfg(any(test, feature = "test"))]
1437        if let Some(listener) = &self.listener {
1438            listener.on_index_build_abort(_region_file_id).await;
1439        }
1440    }
1441}
1442
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    /// Checks the worker index computed for a few fixed region ids.
    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        // (region id, expected worker index)
        let cases = [(RegionId::new(1, 2), 3), (RegionId::new(2, 3), 1)];
        for (region_id, expected) in cases {
            let index = region_id_to_index(region_id, num_workers);
            assert_eq!(index, expected);
        }
    }

    /// A worker group with several workers can be created and stopped cleanly.
    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let config = MitoConfig {
            num_workers: 4,
            ..Default::default()
        };
        let group = env.create_worker_group(config).await;

        group.stop().await.unwrap();
    }
}