Skip to main content

mito2/
worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs and utilities for writing regions.
16
17mod handle_alter;
18mod handle_apply_staging;
19mod handle_bulk_insert;
20mod handle_catchup;
21mod handle_close;
22mod handle_compaction;
23mod handle_copy_region;
24mod handle_create;
25mod handle_drop;
26mod handle_enter_staging;
27mod handle_flush;
28mod handle_manifest;
29mod handle_open;
30mod handle_rebuild_index;
31mod handle_remap;
32mod handle_truncate;
33mod handle_write;
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use common_base::Plugins;
42use common_error::ext::BoxedError;
43use common_meta::key::SchemaMetadataManagerRef;
44use common_runtime::JoinHandle;
45use common_stat::get_total_memory_bytes;
46use common_telemetry::{error, info, warn};
47use futures::future::try_join_all;
48use object_store::manager::ObjectStoreManagerRef;
49use prometheus::{Histogram, IntGauge};
50use rand::{Rng, rng};
51use snafu::{ResultExt, ensure};
52use store_api::logstore::LogStore;
53use store_api::region_engine::{
54    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
55};
56use store_api::storage::{FileId, RegionId};
57use tokio::sync::mpsc::{Receiver, Sender};
58use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
59
60use crate::cache::write_cache::{WriteCache, WriteCacheRef};
61use crate::cache::{CacheManager, CacheManagerRef};
62use crate::compaction::CompactionScheduler;
63use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
64use crate::config::MitoConfig;
65use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
66use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
67use crate::gc::{GcLimiter, GcLimiterRef};
68use crate::memtable::MemtableBuilderProvider;
69use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
70use crate::region::opener::PartitionExprFetcherRef;
71use crate::region::{
72    CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
73    RegionMapRef,
74};
75use crate::request::{
76    BackgroundNotify, BulkInsertRequest, DdlRequest, SenderBulkRequest, SenderDdlRequest,
77    SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
78};
79use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
80use crate::sst::file::RegionFileId;
81use crate::sst::file_ref::FileReferenceManagerRef;
82use crate::sst::index::IndexBuildScheduler;
83use crate::sst::index::intermediate::IntermediateManager;
84use crate::sst::index::puffin_manager::PuffinManagerFactory;
85use crate::time_provider::{StdTimeProvider, TimeProviderRef};
86use crate::wal::Wal;
87use crate::worker::handle_manifest::RegionEditQueues;
88
/// Identifier for a worker.
pub(crate) type WorkerId = u32;

