mito2/worker.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs and utilities for writing regions.
16
17mod handle_alter;
18mod handle_bulk_insert;
19mod handle_catchup;
20mod handle_close;
21mod handle_compaction;
22mod handle_create;
23mod handle_drop;
24mod handle_flush;
25mod handle_manifest;
26mod handle_open;
27mod handle_rebuild_index;
28mod handle_truncate;
29mod handle_write;
30
31use std::collections::HashMap;
32use std::path::Path;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Duration;
36
37use common_base::Plugins;
38use common_error::ext::BoxedError;
39use common_meta::key::SchemaMetadataManagerRef;
40use common_runtime::JoinHandle;
41use common_telemetry::{error, info, warn};
42use futures::future::try_join_all;
43use object_store::manager::ObjectStoreManagerRef;
44use prometheus::{Histogram, IntGauge};
45use rand::{Rng, rng};
46use snafu::{ResultExt, ensure};
47use store_api::logstore::LogStore;
48use store_api::region_engine::{
49    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
50};
51use store_api::storage::{FileId, RegionId};
52use tokio::sync::mpsc::{Receiver, Sender};
53use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
54
55use crate::cache::write_cache::{WriteCache, WriteCacheRef};
56use crate::cache::{CacheManager, CacheManagerRef};
57use crate::compaction::CompactionScheduler;
58use crate::config::MitoConfig;
59use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
60use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
61use crate::gc::{GcLimiter, GcLimiterRef};
62use crate::memtable::MemtableBuilderProvider;
63use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
64use crate::region::opener::PartitionExprFetcherRef;
65use crate::region::{
66    CatchupRegions, CatchupRegionsRef, MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap,
67    RegionMapRef,
68};
69use crate::request::{
70    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
71    WorkerRequest, WorkerRequestWithTime,
72};
73use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
74use crate::sst::file::RegionFileId;
75use crate::sst::file_ref::FileReferenceManagerRef;
76use crate::sst::index::IndexBuildScheduler;
77use crate::sst::index::intermediate::IntermediateManager;
78use crate::sst::index::puffin_manager::PuffinManagerFactory;
79use crate::time_provider::{StdTimeProvider, TimeProviderRef};
80use crate::wal::Wal;
81use crate::worker::handle_manifest::RegionEditQueues;
82
83/// Identifier for a worker.
84pub(crate) type WorkerId = u32;
85
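/// Name of the marker file that indicates a region is being dropped.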
86pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";
87
88/// Interval to check whether regions should flush.
89pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
90/// Max initial delay before a worker checks regions' periodical tasks.
91pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
92
93#[cfg_attr(doc, aquamarine::aquamarine)]
94/// A fixed size group of [RegionWorkers](RegionWorker).
95///
96/// A worker group binds each region to a specific [RegionWorker] and sends
97/// requests to the region's dedicated worker.
98///
99/// ```mermaid
100/// graph LR
101///
102/// RegionRequest -- Route by region id --> Worker0 & Worker1
103///
104/// subgraph MitoEngine
105///     subgraph WorkerGroup
106///         Worker0["RegionWorker 0"]
107///         Worker1["RegionWorker 1"]
108///     end
109/// end
110///
111/// Chan0[" Request channel 0"]
112/// Chan1[" Request channel 1"]
113/// WorkerThread1["RegionWorkerLoop 1"]
114///
115/// subgraph WorkerThread0["RegionWorkerLoop 0"]
116///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
117///         Region0["Region 0"]
118///         Region2["Region 2"]
119///     end
120///     Buffer0["RequestBuffer"]
121///
122///     Buffer0 -- modify regions --> RegionMap
123/// end
124///
125/// Worker0 --> Chan0
126/// Worker1 --> Chan1
127/// Chan0 --> Buffer0
128/// Chan1 --> WorkerThread1
129/// ```
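///
/// A minimal sketch of how a caller routes a request through the group; the
/// surrounding variable names here are illustrative only:
///
/// ```ignore
/// // The group hashes the region id to pick the owning worker and forwards the
/// // request to that worker's channel (see `region_id_to_index`).
/// worker_group.submit_to_worker(region_id, worker_request).await?;
/// ```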
130pub(crate) struct WorkerGroup {
131    /// Workers of the group.
132    workers: Vec<RegionWorker>,
133    /// Flush background job pool.
134    flush_job_pool: SchedulerRef,
135    /// Compaction background job pool.
136    compact_job_pool: SchedulerRef,
137    /// Scheduler for index build jobs.
138    index_build_job_pool: SchedulerRef,
139    /// Scheduler for file purgers.
140    purge_scheduler: SchedulerRef,
141    /// Cache.
142    cache_manager: CacheManagerRef,
143    /// File reference manager.
144    file_ref_manager: FileReferenceManagerRef,
145    /// Gc limiter to limit concurrent gc jobs.
146    gc_limiter: GcLimiterRef,
147}
148
149impl WorkerGroup {
150    /// Starts a worker group.
151    ///
152    /// The number of workers should be a power of two.
153    pub(crate) async fn start<S: LogStore>(
154        config: Arc<MitoConfig>,
155        log_store: Arc<S>,
156        object_store_manager: ObjectStoreManagerRef,
157        schema_metadata_manager: SchemaMetadataManagerRef,
158        file_ref_manager: FileReferenceManagerRef,
159        partition_expr_fetcher: PartitionExprFetcherRef,
160        plugins: Plugins,
161    ) -> Result<WorkerGroup> {
162        let (flush_sender, flush_receiver) = watch::channel(());
163        let write_buffer_manager = Arc::new(
164            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
165                .with_notifier(flush_sender.clone()),
166        );
167        let puffin_manager_factory = PuffinManagerFactory::new(
168            &config.index.aux_path,
169            config.index.staging_size.as_bytes(),
170            Some(config.index.write_buffer_size.as_bytes() as _),
171            config.index.staging_ttl,
172        )
173        .await?;
174        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
175            .await?
176            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
177        let index_build_job_pool =
178            Arc::new(LocalScheduler::new(config.max_background_index_builds));
179        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
180        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
181        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
182        // We use another scheduler to avoid purge jobs blocking other jobs.
183        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
184        let write_cache = write_cache_from_config(
185            &config,
186            puffin_manager_factory.clone(),
187            intermediate_manager.clone(),
188        )
189        .await?;
190        let cache_manager = Arc::new(
191            CacheManager::builder()
192                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
193                .vector_cache_size(config.vector_cache_size.as_bytes())
194                .page_cache_size(config.page_cache_size.as_bytes())
195                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
196                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
197                .index_content_size(config.index.content_cache_size.as_bytes())
198                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
199                .index_result_cache_size(config.index.result_cache_size.as_bytes())
200                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
201                .write_cache(write_cache)
202                .build(),
203        );
204        let time_provider = Arc::new(StdTimeProvider);
205        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
206
207        let workers = (0..config.num_workers)
208            .map(|id| {
209                WorkerStarter {
210                    id: id as WorkerId,
211                    config: config.clone(),
212                    log_store: log_store.clone(),
213                    object_store_manager: object_store_manager.clone(),
214                    write_buffer_manager: write_buffer_manager.clone(),
215                    index_build_job_pool: index_build_job_pool.clone(),
216                    flush_job_pool: flush_job_pool.clone(),
217                    compact_job_pool: compact_job_pool.clone(),
218                    purge_scheduler: purge_scheduler.clone(),
219                    listener: WorkerListener::default(),
220                    cache_manager: cache_manager.clone(),
221                    puffin_manager_factory: puffin_manager_factory.clone(),
222                    intermediate_manager: intermediate_manager.clone(),
223                    time_provider: time_provider.clone(),
224                    flush_sender: flush_sender.clone(),
225                    flush_receiver: flush_receiver.clone(),
226                    plugins: plugins.clone(),
227                    schema_metadata_manager: schema_metadata_manager.clone(),
228                    file_ref_manager: file_ref_manager.clone(),
229                    partition_expr_fetcher: partition_expr_fetcher.clone(),
230                    flush_semaphore: flush_semaphore.clone(),
231                }
232                .start()
233            })
234            .collect::<Result<Vec<_>>>()?;
235
236        Ok(WorkerGroup {
237            workers,
238            flush_job_pool,
239            compact_job_pool,
240            index_build_job_pool,
241            purge_scheduler,
242            cache_manager,
243            file_ref_manager,
244            gc_limiter,
245        })
246    }
247
248    /// Stops the worker group.
249    pub(crate) async fn stop(&self) -> Result<()> {
250        info!("Stop region worker group");
251
252        // TODO(yingwen): Do we need to stop gracefully?
253        // Stops the scheduler gracefully.
254        self.compact_job_pool.stop(true).await?;
255        // Stops the scheduler gracefully.
256        self.flush_job_pool.stop(true).await?;
257        // Stops the purge scheduler gracefully.
258        self.purge_scheduler.stop(true).await?;
259        // Stops the index build job pool gracefully.
260        self.index_build_job_pool.stop(true).await?;
261
262        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
263
264        Ok(())
265    }
266
267    /// Submits a request to a worker in the group.
268    pub(crate) async fn submit_to_worker(
269        &self,
270        region_id: RegionId,
271        request: WorkerRequest,
272    ) -> Result<()> {
273        self.worker(region_id).submit_request(request).await
274    }
275
276    /// Returns true if the specific region exists.
277    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
278        self.worker(region_id).is_region_exists(region_id)
279    }
280
281    /// Returns true if the specific region is opening.
282    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
283        self.worker(region_id).is_region_opening(region_id)
284    }
285
286    /// Returns true if the specific region is catching up.
287    pub(crate) fn is_region_catching_up(&self, region_id: RegionId) -> bool {
288        self.worker(region_id).is_region_catching_up(region_id)
289    }
290
291    /// Returns region of specific `region_id`.
292    ///
293    /// This method should not be public.
294    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
295        self.worker(region_id).get_region(region_id)
296    }
297
298    /// Returns cache of the group.
299    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
300        self.cache_manager.clone()
301    }
302
303    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
304        self.file_ref_manager.clone()
305    }
306
307    pub(crate) fn gc_limiter(&self) -> GcLimiterRef {
308        self.gc_limiter.clone()
309    }
310
311    /// Get worker for specific `region_id`.
312    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
313        let index = region_id_to_index(region_id, self.workers.len());
314
315        &self.workers[index]
316    }
317
318    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
319        self.workers
320            .iter()
321            .flat_map(|worker| worker.regions.list_regions())
322    }
323}
324
325// Tests methods.
326#[cfg(any(test, feature = "test"))]
327impl WorkerGroup {
328    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
329    ///
330    /// The number of workers should be a power of two.
331    #[allow(clippy::too_many_arguments)]
332    pub(crate) async fn start_for_test<S: LogStore>(
333        config: Arc<MitoConfig>,
334        log_store: Arc<S>,
335        object_store_manager: ObjectStoreManagerRef,
336        write_buffer_manager: Option<WriteBufferManagerRef>,
337        listener: Option<crate::engine::listener::EventListenerRef>,
338        schema_metadata_manager: SchemaMetadataManagerRef,
339        file_ref_manager: FileReferenceManagerRef,
340        time_provider: TimeProviderRef,
341        partition_expr_fetcher: PartitionExprFetcherRef,
342    ) -> Result<WorkerGroup> {
343        let (flush_sender, flush_receiver) = watch::channel(());
344        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
345            Arc::new(
346                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
347                    .with_notifier(flush_sender.clone()),
348            )
349        });
350        let index_build_job_pool =
351            Arc::new(LocalScheduler::new(config.max_background_index_builds));
352        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
353        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
354        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
355        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
356        let puffin_manager_factory = PuffinManagerFactory::new(
357            &config.index.aux_path,
358            config.index.staging_size.as_bytes(),
359            Some(config.index.write_buffer_size.as_bytes() as _),
360            config.index.staging_ttl,
361        )
362        .await?;
363        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
364            .await?
365            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
366        let write_cache = write_cache_from_config(
367            &config,
368            puffin_manager_factory.clone(),
369            intermediate_manager.clone(),
370        )
371        .await?;
372        let cache_manager = Arc::new(
373            CacheManager::builder()
374                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
375                .vector_cache_size(config.vector_cache_size.as_bytes())
376                .page_cache_size(config.page_cache_size.as_bytes())
377                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
378                .write_cache(write_cache)
379                .build(),
380        );
381        let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
382        let workers = (0..config.num_workers)
383            .map(|id| {
384                WorkerStarter {
385                    id: id as WorkerId,
386                    config: config.clone(),
387                    log_store: log_store.clone(),
388                    object_store_manager: object_store_manager.clone(),
389                    write_buffer_manager: write_buffer_manager.clone(),
390                    index_build_job_pool: index_build_job_pool.clone(),
391                    flush_job_pool: flush_job_pool.clone(),
392                    compact_job_pool: compact_job_pool.clone(),
393                    purge_scheduler: purge_scheduler.clone(),
394                    listener: WorkerListener::new(listener.clone()),
395                    cache_manager: cache_manager.clone(),
396                    puffin_manager_factory: puffin_manager_factory.clone(),
397                    intermediate_manager: intermediate_manager.clone(),
398                    time_provider: time_provider.clone(),
399                    flush_sender: flush_sender.clone(),
400                    flush_receiver: flush_receiver.clone(),
401                    plugins: Plugins::new(),
402                    schema_metadata_manager: schema_metadata_manager.clone(),
403                    file_ref_manager: file_ref_manager.clone(),
404                    partition_expr_fetcher: partition_expr_fetcher.clone(),
405                    flush_semaphore: flush_semaphore.clone(),
406                }
407                .start()
408            })
409            .collect::<Result<Vec<_>>>()?;
410
411        Ok(WorkerGroup {
412            workers,
413            flush_job_pool,
414            compact_job_pool,
415            index_build_job_pool,
416            purge_scheduler,
417            cache_manager,
418            file_ref_manager,
419            gc_limiter,
420        })
421    }
422
423    /// Returns the purge scheduler.
424    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
425        &self.purge_scheduler
426    }
427}
428
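/// Maps a region id to a worker index by combining `table_id % num_workers` and
/// `region_number % num_workers` modulo `num_workers`.
///
/// For example, with 4 workers, `RegionId::new(1, 2)` maps to
/// `(1 % 4 + 2 % 4) % 4 == 3` (see `test_region_id_to_index` below).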
429fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
430    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
431        % num_workers
432}
433
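/// Builds the write cache from the config, or returns `Ok(None)` when
/// `enable_write_cache` is off. The cache directory is created on the local file
/// system before the cache itself is built.
///
/// A minimal, non-compiled sketch of turning it on (the path and the
/// `factory`/`intermediate` arguments are placeholders; other fields keep their
/// defaults):
///
/// ```ignore
/// let mut config = MitoConfig::default();
/// config.enable_write_cache = true;
/// config.write_cache_path = "/path/to/write_cache".into();
/// let write_cache = write_cache_from_config(&config, factory, intermediate).await?;
/// ```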
434pub async fn write_cache_from_config(
435    config: &MitoConfig,
436    puffin_manager_factory: PuffinManagerFactory,
437    intermediate_manager: IntermediateManager,
438) -> Result<Option<WriteCacheRef>> {
439    if !config.enable_write_cache {
440        return Ok(None);
441    }
442
443    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
444        .await
445        .context(CreateDirSnafu {
446            dir: &config.write_cache_path,
447        })?;
448
449    let cache = WriteCache::new_fs(
450        &config.write_cache_path,
451        config.write_cache_size,
452        config.write_cache_ttl,
453        Some(config.index_cache_percent),
454        puffin_manager_factory,
455        intermediate_manager,
456    )
457    .await?;
458    Ok(Some(Arc::new(cache)))
459}
460
461/// Computes an initial check delay for a worker.
462pub(crate) fn worker_init_check_delay() -> Duration {
463    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
464    Duration::from_secs(init_check_delay)
465}
466
467/// Worker start config.
468struct WorkerStarter<S> {
469    id: WorkerId,
470    config: Arc<MitoConfig>,
471    log_store: Arc<S>,
472    object_store_manager: ObjectStoreManagerRef,
473    write_buffer_manager: WriteBufferManagerRef,
474    compact_job_pool: SchedulerRef,
475    index_build_job_pool: SchedulerRef,
476    flush_job_pool: SchedulerRef,
477    purge_scheduler: SchedulerRef,
478    listener: WorkerListener,
479    cache_manager: CacheManagerRef,
480    puffin_manager_factory: PuffinManagerFactory,
481    intermediate_manager: IntermediateManager,
482    time_provider: TimeProviderRef,
483    /// Watch channel sender to notify workers to handle stalled requests.
484    flush_sender: watch::Sender<()>,
485    /// Watch channel receiver to wait for background flush job.
486    flush_receiver: watch::Receiver<()>,
487    plugins: Plugins,
488    schema_metadata_manager: SchemaMetadataManagerRef,
489    file_ref_manager: FileReferenceManagerRef,
490    partition_expr_fetcher: PartitionExprFetcherRef,
491    flush_semaphore: Arc<Semaphore>,
492}
493
494impl<S: LogStore> WorkerStarter<S> {
495    /// Starts a region worker and its background thread.
496    fn start(self) -> Result<RegionWorker> {
497        let regions = Arc::new(RegionMap::default());
498        let opening_regions = Arc::new(OpeningRegions::default());
499        let catchup_regions = Arc::new(CatchupRegions::default());
500        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
501
502        let running = Arc::new(AtomicBool::new(true));
503        let now = self.time_provider.current_time_millis();
504        let id_string = self.id.to_string();
505        let mut worker_thread = RegionWorkerLoop {
506            id: self.id,
507            config: self.config.clone(),
508            regions: regions.clone(),
509            catchup_regions: catchup_regions.clone(),
510            dropping_regions: Arc::new(RegionMap::default()),
511            opening_regions: opening_regions.clone(),
512            sender: sender.clone(),
513            receiver,
514            wal: Wal::new(self.log_store),
515            object_store_manager: self.object_store_manager.clone(),
516            running: running.clone(),
517            memtable_builder_provider: MemtableBuilderProvider::new(
518                Some(self.write_buffer_manager.clone()),
519                self.config.clone(),
520            ),
521            purge_scheduler: self.purge_scheduler.clone(),
522            write_buffer_manager: self.write_buffer_manager,
523            index_build_scheduler: IndexBuildScheduler::new(
524                self.index_build_job_pool,
525                self.config.max_background_index_builds,
526            ),
527            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
528            compaction_scheduler: CompactionScheduler::new(
529                self.compact_job_pool,
530                sender.clone(),
531                self.cache_manager.clone(),
532                self.config,
533                self.listener.clone(),
534                self.plugins.clone(),
535            ),
536            stalled_requests: StalledRequests::default(),
537            listener: self.listener,
538            cache_manager: self.cache_manager,
539            puffin_manager_factory: self.puffin_manager_factory,
540            intermediate_manager: self.intermediate_manager,
541            time_provider: self.time_provider,
542            last_periodical_check_millis: now,
543            flush_sender: self.flush_sender,
544            flush_receiver: self.flush_receiver,
545            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
546            region_count: REGION_COUNT.with_label_values(&[&id_string]),
547            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
548            region_edit_queues: RegionEditQueues::default(),
549            schema_metadata_manager: self.schema_metadata_manager,
550            file_ref_manager: self.file_ref_manager.clone(),
551            partition_expr_fetcher: self.partition_expr_fetcher,
552            flush_semaphore: self.flush_semaphore,
553        };
554        let handle = common_runtime::spawn_global(async move {
555            worker_thread.run().await;
556        });
557
558        Ok(RegionWorker {
559            id: self.id,
560            regions,
561            opening_regions,
562            catchup_regions,
563            sender,
564            handle: Mutex::new(Some(handle)),
565            running,
566        })
567    }
568}
569
570/// Worker to write and alter regions bound to it.
571pub(crate) struct RegionWorker {
572    /// Id of the worker.
573    id: WorkerId,
574    /// Regions bound to the worker.
575    regions: RegionMapRef,
576    /// The opening regions.
577    opening_regions: OpeningRegionsRef,
578    /// The catching up regions.
579    catchup_regions: CatchupRegionsRef,
580    /// Request sender.
581    sender: Sender<WorkerRequestWithTime>,
582    /// Handle to the worker thread.
583    handle: Mutex<Option<JoinHandle<()>>>,
584    /// Whether to run the worker thread.
585    running: Arc<AtomicBool>,
586}
587
588impl RegionWorker {
589    /// Submits a request to the background worker thread.
590    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
591        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
592        let request_with_time = WorkerRequestWithTime::new(request);
593        if self.sender.send(request_with_time).await.is_err() {
594            warn!(
595                "Worker {} is already exited but the running flag is still true",
596                self.id
597            );
598            // Manually set the running flag to false to avoid printing more warning logs.
599            self.set_running(false);
600            return WorkerStoppedSnafu { id: self.id }.fail();
601        }
602
603        Ok(())
604    }
605
606    /// Stops the worker.
607    ///
608    /// This method waits until the worker thread exits.
609    async fn stop(&self) -> Result<()> {
610        let handle = self.handle.lock().await.take();
611        if let Some(handle) = handle {
612            info!("Stop region worker {}", self.id);
613
614            self.set_running(false);
615            if self
616                .sender
617                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
618                .await
619                .is_err()
620            {
621                warn!("Worker {} is already exited before stop", self.id);
622            }
623
624            handle.await.context(JoinSnafu)?;
625        }
626
627        Ok(())
628    }
629
630    /// Returns true if the worker is still running.
631    fn is_running(&self) -> bool {
632        self.running.load(Ordering::Relaxed)
633    }
634
635    /// Sets whether the worker is still running.
636    fn set_running(&self, value: bool) {
637        self.running.store(value, Ordering::Relaxed)
638    }
639
640    /// Returns true if the worker contains the specific region.
641    fn is_region_exists(&self, region_id: RegionId) -> bool {
642        self.regions.is_region_exists(region_id)
643    }
644
645    /// Returns true if the region is opening.
646    fn is_region_opening(&self, region_id: RegionId) -> bool {
647        self.opening_regions.is_region_exists(region_id)
648    }
649
650    /// Returns true if the region is catching up.
651    fn is_region_catching_up(&self, region_id: RegionId) -> bool {
652        self.catchup_regions.is_region_exists(region_id)
653    }
654
655    /// Returns region of specific `region_id`.
656    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
657        self.regions.get_region(region_id)
658    }
659
660    #[cfg(test)]
661    /// Returns the [OpeningRegionsRef].
662    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
663        &self.opening_regions
664    }
665
666    #[cfg(test)]
667    /// Returns the [CatchupRegionsRef].
668    pub(crate) fn catchup_regions(&self) -> &CatchupRegionsRef {
669        &self.catchup_regions
670    }
671}
672
673impl Drop for RegionWorker {
674    fn drop(&mut self) {
675        if self.is_running() {
676            self.set_running(false);
677            // Once we drop the sender, the worker thread will receive a disconnected error.
678        }
679    }
680}
681
682type RequestBuffer = Vec<WorkerRequest>;
683
684/// Buffer for stalled write requests.
685///
686/// Maintains stalled write requests and their estimated size.
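///
/// `push`, `push_bulk`, and `remove` keep `estimated_size` equal to the sum of
/// the per-region sizes tracked in `requests`, so callers can read the total
/// without re-scanning the map.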
687#[derive(Default)]
688pub(crate) struct StalledRequests {
689    /// Stalled requests.
690    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
691    /// instead of `StalledRequests::requests.len()`.
692    ///
693    /// Key: RegionId
694    /// Value: (estimated size, stalled write requests, stalled bulk requests)
695    pub(crate) requests:
696        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
697    /// Estimated size of all stalled requests.
698    pub(crate) estimated_size: usize,
699}
700
701impl StalledRequests {
702    /// Appends stalled requests.
703    pub(crate) fn append(
704        &mut self,
705        requests: &mut Vec<SenderWriteRequest>,
706        bulk_requests: &mut Vec<SenderBulkRequest>,
707    ) {
708        for req in requests.drain(..) {
709            self.push(req);
710        }
711        for req in bulk_requests.drain(..) {
712            self.push_bulk(req);
713        }
714    }
715
716    /// Pushes a stalled request to the buffer.
717    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
718        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
719        let req_size = req.request.estimated_size();
720        *size += req_size;
721        self.estimated_size += req_size;
722        requests.push(req);
723    }
724
725    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
726        let region_id = req.region_id;
727        let (size, _, requests) = self.requests.entry(region_id).or_default();
728        let req_size = req.request.estimated_size();
729        *size += req_size;
730        self.estimated_size += req_size;
731        requests.push(req);
732    }
733
734    /// Removes stalled requests of specific region.
735    pub(crate) fn remove(
736        &mut self,
737        region_id: &RegionId,
738    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
739        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
740            self.estimated_size -= size;
741            (write_reqs, bulk_reqs)
742        } else {
743            (vec![], vec![])
744        }
745    }
746
747    /// Returns the total number of all stalled requests.
748    pub(crate) fn stalled_count(&self) -> usize {
749        self.requests
750            .values()
751            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
752            .sum()
753    }
754}
755
756/// Background worker loop to handle requests.
757struct RegionWorkerLoop<S> {
758    /// Id of the worker.
759    id: WorkerId,
760    /// Engine config.
761    config: Arc<MitoConfig>,
762    /// Regions bound to the worker.
763    regions: RegionMapRef,
764    /// Regions that are not yet fully dropped.
765    dropping_regions: RegionMapRef,
766    /// Regions that are opening.
767    opening_regions: OpeningRegionsRef,
768    /// Regions that are catching up.
769    catchup_regions: CatchupRegionsRef,
770    /// Request sender.
771    sender: Sender<WorkerRequestWithTime>,
772    /// Request receiver.
773    receiver: Receiver<WorkerRequestWithTime>,
774    /// WAL of the engine.
775    wal: Wal<S>,
776    /// Manages object stores for manifest and SSTs.
777    object_store_manager: ObjectStoreManagerRef,
778    /// Whether the worker thread is still running.
779    running: Arc<AtomicBool>,
780    /// Memtable builder provider for each region.
781    memtable_builder_provider: MemtableBuilderProvider,
782    /// Background purge job scheduler.
783    purge_scheduler: SchedulerRef,
784    /// Engine write buffer manager.
785    write_buffer_manager: WriteBufferManagerRef,
786    /// Scheduler for index build task.
787    index_build_scheduler: IndexBuildScheduler,
788    /// Schedules background flush requests.
789    flush_scheduler: FlushScheduler,
790    /// Scheduler for compaction tasks.
791    compaction_scheduler: CompactionScheduler,
792    /// Stalled write requests.
793    stalled_requests: StalledRequests,
794    /// Event listener for tests.
795    listener: WorkerListener,
796    /// Cache.
797    cache_manager: CacheManagerRef,
798    /// Puffin manager factory for index.
799    puffin_manager_factory: PuffinManagerFactory,
800    /// Intermediate manager for inverted index.
801    intermediate_manager: IntermediateManager,
802    /// Provider to get current time.
803    time_provider: TimeProviderRef,
804    /// Last time to check regions periodically.
805    last_periodical_check_millis: i64,
806    /// Watch channel sender to notify workers to handle stalled requests.
807    flush_sender: watch::Sender<()>,
808    /// Watch channel receiver to wait for background flush job.
809    flush_receiver: watch::Receiver<()>,
810    /// Gauge of stalling request count.
811    stalling_count: IntGauge,
812    /// Gauge of regions in the worker.
813    region_count: IntGauge,
814    /// Histogram of request wait time for this worker.
815    request_wait_time: Histogram,
816    /// Queues for region edit requests.
817    region_edit_queues: RegionEditQueues,
818    /// Database level metadata manager.
819    schema_metadata_manager: SchemaMetadataManagerRef,
820    /// Datanode level file references manager.
821    file_ref_manager: FileReferenceManagerRef,
822    /// Partition expr fetcher used to backfill partition expr on open for compatibility.
823    partition_expr_fetcher: PartitionExprFetcherRef,
824    /// Semaphore to control flush concurrency.
825    flush_semaphore: Arc<Semaphore>,
826}
827
828impl<S: LogStore> RegionWorkerLoop<S> {
829    /// Starts the worker loop.
830    async fn run(&mut self) {
831        let init_check_delay = worker_init_check_delay();
832        info!(
833            "Start region worker thread {}, init_check_delay: {:?}",
834            self.id, init_check_delay
835        );
836        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;
837
838        // Buffer to retrieve requests from receiver.
839        let mut write_req_buffer: Vec<SenderWriteRequest> =
840            Vec::with_capacity(self.config.worker_request_batch_size);
841        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
842            Vec::with_capacity(self.config.worker_request_batch_size);
843        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
844            Vec::with_capacity(self.config.worker_request_batch_size);
845        let mut general_req_buffer: Vec<WorkerRequest> =
846            RequestBuffer::with_capacity(self.config.worker_request_batch_size);
847
848        while self.running.load(Ordering::Relaxed) {
849            // Clear the buffer before handling next batch of requests.
850            write_req_buffer.clear();
851            ddl_req_buffer.clear();
852            general_req_buffer.clear();
853
854            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
855            let sleep = tokio::time::sleep(max_wait_time);
856            tokio::pin!(sleep);
857
858            tokio::select! {
859                request_opt = self.receiver.recv() => {
860                    match request_opt {
861                        Some(request_with_time) => {
862                            // Observe the wait time
863                            let wait_time = request_with_time.created_at.elapsed();
864                            self.request_wait_time.observe(wait_time.as_secs_f64());
865
866                            match request_with_time.request {
867                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
868                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
869                                req => general_req_buffer.push(req),
870                            }
871                        },
872                        // The channel is disconnected.
873                        None => break,
874                    }
875                }
876                recv_res = self.flush_receiver.changed() => {
877                    if recv_res.is_err() {
878                        // The channel is disconnected.
879                        break;
880                    } else {
881                        // Also flush this worker if other workers trigger a flush, as this worker may
882                        // have a large memtable to flush. We may never get a chance to flush that
883                        // memtable if we never write to this worker, so flushing only the other
884                        // workers may not release enough memory.
885                        self.maybe_flush_worker();
886                        // A flush job has finished; handle stalled requests.
887                        self.handle_stalled_requests().await;
888                        continue;
889                    }
890                }
891                _ = &mut sleep => {
892                    // Timeout. Checks periodical tasks.
893                    self.handle_periodical_tasks();
894                    continue;
895                }
896            }
897
898            if self.flush_receiver.has_changed().unwrap_or(false) {
899                // Always check whether we can process stalled requests, to avoid a request
900                // hanging for too long.
901                // If the channel is closed, do nothing.
902                self.handle_stalled_requests().await;
903            }
904
905            // Try to recv more requests from the channel.
906            for _ in 1..self.config.worker_request_batch_size {
907                // We have received one request so we start from 1.
908                match self.receiver.try_recv() {
909                    Ok(request_with_time) => {
910                        // Observe the wait time
911                        let wait_time = request_with_time.created_at.elapsed();
912                        self.request_wait_time.observe(wait_time.as_secs_f64());
913
914                        match request_with_time.request {
915                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
916                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
917                            req => general_req_buffer.push(req),
918                        }
919                    }
920                    // We still need to handle remaining requests.
921                    Err(_) => break,
922                }
923            }
924
925            self.listener.on_recv_requests(
926                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
927            );
928
929            self.handle_requests(
930                &mut write_req_buffer,
931                &mut ddl_req_buffer,
932                &mut general_req_buffer,
933                &mut bulk_req_buffer,
934            )
935            .await;
936
937            self.handle_periodical_tasks();
938        }
939
940        self.clean().await;
941
942        info!("Exit region worker thread {}", self.id);
943    }
944
945    /// Dispatches and processes requests.
946    ///
947    /// The caller passes in the request buffers collected for the current batch.
948    async fn handle_requests(
949        &mut self,
950        write_requests: &mut Vec<SenderWriteRequest>,
951        ddl_requests: &mut Vec<SenderDdlRequest>,
952        general_requests: &mut Vec<WorkerRequest>,
953        bulk_requests: &mut Vec<SenderBulkRequest>,
954    ) {
955        for worker_req in general_requests.drain(..) {
956            match worker_req {
957                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
958                    // These requests are categorized into write_requests and ddl_requests.
959                    continue;
960                }
961                WorkerRequest::Background { region_id, notify } => {
962                    // For background notify, we handle it directly.
963                    self.handle_background_notify(region_id, notify).await;
964                }
965                WorkerRequest::SetRegionRoleStateGracefully {
966                    region_id,
967                    region_role_state,
968                    sender,
969                } => {
970                    self.set_role_state_gracefully(region_id, region_role_state, sender)
971                        .await;
972                }
973                WorkerRequest::EditRegion(request) => {
974                    self.handle_region_edit(request).await;
975                }
976                WorkerRequest::Stop => {
977                    debug_assert!(!self.running.load(Ordering::Relaxed));
978                }
979                WorkerRequest::SyncRegion(req) => {
980                    self.handle_region_sync(req).await;
981                }
982                WorkerRequest::BulkInserts {
983                    metadata,
984                    request,
985                    sender,
986                } => {
987                    if let Some(region_metadata) = metadata {
988                        self.handle_bulk_insert_batch(
989                            region_metadata,
990                            request,
991                            bulk_requests,
992                            sender,
993                        )
994                        .await;
995                    } else {
996                        error!("Cannot find region metadata for {}", request.region_id);
997                        sender.send(
998                            error::RegionNotFoundSnafu {
999                                region_id: request.region_id,
1000                            }
1001                            .fail(),
1002                        );
1003                    }
1004                }
1005            }
1006        }
1007
1008        // Handles all write requests first. So we can alter regions without
1009        // considering existing write requests.
1010        self.handle_write_requests(write_requests, bulk_requests, true)
1011            .await;
1012
1013        self.handle_ddl_requests(ddl_requests).await;
1014    }
1015
1016    /// Takes and handles all ddl requests.
1017    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
1018        if ddl_requests.is_empty() {
1019            return;
1020        }
1021
1022        for ddl in ddl_requests.drain(..) {
1023            let res = match ddl.request {
1024                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
1025                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
1026                DdlRequest::Open((req, wal_entry_receiver)) => {
1027                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
1028                        .await;
1029                    continue;
1030                }
1031                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
1032                DdlRequest::Alter(req) => {
1033                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
1034                        .await;
1035                    continue;
1036                }
1037                DdlRequest::Flush(req) => {
1038                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
1039                        .await;
1040                    continue;
1041                }
1042                DdlRequest::Compact(req) => {
1043                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
1044                        .await;
1045                    continue;
1046                }
1047                DdlRequest::BuildIndex(req) => {
1048                    self.handle_build_index_request(ddl.region_id, req, ddl.sender)
1049                        .await;
1050                    continue;
1051                }
1052                DdlRequest::Truncate(req) => {
1053                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
1054                        .await;
1055                    continue;
1056                }
1057                DdlRequest::Catchup((req, wal_entry_receiver)) => {
1058                    self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
1059                        .await;
1060                    continue;
1061                }
1062            };
1063
1064            ddl.sender.send(res);
1065        }
1066    }
1067
1068    /// Handles periodical tasks such as region auto flush.
1069    fn handle_periodical_tasks(&mut self) {
1070        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
1071        if self
1072            .time_provider
1073            .elapsed_since(self.last_periodical_check_millis)
1074            < interval
1075        {
1076            return;
1077        }
1078
1079        self.last_periodical_check_millis = self.time_provider.current_time_millis();
1080
1081        if let Err(e) = self.flush_periodically() {
1082            error!(e; "Failed to flush regions periodically");
1083        }
1084    }
1085
1086    /// Handles a region background notification.
1087    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
1088        match notify {
1089            BackgroundNotify::FlushFinished(req) => {
1090                self.handle_flush_finished(region_id, req).await
1091            }
1092            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
1093            BackgroundNotify::IndexBuildFinished(req) => {
1094                self.handle_index_build_finished(region_id, req).await
1095            }
1096            BackgroundNotify::IndexBuildStopped(req) => {
1097                self.handle_index_build_stopped(region_id, req).await
1098            }
1099            BackgroundNotify::IndexBuildFailed(req) => {
1100                self.handle_index_build_failed(region_id, req).await
1101            }
1102            BackgroundNotify::CompactionFinished(req) => {
1103                self.handle_compaction_finished(region_id, req).await
1104            }
1105            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
1106            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
1107            BackgroundNotify::RegionChange(req) => {
1108                self.handle_manifest_region_change_result(req).await
1109            }
1110            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
1111        }
1112    }
1113
1114    /// Handles a `SetRegionRoleStateGracefully` request.
1115    async fn set_role_state_gracefully(
1116        &mut self,
1117        region_id: RegionId,
1118        region_role_state: SettableRegionRoleState,
1119        sender: oneshot::Sender<SetRegionRoleStateResponse>,
1120    ) {
1121        if let Some(region) = self.regions.get_region(region_id) {
1122            // We need to do this in the background because we need the manifest lock.
1123            common_runtime::spawn_global(async move {
1124                match region.set_role_state_gracefully(region_role_state).await {
1125                    Ok(()) => {
1126                        let last_entry_id = region.version_control.current().last_entry_id;
1127                        let _ = sender.send(SetRegionRoleStateResponse::success(
1128                            SetRegionRoleStateSuccess::mito(last_entry_id),
1129                        ));
1130                    }
1131                    Err(e) => {
1132                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
1133                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
1134                            BoxedError::new(e),
1135                        ));
1136                    }
1137                }
1138            });
1139        } else {
1140            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
1141        }
1142    }
1143}
1144
1145impl<S> RegionWorkerLoop<S> {
1146    /// Cleans up the worker.
1147    async fn clean(&self) {
1148        // Closes remaining regions.
1149        let regions = self.regions.list_regions();
1150        for region in regions {
1151            region.stop().await;
1152        }
1153
1154        self.regions.clear();
1155    }
1156
1157    /// Notifies the whole group that a flush job is finished so other
1158    /// workers can handle stalled requests.
1159    fn notify_group(&mut self) {
1160        // Notifies all receivers.
1161        let _ = self.flush_sender.send(());
1162        // Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
1163        self.flush_receiver.borrow_and_update();
1164    }
1165}
1166
1167/// Wrapper that only calls event listener in tests.
1168#[derive(Default, Clone)]
1169pub(crate) struct WorkerListener {
1170    #[cfg(any(test, feature = "test"))]
1171    listener: Option<crate::engine::listener::EventListenerRef>,
1172}
1173
1174impl WorkerListener {
1175    #[cfg(any(test, feature = "test"))]
1176    pub(crate) fn new(
1177        listener: Option<crate::engine::listener::EventListenerRef>,
1178    ) -> WorkerListener {
1179        WorkerListener { listener }
1180    }
1181
1182    /// Flush is finished successfully.
1183    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
1184        #[cfg(any(test, feature = "test"))]
1185        if let Some(listener) = &self.listener {
1186            listener.on_flush_success(region_id);
1187        }
1188        // Avoid compiler warning.
1189        let _ = region_id;
1190    }
1191
1192    /// Engine is stalled.
1193    pub(crate) fn on_write_stall(&self) {
1194        #[cfg(any(test, feature = "test"))]
1195        if let Some(listener) = &self.listener {
1196            listener.on_write_stall();
1197        }
1198    }
1199
1200    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
1201        #[cfg(any(test, feature = "test"))]
1202        if let Some(listener) = &self.listener {
1203            listener.on_flush_begin(region_id).await;
1204        }
1205        // Avoid compiler warning.
1206        let _ = region_id;
1207    }
1208
1209    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
1210        #[cfg(any(test, feature = "test"))]
1211        if let Some(listener) = &self.listener {
1212            return listener.on_later_drop_begin(region_id);
1213        }
1214        // Avoid compiler warning.
1215        let _ = region_id;
1216        None
1217    }
1218
1219    /// Called when the later drop task is finished.
1220    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
1221        #[cfg(any(test, feature = "test"))]
1222        if let Some(listener) = &self.listener {
1223            listener.on_later_drop_end(region_id, removed);
1224        }
1225        // Avoid compiler warning.
1226        let _ = region_id;
1227        let _ = removed;
1228    }
1229
1230    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
1231        #[cfg(any(test, feature = "test"))]
1232        if let Some(listener) = &self.listener {
1233            listener.on_merge_ssts_finished(region_id).await;
1234        }
1235        // Avoid compiler warning.
1236        let _ = region_id;
1237    }
1238
1239    pub(crate) fn on_recv_requests(&self, request_num: usize) {
1240        #[cfg(any(test, feature = "test"))]
1241        if let Some(listener) = &self.listener {
1242            listener.on_recv_requests(request_num);
1243        }
1244        // Avoid compiler warning.
1245        let _ = request_num;
1246    }
1247
1248    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
1249        #[cfg(any(test, feature = "test"))]
1250        if let Some(listener) = &self.listener {
1251            listener.on_file_cache_filled(_file_id);
1252        }
1253    }
1254
1255    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
1256        #[cfg(any(test, feature = "test"))]
1257        if let Some(listener) = &self.listener {
1258            listener.on_compaction_scheduled(_region_id);
1259        }
1260    }
1261
1262    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
1263        #[cfg(any(test, feature = "test"))]
1264        if let Some(listener) = &self.listener {
1265            listener
1266                .on_notify_region_change_result_begin(_region_id)
1267                .await;
1268        }
1269    }
1270
1271    pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
1272        #[cfg(any(test, feature = "test"))]
1273        if let Some(listener) = &self.listener {
1274            listener.on_index_build_finish(_region_file_id).await;
1275        }
1276    }
1277
1278    pub(crate) async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {
1279        #[cfg(any(test, feature = "test"))]
1280        if let Some(listener) = &self.listener {
1281            listener.on_index_build_begin(_region_file_id).await;
1282        }
1283    }
1284
1285    pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
1286        #[cfg(any(test, feature = "test"))]
1287        if let Some(listener) = &self.listener {
1288            listener.on_index_build_abort(_region_file_id).await;
1289        }
1290    }
1291}
1292
1293#[cfg(test)]
1294mod tests {
1295    use super::*;
1296    use crate::test_util::TestEnv;
1297
1298    #[test]
1299    fn test_region_id_to_index() {
1300        let num_workers = 4;
1301
1302        let region_id = RegionId::new(1, 2);
1303        let index = region_id_to_index(region_id, num_workers);
1304        assert_eq!(index, 3);
1305
1306        let region_id = RegionId::new(2, 3);
1307        let index = region_id_to_index(region_id, num_workers);
1308        assert_eq!(index, 1);
1309    }
1310
1311    #[tokio::test]
1312    async fn test_worker_group_start_stop() {
1313        let env = TestEnv::with_prefix("group-stop").await;
1314        let group = env
1315            .create_worker_group(MitoConfig {
1316                num_workers: 4,
1317                ..Default::default()
1318            })
1319            .await;
1320
1321        group.stop().await.unwrap();
1322    }
1323}