// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and utilities for writing regions.
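//!
//! A [WorkerGroup] binds each region to a fixed [RegionWorker] by region id.
//! Every worker runs a [RegionWorkerLoop] on a background task that batches
//! requests from its channel and dispatches them to the handlers in the
//! `handle_*` submodules.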

mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_enter_staging;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_rebuild_index;
mod handle_remap;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_stat::get_total_memory_bytes;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{Rng, rng};
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::gc::{GcLimiter, GcLimiterRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::opener::PartitionExprFetcherRef;
use crate::region::{
    CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
    RegionMapRef,
};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::RegionFileId;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;

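/// Name of the marker file written into a region's directory while the region
/// is being dropped.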
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Maximum initial delay before a worker runs its periodical region checks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to the region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Gc limiter to limit concurrent gc jobs.
    gc_limiter: GcLimiterRef,
}

impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// The number of workers should be a power of two.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
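        // Treat a non-positive total (i.e. the total memory could not be
        // determined) as 0 before resolving the compaction memory limit.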
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Stops the worker group.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the scheduler gracefully.
        self.compact_job_pool.stop(true).await?;
        // Stops the scheduler gracefully.
        self.flush_job_pool.stop(true).await?;
        // Stops the purge scheduler gracefully.
        self.purge_scheduler.stop(true).await?;
        // Stops the index build job pool gracefully.
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns true if the specific region is catching up.
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

    /// Returns the region of the specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns cache of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

    /// Gets the worker for a specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}
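
// Typical flow: the engine calls `WorkerGroup::submit_to_worker(region_id, request)`,
// which routes the request via `region_id_to_index` to the region's dedicated
// worker and enqueues it on that worker's channel for the background loop.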

// Test methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// The number of workers should be a power of two.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let total_memory = get_total_memory_bytes();
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

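/// Maps a region id to a worker index as
/// `(table_id % num_workers + region_number % num_workers) % num_workers`.
/// For example, with 4 workers, `RegionId::new(1, 2)` maps to `(1 + 2) % 4 = 3`.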
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

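/// Builds the write cache according to the config. Returns `None` when
/// `enable_write_cache` is off; otherwise creates the cache directory if
/// needed and initializes a file-system backed [WriteCache].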
pub async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        Some(config.index_cache_percent),
        puffin_manager_factory,
        intermediate_manager,
        // TODO(yingwen): Enable manifest cache after removing read cache.
        ReadableSize(0),
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

/// Computes an initial check delay for a worker.
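///
/// The delay is drawn uniformly from `[0, MAX_INITIAL_CHECK_DELAY_SECS)` so that
/// workers spread out their first periodical region checks instead of scanning
/// all regions at the same time.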
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

/// Worker start config.
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    index_build_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}

/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// The catching up regions.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
    /// Submits a request to the background worker thread.
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Manually set the running flag to false to avoid printing more warning logs.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker.
    ///
    /// This method waits until the worker thread exits.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets whether the worker is still running.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains the specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is opening.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns true if the region is catching up.
    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.catchup_regions.is_region_exists(region_id)
    }

    /// Returns the region of the specific `region_id`.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    /// Returns the [OpeningRegionsRef].
    #[cfg(test)]
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }

    /// Returns the [CatchupRegionsRef].
    #[cfg(test)]
    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
        &self.catchup_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}

type RequestBuffer = Vec<WorkerRequest>;

/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, write requests, bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}
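
// Note: `estimated_size` always equals the sum of the per-region sizes stored
// in `requests`: `push` and `push_bulk` add to both, and `remove` subtracts the
// region's size, so the total stays consistent in O(1) per operation.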

impl StalledRequests {
    /// Appends stalled requests.
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

    /// Pushes a stalled request to the buffer.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes stalled requests of a specific region.
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

    /// Returns the total number of all stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build tasks.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Datanode level file references manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expr fetcher used to backfill partition expr on open for compatibility.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore to control flush concurrency.
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers to hold requests fetched from the receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffers before handling the next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observe the wait time.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker when other workers trigger a flush, as this worker
                        // may hold a large memtable that we never get a chance to flush if we never
                        // write to this worker. Flushing only the other workers may not release
                        // enough memory.
                        self.maybe_flush_worker();
                        // A flush job is finished, handle stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always check whether we can process stalled requests to avoid a request
                // hanging for too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have received one request so we start from 1.
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observe the wait time.
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes requests.
    ///
    /// The request buffers are drained by this method.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
            }
        }

        // Handles all write requests first so we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all ddl requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_expr,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }

    /// Handles periodical tasks such as region auto flush.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles region background requests.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Handles `set_region_role_gracefully`.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker.
    async fn clean(&self) {
        // Closes remaining regions.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the whole group that a flush job is finished so other
    /// workers can handle stalled requests.
    fn notify_group(&mut self) {
        // Notifies all receivers.
        let _ = self.flush_sender.send(());
        // Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
        self.flush_receiver.borrow_and_update();
    }
}

/// Wrapper that only calls event listener in tests.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
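
// In builds without `cfg(any(test, feature = "test"))`, `WorkerListener` has no
// fields and every hook below compiles down to a no-op, so the listener adds no
// overhead in production.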

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Flush is finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Engine is stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
        None
    }

    /// Called when a later drop task is finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoid compiler warning.
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoid compiler warning.
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }

    pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_enter_staging_result_begin(_region_id).await;
        }
    }

    pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_finish(_region_file_id).await;
        }
    }

    pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_begin(_region_file_id).await;
        }
    }

    pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_index_build_abort(_region_file_id).await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}