/// File name of the marker created in a region's directory while the region is
/// being dropped (see `handle_drop`).
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max delay (in seconds) before a worker runs its first periodical check.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
98
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool, shared by all workers.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool, shared by all workers.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache manager shared by all workers.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Gc limiter to limit concurrent gc jobs.
    gc_limiter: GcLimiterRef,
    /// Object store manager.
    object_store_manager: ObjectStoreManagerRef,
    /// Puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager.
    intermediate_manager: IntermediateManager,
    /// Schema metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
}
162
impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// The number of workers should be power of two.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel shared by all workers; the write buffer manager notifies
        // it so workers can react to flushes (see `flush_receiver` usage in the
        // worker loop).
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        // Background job pools, shared by every worker in the group.
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        // `None` when the write cache is disabled in config.
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .range_result_cache_size(config.range_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        // Clamp non-positive readings (presumably "unknown total memory") to 0
        // before resolving the compaction memory limit.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        // Start one worker per id; all workers share the pools, caches and
        // channels created above.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }

    /// Stops the worker group: first the shared job pools, then every worker.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the scheduler gracefully.
        self.compact_job_pool.stop(true).await?;
        // Stops the scheduler gracefully.
        self.flush_job_pool.stop(true).await?;
        // Stops the purge scheduler gracefully.
        self.purge_scheduler.stop(true).await?;
        // Stops the index build job pool gracefully.
        self.index_build_job_pool.stop(true).await?;

        // Stop all workers concurrently and fail on the first error.
        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group, routed by `region_id`.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns true if the specific region is catching up.
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

    /// Returns region of specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns cache of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the file reference manager of the group.
    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Returns the gc limiter of the group.
    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

    /// Get worker for specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    /// Returns an iterator over all regions bound to any worker in the group.
    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }

    /// Returns the object store manager of the group.
    pub(crate) fn object_store_manager(&self) -> &ObjectStoreManagerRef {
        &self.object_store_manager
    }

    /// Returns the puffin manager factory of the group.
    pub(crate) fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        &self.puffin_manager_factory
    }

    /// Returns the intermediate manager of the group.
    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Returns the schema metadata manager of the group.
    pub(crate) fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
        &self.schema_metadata_manager
    }
}
371
// Tests methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// The number of workers should be power of two.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        // Fall back to a write buffer manager wired to the flush notifier when
        // the caller does not inject one.
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // Size the purge scheduler by `max_background_purges`, consistent with
        // `WorkerGroup::start` (it previously reused `max_background_flushes`).
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        // Note: the test cache manager configures fewer caches than the
        // production one (no index caches).
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .range_result_cache_size(config.range_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        // Clamp non-positive readings (presumably "unknown total memory") to 0
        // before resolving the compaction memory limit.
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        // Start one worker per id; unlike `start`, the listener and time
        // provider are injected by the test.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
492
493fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
494    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
495        % num_workers
496}
497
498pub async fn write_cache_from_config(
499    config: &MitoConfig,
500    puffin_manager_factory: PuffinManagerFactory,
501    intermediate_manager: IntermediateManager,
502) -> Result<Option<WriteCacheRef>> {
503    if !config.enable_write_cache {
504        return Ok(None);
505    }
506
507    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
508        .await
509        .context(CreateDirSnafu {
510            dir: &config.write_cache_path,
511        })?;
512
513    let cache = WriteCache::new_fs(
514        &config.write_cache_path,
515        config.write_cache_size,
516        config.write_cache_ttl,
517        Some(config.index_cache_percent),
518        config.enable_refill_cache_on_read,
519        puffin_manager_factory,
520        intermediate_manager,
521        config.manifest_cache_size,
522    )
523    .await?;
524    Ok(Some(Arc::new(cache)))
525}
526
527/// Computes a initial check delay for a worker.
528pub(crate) fn worker_init_check_delay() -> Duration {
529    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
530    Duration::from_secs(init_check_delay)
531}
532
/// Worker start config.
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    /// Engine config shared by all workers.
    config: Arc<MitoConfig>,
    /// Log store backing the WAL.
    log_store: Arc<S>,
    /// Object store manager.
    object_store_manager: ObjectStoreManagerRef,
    /// Tracks global write buffer usage across workers.
    write_buffer_manager: WriteBufferManagerRef,
    /// Shared pool for compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Shared pool for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Shared pool for flush jobs.
    flush_job_pool: SchedulerRef,
    /// Shared scheduler for purge jobs.
    purge_scheduler: SchedulerRef,
    /// Event listener (default in production, injected in tests).
    listener: WorkerListener,
    /// Cache manager shared by all workers.
    cache_manager: CacheManagerRef,
    /// Limits memory used by compaction jobs.
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    /// Factory for puffin (index file) managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager for intermediate index files.
    intermediate_manager: IntermediateManager,
    /// Time source for periodical checks.
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Plugins passed to the compaction scheduler.
    plugins: Plugins,
    /// Schema metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetcher for partition expressions used when opening regions.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore bounding concurrent flushes.
    flush_semaphore: Arc<Semaphore>,
}
560
impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    ///
    /// Builds the shared region maps and request channel, spawns the
    /// [RegionWorkerLoop] on the global runtime and returns the
    /// [RegionWorker] handle bound to it.
    fn start(self) -> Result<RegionWorker> {
        // Maps shared between the worker handle and the background loop.
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        // Bounded request channel between the handle and the loop.
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        // Worker id is used as the metric label value below.
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        // Run the worker loop on the global runtime; the handle is kept so
        // `RegionWorker::stop` can join the task.
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}
638
/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// The catching up regions.
    catchup_regions: CatchupRegionsRef,
    /// Request sender to the background worker loop.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread; taken (set to `None`) on stop.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}
