// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and utilities for writing regions.

mod handle_alter;
mod handle_apply_staging;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_copy_region;
mod handle_create;
mod handle_drop;
mod handle_enter_staging;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_rebuild_index;
mod handle_remap;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_stat::get_total_memory_bytes;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{Rng, rng};
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::gc::{GcLimiter, GcLimiterRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::opener::PartitionExprFetcherRef;
use crate::region::{
    CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
    RegionMapRef,
};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::RegionFileId;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;

pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Maximum initial delay before a worker runs its periodical region checks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to the region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
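///
/// Routing is a plain modulo hash over the region id (see
/// `region_id_to_index` below). A minimal sketch of the rule with
/// 4 workers:
///
/// ```ignore
/// // index = (table_id % 4 + region_number % 4) % 4
/// let index = region_id_to_index(RegionId::new(1, 2), 4); // == 3
/// ```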
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Gc limiter to limit concurrent gc jobs.
    gc_limiter: GcLimiterRef,
}

impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// The number of workers should be a power of two.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Stops the worker group.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the compaction scheduler gracefully.
        self.compact_job_pool.stop(true).await?;
        // Stops the flush scheduler gracefully.
        self.flush_job_pool.stop(true).await?;
        // Stops the purge scheduler gracefully.
        self.purge_scheduler.stop(true).await?;
        // Stops the index build job pool gracefully.
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns true if the specific region is catching up.
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

    /// Returns region of specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns cache of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

    /// Get worker for specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}

// Test-only methods.
344#[cfg(any(test, feature = "test"))]
345impl WorkerGroup {
346    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
347    ///
    /// The number of workers should be a power of two.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

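/// Builds the write cache described by `config`.
///
/// Returns `Ok(None)` when the write cache is disabled; otherwise creates
/// the cache directory if needed and initializes a file-system backed
/// [WriteCache].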
pub async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        Some(config.index_cache_percent),
        config.enable_refill_cache_on_read,
        puffin_manager_factory,
        intermediate_manager,
        config.manifest_cache_size,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

/// Computes an initial check delay for a worker.
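///
/// The delay is sampled uniformly from `[0, MAX_INITIAL_CHECK_DELAY_SECS)`
/// seconds, so workers stagger their periodical checks instead of running
/// them in lockstep.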
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

/// Worker start config.
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    index_build_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}

/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// The catching up regions.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
    /// Submits request to background worker thread.
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
630                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Manually set the running flag to false to avoid printing more warning logs.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stop the worker.
    ///
    /// This method waits until the worker thread exits.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
656                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets whether the worker is still running.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is opening.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns true if the region is catching up.
    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.catchup_regions.is_region_exists(region_id)
    }

    /// Returns region of specific `region_id`.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    /// Returns the [OpeningRegionsRef].
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }

    #[cfg(test)]
    /// Returns the [CatchupRegionsRef].
    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
        &self.catchup_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}

type RequestBuffer = Vec<WorkerRequest>;

/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
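///
/// Invariant: `estimated_size` equals the sum of the per-region estimated
/// sizes tracked in `requests`.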
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, write requests, bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    /// Appends stalled requests.
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

    /// Pushes a stalled request to the buffer.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes stalled requests of specific region.
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

    /// Returns the total number of all stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build task.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Datanode level file references manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expr fetcher used to backfill partition expr on open for compatibility.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore to control flush concurrency.
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
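    ///
    /// Each iteration waits for the first of: a new request, a flush
    /// notification from the group, or the periodical check timeout. It then
    /// drains up to `worker_request_batch_size` requests from the channel in
    /// one batch before dispatching them.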
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffer to retrieve requests from receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffer before handling next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observe the wait time
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker when other workers trigger a flush, since this
                        // worker may hold a large memtable. If nothing is written to this worker,
                        // we may never get a chance to flush that memtable, so flushing only the
                        // other workers may not release enough memory.
                        self.maybe_flush_worker();
                        // A flush job is finished; handle stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always check whether we can process stalled requests, to avoid
                // a request hanging for too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have received one request so we start from 1.
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observe the wait time
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes requests.
    ///
    /// Drains and handles all buffered requests.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request);
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
                WorkerRequest::CopyRegionFrom(req) => {
                    self.handle_copy_region_from_request(req);
                }
            }
        }

        // Handles all write requests first. So we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all ddl requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_expr,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
                DdlRequest::ApplyStagingManifest(req) => {
                    self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }

    /// Handle periodical tasks such as region auto flush.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles a region background request.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
            BackgroundNotify::CopyRegionFromFinished(req) => {
                self.handle_copy_region_from_finished(req)
            }
        }
    }

    /// Handles `set_region_role_gracefully`.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker.
    async fn clean(&self) {
        // Closes remaining regions.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the whole group that a flush job is finished so other
    /// workers can handle stalled requests.
    fn notify_group(&mut self) {
        // Notifies all receivers.
        let _ = self.flush_sender.send(());
        // Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
        self.flush_receiver.borrow_and_update();
    }
}

/// Wrapper that only calls the event listener in tests.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Flush is finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Engine is stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
        None
    }

    /// Called when a later drop task is finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoid compiler warning.
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoid compiler warning.
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }

    pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_enter_staging_result_begin(_region_id).await;
        }
    }

    pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_finish(_region_file_id).await;
        }
    }

    pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_begin(_region_file_id).await;
        }
    }

    pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_abort(_region_file_id).await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }
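
    #[test]
    fn test_worker_init_check_delay_bounds() {
        // The initial delay is sampled from `0..MAX_INITIAL_CHECK_DELAY_SECS`
        // seconds, so it must always stay strictly below that maximum.
        for _ in 0..100 {
            let delay = worker_init_check_delay();
            assert!(delay < Duration::from_secs(MAX_INITIAL_CHECK_DELAY_SECS));
        }
    }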

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}