mito2/
worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs and utilities for writing regions.
16
17mod handle_alter;
18mod handle_apply_staging;
19mod handle_bulk_insert;
20mod handle_catchup;
21mod handle_close;
22mod handle_compaction;
23mod handle_copy_region;
24mod handle_create;
25mod handle_drop;
26mod handle_enter_staging;
27mod handle_flush;
28mod handle_manifest;
29mod handle_open;
30mod handle_rebuild_index;
31mod handle_remap;
32mod handle_truncate;
33mod handle_write;
34
35use std::collections::HashMap;
36use std::path::Path;
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40
41use common_base::Plugins;
42use common_error::ext::BoxedError;
43use common_meta::key::SchemaMetadataManagerRef;
44use common_runtime::JoinHandle;
45use common_stat::get_total_memory_bytes;
46use common_telemetry::{error, info, warn};
47use futures::future::try_join_all;
48use object_store::manager::ObjectStoreManagerRef;
49use prometheus::{Histogram, IntGauge};
50use rand::{Rng, rng};
51use snafu::{ResultExt, ensure};
52use store_api::logstore::LogStore;
53use store_api::region_engine::{
54    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
55};
56use store_api::storage::{FileId, RegionId};
57use tokio::sync::mpsc::{Receiver, Sender};
58use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
59
60use crate::cache::write_cache::{WriteCache, WriteCacheRef};
61use crate::cache::{CacheManager, CacheManagerRef};
62use crate::compaction::CompactionScheduler;
63use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
64use crate::config::MitoConfig;
65use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
66use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
67use crate::gc::{GcLimiter, GcLimiterRef};
68use crate::memtable::MemtableBuilderProvider;
69use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
70use crate::region::opener::PartitionExprFetcherRef;
71use crate::region::{
72    CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
73    RegionMapRef,
74};
75use crate::request::{
76    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
77    WorkerRequest, WorkerRequestWithTime,
78};
79use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
80use crate::sst::file::RegionFileId;
81use crate::sst::file_ref::FileReferenceManagerRef;
82use crate::sst::index::IndexBuildScheduler;
83use crate::sst::index::intermediate::IntermediateManager;
84use crate::sst::index::puffin_manager::PuffinManagerFactory;
85use crate::time_provider::{StdTimeProvider, TimeProviderRef};
86use crate::wal::Wal;
87use crate::worker::handle_manifest::RegionEditQueues;
88
/// Identifier for a worker.
pub(crate) type WorkerId = u32;