656
657impl RegionWorker {
658    /// Submits request to background worker thread.
659    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
660        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
661        let request_with_time = WorkerRequestWithTime::new(request);
662        if self.sender.send(request_with_time).await.is_err() {
663            warn!(
664                "Worker {} is already exited but the running flag is still true",
665                self.id
666            );
667            // Manually set the running flag to false to avoid printing more warning logs.
668            self.set_running(false);
669            return WorkerStoppedSnafu { id: self.id }.fail();
670        }
671
672        Ok(())
673    }
674
675    /// Stop the worker.
676    ///
677    /// This method waits until the worker thread exists.
678    async fn stop(&self) -> Result<()> {
679        let handle = self.handle.lock().await.take();
680        if let Some(handle) = handle {
681            info!("Stop region worker {}", self.id);
682
683            self.set_running(false);
684            if self
685                .sender
686                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
687                .await
688                .is_err()
689            {
690                warn!("Worker {} is already exited before stop", self.id);
691            }
692
693            handle.await.context(JoinSnafu)?;
694        }
695
696        Ok(())
697    }
698
699    /// Returns true if the worker is still running.
700    fn is_running(&self) -> bool {
701        self.running.load(Ordering::Relaxed)
702    }
703
704    /// Sets whether the worker is still running.
705    fn set_running(&self, value: bool) {
706        self.running.store(value, Ordering::Relaxed)
707    }
708
709    /// Returns true if the worker contains specific region.
710    fn is_region_exists(&self, region_id: RegionId) -> bool {
711        self.regions.is_region_exists(region_id)
712    }
713
714    /// Returns true if the region is opening.
715    fn is_region_opening(&self, region_id: RegionId) -> bool {
716        self.opening_regions.is_region_exists(region_id)
717    }
718
719    /// Returns true if the region is catching up.
720    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
721        self.catchup_regions.is_region_exists(region_id)
722    }
723
724    /// Returns region of specific `region_id`.
725    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
726        self.regions.get_region(region_id)
727    }
728
729    #[cfg(test)]
730    /// Returns the [OpeningRegionsRef].
731    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
732        &self.opening_regions
733    }
734
735    #[cfg(test)]
736    /// Returns the [CatchupRegionsRef].
737    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
738        &self.catchup_regions
739    }
740}
741
// Dropping the handle signals the background loop to stop.
impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            // Clear the flag so the loop knows the stop is intentional.
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}
750
/// Batch of worker requests processed together by the worker loop.
type RequestBuffer = Vec<WorkerRequest>;
752
/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, stalled write requests, stalled bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}
769
770impl StalledRequests {
771    /// Appends stalled requests.
772    pub(crate) fn append(
773        &mut self,
774        requests: &mut Vec<SenderWriteRequest>,
775        bulk_requests: &mut Vec<SenderBulkRequest>,
776    ) {
777        for req in requests.drain(..) {
778            self.push(req);
779        }
780        for req in bulk_requests.drain(..) {
781            self.push_bulk(req);
782        }
783    }
784
785    /// Pushes a stalled request to the buffer.
786    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
787        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
788        let req_size = req.request.estimated_size();
789        *size += req_size;
790        self.estimated_size += req_size;
791        requests.push(req);
792    }
793
794    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
795        let region_id = req.region_id;
796        let (size, _, requests) = self.requests.entry(region_id).or_default();
797        let req_size = req.request.estimated_size();
798        *size += req_size;
799        self.estimated_size += req_size;
800        requests.push(req);
801    }
802
803    /// Removes stalled requests of specific region.
804    pub(crate) fn remove(
805        &mut self,
806        region_id: &RegionId,
807    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
808        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
809            self.estimated_size -= size;
810            (write_reqs, bulk_reqs)
811        } else {
812            (vec![], vec![])
813        }
814    }
815
816    /// Returns the total number of all stalled requests.
817    pub(crate) fn stalled_count(&self) -> usize {
818        self.requests
819            .values()
820            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
821            .sum()
822    }
823}
824
/// Background worker loop to handle requests.
///
/// Each loop owns the regions bound to it (see `regions`) and processes their
/// write, DDL and background requests sequentially from one request channel.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build task.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Datanode level file references manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expr fetcher used to backfill partition expr on open for compatibility.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore to control flush concurrency.
    flush_semaphore: Arc<Semaphore>,
}
896
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    ///
    /// Runs until the `running` flag is cleared or the request channel is
    /// disconnected, then stops all regions bound to this worker via `clean`.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        // Push the first periodical check forward by the init delay (see
        // `worker_init_check_delay`, defined outside this chunk).
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffer to retrieve requests from receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffer before handling next batch of requests.
            // NOTE(review): `bulk_req_buffer` is intentionally not cleared with
            // the others; it appears to be drained by `handle_requests` — confirm.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();
            let mut bulk_insert_req_num = 0;

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            // Wait for the first request, a flush notification, or the
            // periodical-check timeout, whichever comes first.
            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observe the wait time
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                WorkerRequest::BulkInserts(bulk_insert) => {
                                    bulk_insert_req_num += 1;
                                    self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
                                        .await;
                                }
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker if other workers trigger flush as this worker may have
                        // a large memtable to flush. We may not have chance to flush that memtable if we
                        // never write to this worker. So only flushing other workers may not release enough
                        // memory.
                        self.maybe_flush_worker();
                        // A flush job is finished, handles stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always checks whether we could process stalled requests to avoid a request
                // hangs too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have received one request so we start from 1.
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observe the wait time
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            WorkerRequest::BulkInserts(bulk_insert) => {
                                bulk_insert_req_num += 1;
                                self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
                                    .await
                            }
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len()
                    + ddl_req_buffer.len()
                    + general_req_buffer.len()
                    + bulk_insert_req_num,
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Buffers a bulk insert into `bulk_requests` when region metadata is
    /// attached to the request; otherwise replies to the sender with a
    /// `RegionNotFound` error.
    async fn buffer_bulk_insert_request(
        &mut self,
        bulk_insert: BulkInsertRequest,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        let BulkInsertRequest {
            metadata,
            request,
            sender,
        } = bulk_insert;

        if let Some(region_metadata) = metadata {
            self.handle_bulk_insert_batch(region_metadata, request, bulk_requests, sender)
                .await;
        } else {
            error!("Cannot find region metadata for {}", request.region_id);
            sender.send(
                error::RegionNotFoundSnafu {
                    region_id: request.region_id,
                }
                .fail(),
            );
        }
    }

    /// Dispatches and processes requests.
    ///
    /// `general_requests` should not contain categorized write, ddl, or bulk insert requests.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized before dispatching general requests.
                    continue;
                }
                WorkerRequest::BulkInserts(_) => unreachable!("bulk inserts are buffered"),
                WorkerRequest::Background { region_id, notify } => {
                    if matches!(
                        &notify,
                        BackgroundNotify::RegionEdit(edit_result)
                            if edit_result.update_region_state
                    ) {
                        // Region state must be Editing when reach here.
                        // This call only moves write/bulk write request into stall queue. When region edit result
                        // is processed inside handle_background_notify and region state is switched back to Writable,
                        // stalled request will be processed before the next region edit is dequeued from
                        // RegionEditQueue immediately in handle_region_edit_result. It not only ensured pending writes
                        // are processed in time, but also prevents them from starvation.
                        // TODO(hl): maybe we need to merge those queues for pending requests like pending_ddl,
                        // region edits and stalled request, so we can simplify the coordination between these queues.
                        self.handle_buffered_region_write_requests(
                            &region_id,
                            write_requests,
                            bulk_requests,
                        )
                        .await;
                    }
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request);
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
                WorkerRequest::CopyRegionFrom(req) => {
                    self.handle_copy_region_from_request(req);
                }
            }
        }

        // Handles all write requests first. So we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all ddl requests.
    ///
    /// Arms that `continue` reply to the sender inside their own handler;
    /// the remaining arms produce a result that is sent back at the end of
    /// each iteration.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop(req) => {
                    self.handle_drop_request(ddl.region_id, req.partial_drop)
                        .await
                }
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => {
                    self.handle_close_request(ddl.region_id, ddl.sender).await;
                    continue;
                }
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, None, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_directive,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
                DdlRequest::ApplyStagingManifest(req) => {
                    self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }

    /// Handle periodical tasks such as region auto flush.
    ///
    /// Rate-limited to at most once per `CHECK_REGION_INTERVAL`.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles region background request
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionCancelled(req) => {
                self.handle_compaction_cancelled(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
            BackgroundNotify::CopyRegionFromFinished(req) => {
                self.handle_copy_region_from_finished(req)
            }
        }
    }

    /// Handles `set_region_role_gracefully`.
    ///
    /// Replies `NotFound` when the region is not bound to this worker;
    /// otherwise the state change runs on a background task and the response
    /// is sent from there.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}
