mito2/
worker.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and utilities for writing regions.

mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;

pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Maximum initial delay before a worker runs its periodic region checks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed-size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to the region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool.
    compact_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache.
    cache_manager: CacheManagerRef,
}

impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// The number of workers should be a power of two.
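    ///
    /// A minimal usage sketch (assuming the config, stores, and managers are
    /// already built elsewhere, and that `region_id` and `request` are in
    /// scope; illustrative, not a test):
    ///
    /// ```ignore
    /// let group = WorkerGroup::start(
    ///     config,
    ///     log_store,
    ///     object_store_manager,
    ///     schema_metadata_manager,
    ///     Plugins::new(),
    /// )
    /// .await?;
    /// // Requests are routed to the region's dedicated worker by region id.
    /// group.submit_to_worker(region_id, request).await?;
    /// group.stop().await?;
    /// ```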
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Stops the worker group.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the compaction, flush, and purge schedulers gracefully.
        self.compact_job_pool.stop(true).await?;
        self.flush_job_pool.stop(true).await?;
        self.purge_scheduler.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns the region with the specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the cache of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Gets the worker for the specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }
}

// Test methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// The number of workers should be a power of two.
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

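/// Maps a region to a worker index by mixing the region's table id and region
/// number modulo the number of workers.
///
/// A worked example of the mapping, matching the unit test at the bottom of
/// this file (4 workers):
///
/// ```ignore
/// // RegionId::new(table_id, region_number)
/// let id = RegionId::new(1, 2);
/// // (1 % 4 + 2 % 4) % 4 == 3
/// assert_eq!(region_id_to_index(id, 4), 3);
/// ```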
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

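/// Builds the optional write cache from the engine config.
///
/// A minimal sketch of a config that enables the write cache (field names are
/// taken from this function's body; the path and the surrounding setup are
/// placeholders):
///
/// ```ignore
/// let config = MitoConfig {
///     enable_write_cache: true,
///     write_cache_path: "/path/to/write_cache".to_string(),
///     ..Default::default()
/// };
/// // Returns Ok(None) when `enable_write_cache` is false.
/// let cache =
///     write_cache_from_config(&config, puffin_manager_factory, intermediate_manager).await?;
/// ```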
async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

/// Computes a random initial check delay for a worker.
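///
/// A small sketch of the invariant (the delay is sampled uniformly from
/// `[0, MAX_INITIAL_CHECK_DELAY_SECS)`):
///
/// ```ignore
/// let delay = worker_init_check_delay();
/// assert!(delay < Duration::from_secs(MAX_INITIAL_CHECK_DELAY_SECS));
/// ```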
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

/// Worker start config.
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    fn start(self) -> RegionWorker {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        }
    }
}

/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequest>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
    /// Submits a request to the background worker thread.
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        if self.sender.send(request).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Manually set the running flag to false to avoid printing more warning logs.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker.
    ///
    /// This method waits until the worker thread exits.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self.sender.send(WorkerRequest::Stop).await.is_err() {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets whether the worker is still running.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains the specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is opening.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns the region with the specific `region_id`.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    /// Returns the [OpeningRegionsRef].
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}

type RequestBuffer = Vec<WorkerRequest>;

/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
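///
/// A minimal sketch of the bookkeeping contract (assuming a single request
/// `req` whose estimated size is 64 bytes, targeting `region_id`; the request
/// itself is a placeholder here):
///
/// ```ignore
/// let mut stalled = StalledRequests::default();
/// stalled.push(req);                      // estimated_size += 64
/// assert_eq!(stalled.stalled_count(), 1);
/// let reqs = stalled.remove(&region_id);  // estimated_size -= 64
/// assert_eq!(stalled.estimated_size, 0);
/// ```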
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, stalled requests)
    pub(crate) requests: HashMap<RegionId, (usize, Vec<SenderWriteRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    /// Appends stalled requests.
    pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
        for req in requests.drain(..) {
            self.push(req);
        }
    }

    /// Pushes a stalled request to the buffer.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes the stalled requests of a specific region.
    pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
        if let Some((size, requests)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            requests
        } else {
            vec![]
        }
    }

    /// Returns the total number of all stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests.values().map(|reqs| reqs.1.len()).sum()
    }
}

/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequest>,
    /// Request receiver.
    receiver: Receiver<WorkerRequest>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalled request count.
    stalled_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffer to retrieve requests from receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffer before handling next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request) => match request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            _ => general_req_buffer.push(request),
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker if other workers trigger a flush, as this
                        // worker may have a large memtable to flush. We may not have a chance
                        // to flush that memtable if we never write to this worker, so flushing
                        // only the other workers may not release enough memory.
                        self.maybe_flush_worker();
                        // A flush job is finished, handles stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always check whether we can process stalled requests, to avoid a
                // request hanging for too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have received one request so we start from 1.
                match self.receiver.try_recv() {
                    Ok(req) => match req {
                        WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                        WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                        _ => general_req_buffer.push(req),
                    },
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes requests.
    ///
    /// The request buffers are drained during processing.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_inserts(request, region_metadata, write_requests, sender)
                            .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handle all write requests first so that we can alter regions without
        // considering in-flight write requests.
        self.handle_write_requests(write_requests, true).await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all DDL requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(_) => {
                    self.handle_truncate_request(ddl.region_id, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

    /// Handles periodic tasks such as region auto flush.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles region background requests.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Handles `SetRegionRoleStateGracefully` requests.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in the background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                region.set_role_state_gracefully(region_role_state).await;

                let last_entry_id = region.version_control.current().last_entry_id;
                let _ = sender.send(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(last_entry_id),
                ));
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker.
    async fn clean(&self) {
        // Closes remaining regions.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the whole group that a flush job is finished so other
    /// workers can handle stalled requests.
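    ///
    /// A minimal sketch of the tokio watch-channel pattern used here (the
    /// payload is just `()`; only the notification matters):
    ///
    /// ```ignore
    /// let (tx, mut rx) = tokio::sync::watch::channel(());
    /// let _ = tx.send(());    // wake every worker subscribed to the channel
    /// rx.borrow_and_update(); // mark the change as seen in this worker
    /// assert!(!rx.has_changed().unwrap());
    /// ```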
    fn notify_group(&mut self) {
        // Notifies all receivers.
        let _ = self.flush_sender.send(());
        // Marks the receiver in the current worker as seen so the loop won't be
        // woken up immediately.
        self.flush_receiver.borrow_and_update();
    }
}

/// Wrapper that only calls the event listener in tests.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Flush is finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Engine is stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
        None
    }

    /// Called when a later drop task is finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoid compiler warning.
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoid compiler warning.
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop");
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}