/// Name of the marker file indicating that a region is being dropped.
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max delay (in seconds) before a worker runs its first periodical check.
/// See [worker_init_check_delay].
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
98
99#[cfg_attr(doc, aquamarine::aquamarine)]
100/// A fixed size group of [RegionWorkers](RegionWorker).
101///
102/// A worker group binds each region to a specific [RegionWorker] and sends
103/// requests to region's dedicated worker.
104///
105/// ```mermaid
106/// graph LR
107///
108/// RegionRequest -- Route by region id --> Worker0 & Worker1
109///
110/// subgraph MitoEngine
111///     subgraph WorkerGroup
112///         Worker0["RegionWorker 0"]
113///         Worker1["RegionWorker 1"]
114///     end
115/// end
116///
117/// Chan0[" Request channel 0"]
118/// Chan1[" Request channel 1"]
119/// WorkerThread1["RegionWorkerLoop 1"]
120///
121/// subgraph WorkerThread0["RegionWorkerLoop 0"]
122///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
123///         Region0["Region 0"]
124///         Region2["Region 2"]
125///     end
126///     Buffer0["RequestBuffer"]
127///
128///     Buffer0 -- modify regions --> RegionMap
129/// end
130///
131/// Worker0 --> Chan0
132/// Worker1 --> Chan1
133/// Chan0 --> Buffer0
134/// Chan1 --> WorkerThread1
135/// ```
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache manager shared by all workers of the group.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Gc limiter to limit concurrent gc jobs.
    gc_limiter: GcLimiterRef,
    /// Object store manager.
    object_store_manager: ObjectStoreManagerRef,
    /// Puffin manager factory.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager.
    intermediate_manager: IntermediateManager,
    /// Schema metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
}
162
impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// Builds all components shared by the workers (write buffer manager, cache
    /// manager, job schedulers, …) and then spawns one [RegionWorker] per
    /// `config.num_workers`.
    ///
    /// The number of workers should be power of two.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        // Watch channel used to notify workers to handle stalled requests
        // (the write buffer manager notifies it via `with_notifier`).
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);
        let total_memory = get_total_memory_bytes();
        // Clamp non-positive readings to 0 before converting to u64.
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));

        // Spawn one region worker (and its background loop) per configured worker.
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }

    /// Stops the worker group.
    ///
    /// Stops the job pools first so no new background jobs start, then stops
    /// every worker and waits for their loops to exit.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the scheduler gracefully.
        self.compact_job_pool.stop(true).await?;
        // Stops the scheduler gracefully.
        self.flush_job_pool.stop(true).await?;
        // Stops the purge scheduler gracefully.
        self.purge_scheduler.stop(true).await?;
        // Stops the index build job pool gracefully.
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group.
    ///
    /// The worker is chosen from `region_id` (see [WorkerGroup::worker]).
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns true if the specific region is catching up.
    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_catching_up(region_id)
    }

    /// Returns region of specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns cache of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Returns the file reference manager of the group.
    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Returns the gc limiter of the group.
    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
        self.gc_limiter.clone()
    }

    /// Get worker for specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    /// Returns an iterator over the regions of all workers in the group.
    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }

    /// Returns the object store manager of the group.
    pub(crate) fn object_store_manager(&self) -> &ObjectStoreManagerRef {
        &self.object_store_manager
    }

    /// Returns the puffin manager factory of the group.
    pub(crate) fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        &self.puffin_manager_factory
    }

    /// Returns the intermediate manager of the group.
    pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Returns the schema metadata manager of the group.
    pub(crate) fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
        &self.schema_metadata_manager
    }
}
370
// Tests methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// Mirrors [WorkerGroup::start] but allows tests to inject the write buffer
    /// manager, an event listener and a time provider.
    ///
    /// The number of workers should be power of two.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // Size the purge scheduler by `max_background_purges` for consistency with
        // `WorkerGroup::start` (it previously reused `max_background_flushes`).
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        // NOTE: unlike the production path, index caches are not configured here;
        // tests only exercise the basic caches.
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let total_memory = get_total_memory_bytes();
        // Clamp non-positive readings to 0 before converting to u64.
        let total_memory = if total_memory > 0 {
            total_memory as u64
        } else {
            0
        };
        let compaction_limit_bytes = config
            .experimental_compaction_memory_limit
            .resolve(total_memory);
        let compaction_memory_manager =
            Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    compaction_memory_manager: compaction_memory_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
            gc_limiter,
            object_store_manager,
            puffin_manager_factory,
            intermediate_manager,
            schema_metadata_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}
490
491fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
492    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
493        % num_workers
494}
495
496pub async fn write_cache_from_config(
497    config: &MitoConfig,
498    puffin_manager_factory: PuffinManagerFactory,
499    intermediate_manager: IntermediateManager,
500) -> Result<Option<WriteCacheRef>> {
501    if !config.enable_write_cache {
502        return Ok(None);
503    }
504
505    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
506        .await
507        .context(CreateDirSnafu {
508            dir: &config.write_cache_path,
509        })?;
510
511    let cache = WriteCache::new_fs(
512        &config.write_cache_path,
513        config.write_cache_size,
514        config.write_cache_ttl,
515        Some(config.index_cache_percent),
516        config.enable_refill_cache_on_read,
517        puffin_manager_factory,
518        intermediate_manager,
519        config.manifest_cache_size,
520    )
521    .await?;
522    Ok(Some(Arc::new(cache)))
523}
524
525/// Computes a initial check delay for a worker.
526pub(crate) fn worker_init_check_delay() -> Duration {
527    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
528    Duration::from_secs(init_check_delay)
529}
530
/// Worker start config.
struct WorkerStarter<S> {
    /// Id of the worker to start.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Log store used to build the worker's WAL.
    log_store: Arc<S>,
    /// Manages object stores for the worker.
    object_store_manager: ObjectStoreManagerRef,
    /// Write buffer manager shared by workers.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for compaction jobs.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for flush jobs.
    flush_job_pool: SchedulerRef,
    /// Scheduler for file purge jobs.
    purge_scheduler: SchedulerRef,
    /// Listener for worker events (a no-op default outside of tests).
    listener: WorkerListener,
    /// Cache manager shared by workers.
    cache_manager: CacheManagerRef,
    /// Limits the memory used by compaction jobs.
    compaction_memory_manager: Arc<CompactionMemoryManager>,
    /// Factory to create puffin managers.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager (see [IntermediateManager]).
    intermediate_manager: IntermediateManager,
    /// Provides the current time to the worker loop.
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Engine plugins.
    plugins: Plugins,
    /// Schema metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetcher of partition exprs (see region opener).
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Limits concurrent flush jobs.
    flush_semaphore: Arc<Semaphore>,
}
558
impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    ///
    /// Builds the [RegionWorkerLoop], spawns it on the global runtime and
    /// returns the [RegionWorker] handle used to communicate with it.
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let catchup_regions = Arc::new(CatchupRegions::default());
        // Channel for submitting requests to the worker loop.
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        // Worker id as a string, used as the metric label value below.
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            catchup_regions: catchup_regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(
                self.index_build_job_pool,
                self.config.max_background_index_builds,
            ),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config.clone(),
                self.listener.clone(),
                self.plugins.clone(),
                self.compaction_memory_manager.clone(),
                self.config.experimental_compaction_on_exhausted,
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        // Run the worker loop on the global runtime.
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            catchup_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}
636
/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// The catching up regions.
    catchup_regions: CatchupRegionsRef,
    /// Sender to submit requests to the background worker loop.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}