1282
1283impl<S> RegionWorkerLoop<S> {
1284    /// Cleans up the worker.
1285    async fn clean(&self) {
1286        // Closes remaining regions.
1287        let regions = self.regions.list_regions();
1288        for region in regions {
1289            region.stop().await;
1290        }
1291
1292        self.regions.clear();
1293    }
1294
1295    /// Notifies the whole group that a flush job is finished so other
1296    /// workers can handle stalled requests.
1297    fn notify_group(&mut self) {
1298        // Notifies all receivers.
1299        let _ = self.flush_sender.send(());
1300        // Marks the receiver in current worker as seen so the loop won't be waked up immediately.
1301        self.flush_receiver.borrow_and_update();
1302    }
1303}
1304
/// Wrapper that only calls event listener in tests.
///
/// In non-test builds the struct is empty, so every hook compiles to a no-op.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    // Present only when built for tests (`test` cfg or the "test" feature).
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1311
impl WorkerListener {
    /// Creates a listener wrapper around an optional test event listener.
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Flush is finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Engine is stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    /// Hook invoked before a flush for `region_id` starts.
    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Hook invoked when a deferred drop task for `region_id` starts.
    /// Returns the listener-provided duration, if any.
    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
        None
    }

    /// On later drop task is finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoid compiler warning.
        let _ = region_id;
        let _ = removed;
    }

    /// Hook invoked after SSTs of `region_id` are merged.
    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Hook invoked after the worker receives a batch of `request_num` requests.
    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoid compiler warning.
        let _ = request_num;
    }

    /// Hook invoked after a file is filled into the file cache.
    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    /// Hook invoked when a compaction is scheduled for `_region_id`.
    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    /// Hook invoked before handling a region change result notification.
    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }

    /// Hook invoked before handling an enter-staging result notification.
    pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_enter_staging_result_begin(_region_id).await;
        }
    }

    /// Hook invoked after an index build for `_region_file_id` finishes.
    pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_finish(_region_file_id).await;
        }
    }

    /// Hook invoked before an index build for `_region_file_id` begins.
    pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_begin(_region_file_id).await;
        }
    }

    /// Hook invoked when an index build for `_region_file_id` is aborted.
    pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_abort(_region_file_id).await;
        }
    }
}
1437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        // NOTE(review): the expected indices suggest the mapping combines the
        // table id and region number modulo `num_workers` — confirm against
        // `region_id_to_index` (defined outside this chunk).
        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    // Smoke test: a worker group with multiple workers starts and stops cleanly.
    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}