654
655impl RegionWorker {
656    /// Submits request to background worker thread.
657    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
658        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
659        let request_with_time = WorkerRequestWithTime::new(request);
660        if self.sender.send(request_with_time).await.is_err() {
661            warn!(
662                "Worker {} is already exited but the running flag is still true",
663                self.id
664            );
665            // Manually set the running flag to false to avoid printing more warning logs.
666            self.set_running(false);
667            return WorkerStoppedSnafu { id: self.id }.fail();
668        }
669
670        Ok(())
671    }
672
673    /// Stop the worker.
674    ///
675    /// This method waits until the worker thread exists.
676    async fn stop(&self) -> Result<()> {
677        let handle = self.handle.lock().await.take();
678        if let Some(handle) = handle {
679            info!("Stop region worker {}", self.id);
680
681            self.set_running(false);
682            if self
683                .sender
684                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
685                .await
686                .is_err()
687            {
688                warn!("Worker {} is already exited before stop", self.id);
689            }
690
691            handle.await.context(JoinSnafu)?;
692        }
693
694        Ok(())
695    }
696
697    /// Returns true if the worker is still running.
698    fn is_running(&self) -> bool {
699        self.running.load(Ordering::Relaxed)
700    }
701
702    /// Sets whether the worker is still running.
703    fn set_running(&self, value: bool) {
704        self.running.store(value, Ordering::Relaxed)
705    }
706
707    /// Returns true if the worker contains specific region.
708    fn is_region_exists(&self, region_id: RegionId) -> bool {
709        self.regions.is_region_exists(region_id)
710    }
711
712    /// Returns true if the region is opening.
713    fn is_region_opening(&self, region_id: RegionId) -> bool {
714        self.opening_regions.is_region_exists(region_id)
715    }
716
717    /// Returns true if the region is catching up.
718    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
719        self.catchup_regions.is_region_exists(region_id)
720    }
721
722    /// Returns region of specific `region_id`.
723    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
724        self.regions.get_region(region_id)
725    }
726
727    #[cfg(test)]
728    /// Returns the [OpeningRegionsRef].
729    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
730        &self.opening_regions
731    }
732
733    #[cfg(test)]
734    /// Returns the [CatchupRegionsRef].
735    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
736        &self.catchup_regions
737    }
738}
739
impl Drop for RegionWorker {
    fn drop(&mut self) {
        // Clear the running flag so observers see the worker as stopped.
        if self.is_running() {
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}
748
/// Buffer of worker requests.
type RequestBuffer = Vec<WorkerRequest>;
750
/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, stalled write requests, stalled bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}
767
768impl StalledRequests {
769    /// Appends stalled requests.
770    pub(crate) fn append(
771        &mut self,
772        requests: &mut Vec<SenderWriteRequest>,
773        bulk_requests: &mut Vec<SenderBulkRequest>,
774    ) {
775        for req in requests.drain(..) {
776            self.push(req);
777        }
778        for req in bulk_requests.drain(..) {
779            self.push_bulk(req);
780        }
781    }
782
783    /// Pushes a stalled request to the buffer.
784    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
785        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
786        let req_size = req.request.estimated_size();
787        *size += req_size;
788        self.estimated_size += req_size;
789        requests.push(req);
790    }
791
792    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
793        let region_id = req.region_id;
794        let (size, _, requests) = self.requests.entry(region_id).or_default();
795        let req_size = req.request.estimated_size();
796        *size += req_size;
797        self.estimated_size += req_size;
798        requests.push(req);
799    }
800
801    /// Removes stalled requests of specific region.
802    pub(crate) fn remove(
803        &mut self,
804        region_id: &RegionId,
805    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
806        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
807            self.estimated_size -= size;
808            (write_reqs, bulk_reqs)
809        } else {
810            (vec![], vec![])
811        }
812    }
813
814    /// Returns the total number of all stalled requests.
815    pub(crate) fn stalled_count(&self) -> usize {
816        self.requests
817            .values()
818            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
819            .sum()
820    }
821}
822
/// Background worker loop to handle requests.
///
/// Owns the request channel endpoints, schedulers, and caches used to serve
/// the regions bound to this worker.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Regions that are catching up.
    catchup_regions: CatchupRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    /// Setting this to `false` makes the loop in `run()` exit.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build task.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Datanode level file references manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expr fetcher used to backfill partition expr on open for compatibility.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore to control flush concurrency.
    flush_semaphore: Arc<Semaphore>,
}
894
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    ///
    /// Loops until `running` becomes `false` or the request channel is
    /// disconnected, then stops and clears all regions via `clean()`.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        // Push the first periodical check back by the init delay.
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffer to retrieve requests from receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffer before handling next batch of requests.
            // NOTE(review): `bulk_req_buffer` is not cleared here — presumably
            // `handle_requests` drains it on every iteration; confirm.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            // Wait for one of: a new request, a group-wide flush notification,
            // or the periodical-check timeout.
            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observe the wait time
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            // Route the request to its per-kind buffer.
                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker if other workers trigger flush as this worker may have
                        // a large memtable to flush. We may not have chance to flush that memtable if we
                        // never write to this worker. So only flushing other workers may not release enough
                        // memory.
                        self.maybe_flush_worker();
                        // A flush job is finished, handles stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always checks whether we could process stalled requests to avoid a request
                // hangs too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have received one request so we start from 1.
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observe the wait time
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes requests.
    ///
    /// `buffer` should be empty.
    ///
    /// General requests are handled inline; write requests are handled before
    /// DDL requests so regions can be altered without pending writes in flight.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request);
                }
                WorkerRequest::Stop => {
                    // `running` must already be false when a Stop request arrives.
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        // Without metadata the region is unknown; reply with an error.
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
                WorkerRequest::RemapManifests(req) => {
                    self.handle_remap_manifests_request(req);
                }
                WorkerRequest::CopyRegionFrom(req) => {
                    self.handle_copy_region_from_request(req);
                }
            }
        }

        // Handles all write requests first. So we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all ddl requests.
    ///
    /// Handlers that receive `ddl.sender` reply on their own and `continue`;
    /// the remaining handlers return a result that is sent at the end of the
    /// loop body.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop(req) => {
                    self.handle_drop_request(ddl.region_id, req.partial_drop)
                        .await
                }
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => {
                    self.handle_close_request(ddl.region_id, ddl.sender).await;
                    continue;
                }
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, None, ddl.sender);
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::BuildIndex(req) => {
                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup((req, wal_entry_receiver)) => {
                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::EnterStaging(req) => {
                    self.handle_enter_staging_request(
                        ddl.region_id,
                        req.partition_expr,
                        ddl.sender,
                    )
                    .await;
                    continue;
                }
                DdlRequest::ApplyStagingManifest(req) => {
                    self.handle_apply_staging_manifest_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
            };

            ddl.sender.send(res);
        }
    }

    /// Handle periodical tasks such as region auto flush.
    ///
    /// Does nothing if less than `CHECK_REGION_INTERVAL` has elapsed since the
    /// last check.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles region background request
    ///
    /// Dispatches each background notification to its dedicated handler.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildStopped(req) => {
                self.handle_index_build_stopped(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::EnterStaging(req) => self.handle_enter_staging_result(req).await,
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
            BackgroundNotify::CopyRegionFromFinished(req) => {
                self.handle_copy_region_from_finished(req)
            }
        }
    }

    /// Handles `set_region_role_gracefully`.
    ///
    /// Replies `NotFound` immediately when the region is not bound to this
    /// worker; otherwise performs the transition in a background task and
    /// replies through `sender` when it completes.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}
1240
1241impl<S> RegionWorkerLoop<S> {
1242    /// Cleans up the worker.
1243    async fn clean(&self) {
1244        // Closes remaining regions.
1245        let regions = self.regions.list_regions();
1246        for region in regions {
1247            region.stop().await;
1248        }
1249
1250        self.regions.clear();
1251    }
1252
1253    /// Notifies the whole group that a flush job is finished so other
1254    /// workers can handle stalled requests.
1255    fn notify_group(&mut self) {
1256        // Notifies all receivers.
1257        let _ = self.flush_sender.send(());
1258        // Marks the receiver in current worker as seen so the loop won't be waked up immediately.
1259        self.flush_receiver.borrow_and_update();
1260    }
1261}
1262
/// Wrapper that only calls event listener in tests.
///
/// In non-test builds this struct has no fields and all hooks are no-ops.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    // Optional listener, only present when built for tests.
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}
1269
1270impl WorkerListener {
1271    #[cfg(any(test, feature = "test"))]
1272    pub(crate) fn new(
1273        listener: Option<crate::engine::listener::EventListenerRef>,
1274    ) -> WorkerListener {
1275        WorkerListener { listener }
1276    }
1277
1278    /// Flush is finished successfully.
1279    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
1280        #[cfg(any(test, feature = "test"))]
1281        if let Some(listener) = &self.listener {
1282            listener.on_flush_success(region_id);
1283        }
1284        // Avoid compiler warning.
1285        let _ = region_id;
1286    }
1287
1288    /// Engine is stalled.
1289    pub(crate) fn on_write_stall(&self) {
1290        #[cfg(any(test, feature = "test"))]
1291        if let Some(listener) = &self.listener {
1292            listener.on_write_stall();
1293        }
1294    }
1295
1296    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
1297        #[cfg(any(test, feature = "test"))]
1298        if let Some(listener) = &self.listener {
1299            listener.on_flush_begin(region_id).await;
1300        }
1301        // Avoid compiler warning.
1302        let _ = region_id;
1303    }
1304
1305    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
1306        #[cfg(any(test, feature = "test"))]
1307        if let Some(listener) = &self.listener {
1308            return listener.on_later_drop_begin(region_id);
1309        }
1310        // Avoid compiler warning.
1311        let _ = region_id;
1312        None
1313    }
1314
1315    /// On later drop task is finished.
1316    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
1317        #[cfg(any(test, feature = "test"))]
1318        if let Some(listener) = &self.listener {
1319            listener.on_later_drop_end(region_id, removed);
1320        }
1321        // Avoid compiler warning.
1322        let _ = region_id;
1323        let _ = removed;
1324    }
1325
1326    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
1327        #[cfg(any(test, feature = "test"))]
1328        if let Some(listener) = &self.listener {
1329            listener.on_merge_ssts_finished(region_id).await;
1330        }
1331        // Avoid compiler warning.
1332        let _ = region_id;
1333    }
1334
1335    pub(crate) fn on_recv_requests(&self, request_num: usize) {
1336        #[cfg(any(test, feature = "test"))]
1337        if let Some(listener) = &self.listener {
1338            listener.on_recv_requests(request_num);
1339        }
1340        // Avoid compiler warning.
1341        let _ = request_num;
1342    }
1343
1344    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
1345        #[cfg(any(test, feature = "test"))]
1346        if let Some(listener) = &self.listener {
1347            listener.on_file_cache_filled(_file_id);
1348        }
1349    }
1350
1351    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
1352        #[cfg(any(test, feature = "test"))]
1353        if let Some(listener) = &self.listener {
1354            listener.on_compaction_scheduled(_region_id);
1355        }
1356    }
1357
1358    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
1359        #[cfg(any(test, feature = "test"))]
1360        if let Some(listener) = &self.listener {
1361            listener
1362                .on_notify_region_change_result_begin(_region_id)
1363                .await;
1364        }
1365    }
1366
1367    pub(crate) async fn on_enter_staging_result_begin(&self, _region_id: RegionId) {
1368        #[cfg(any(test, feature = "test"))]
1369        if let Some(listener) = &self.listener {
1370            listener.on_enter_staging_result_begin(_region_id).await;
1371        }
1372    }
1373
1374    pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
1375        #[cfg(any(test, feature = "test"))]
1376        if let Some(listener) = &self.listener {
1377            listener.on_index_build_finish(_region_file_id).await;
1378        }
1379    }
1380
1381    pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
1382        #[cfg(any(test, feature = "test"))]
1383        if let Some(listener) = &self.listener {
1384            listener.on_index_build_begin(_region_file_id).await;
1385        }
1386    }
1387
1388    pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
1389        #[cfg(any(test, feature = "test"))]
1390        if let Some(listener) = &self.listener {
1391            listener.on_index_build_abort(_region_file_id).await;
1392        }
1393    }
1394}
1395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;
        // ((table id, region number), expected worker index) pairs.
        let cases = [((1, 2), 3), ((2, 3), 1)];
        for ((table_id, region_number), expected) in cases {
            let region_id = RegionId::new(table_id, region_number);
            assert_eq!(region_id_to_index(region_id, num_workers), expected);
        }
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let config = MitoConfig {
            num_workers: 4,
            ..Default::default()
        };
        let group = env.create_worker_group(config).await;

        group.stop().await.unwrap();
    }
}
1426}