mito2/worker.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and utilities for writing regions.

mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;

pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max delay to check region periodical tasks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool.
    compact_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache.
    cache_manager: CacheManagerRef,
}

impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// The number of workers should be a power of two.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Stops the worker group.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the compaction scheduler gracefully.
        self.compact_job_pool.stop(true).await?;
        // Stops the flush scheduler gracefully.
        self.flush_job_pool.stop(true).await?;
        // Stops the purge scheduler gracefully.
        self.purge_scheduler.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns region of specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns cache of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    /// Gets the worker for specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }
}
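
// A minimal usage sketch (hypothetical caller, not part of this module): the engine
// builds one `WorkerGroup` via `WorkerGroup::start(..)` at startup, routes every
// region request through `submit_to_worker(region_id, request)`, and awaits `stop()`
// on shutdown so background jobs and worker threads exit cleanly.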

// Test methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// The number of workers should be a power of two.
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                }
                .start()
            })
            .collect();

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}
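
// For example (a worked sketch mirroring the unit test at the bottom of this file):
// with 4 workers, `RegionId::new(1, 2)` maps to (1 % 4 + 2 % 4) % 4 = 3, so both the
// table id and the region number decide which worker a region is bound to.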

async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}
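
// The write cache is only built when `enable_write_cache` is set. A hypothetical
// configuration sketch (the exact TOML section and values are assumptions, not taken
// from this file; only the field names above are authoritative):
//
//   enable_write_cache = true
//   write_cache_path = "/path/to/write_cache"
//   write_cache_size = "5GiB"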

/// Computes an initial check delay for a worker.
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}
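
// The random delay (up to `MAX_INITIAL_CHECK_DELAY_SECS`) staggers the first periodic
// check of each worker so that all workers do not run their periodical flush checks at
// the same instant after startup.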

/// Worker start config.
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    fn start(self) -> RegionWorker {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        }
    }
}

/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
    /// Submits a request to the background worker thread.
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Manually set the running flag to false to avoid printing more warning logs.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker.
    ///
    /// This method waits until the worker thread exits.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets whether the worker is still running.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains the specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is opening.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns region of specific `region_id`.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    /// Returns the [OpeningRegionsRef].
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}

type RequestBuffer = Vec<WorkerRequest>;

/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, stalled write requests, stalled bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    /// Appends stalled requests.
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

    /// Pushes a stalled request to the buffer.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Pushes a stalled bulk request to the buffer.
    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes stalled requests of specific region.
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

    /// Returns the total number of all stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}
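
// Accounting sketch: `push`/`push_bulk` add a request's estimated size to both the
// per-region entry and `estimated_size`, while `remove` subtracts the whole per-region
// size at once, so `estimated_size` stays equal to the sum of the per-region sizes.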

/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffer to retrieve requests from receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffer before handling next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observe the wait time
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker if other workers trigger flush as this worker may have
                        // a large memtable to flush. We may not have a chance to flush that memtable if we
                        // never write to this worker. So only flushing other workers may not release enough
                        // memory.
                        self.maybe_flush_worker();
                        // A flush job is finished, handles stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always checks whether we could process stalled requests to avoid a request
                // hanging for too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have received one request so we start from 1.
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observe the wait time
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }
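
    // Note on the receive strategy in `run`: the loop first blocks in `select!` on a new
    // request, a flush notification, or the periodic timeout, and then drains up to
    // `worker_request_batch_size - 1` extra requests with `try_recv` so that bursts of
    // writes are processed as a single batch.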

    /// Dispatches and processes requests.
    ///
    /// The request buffers should be empty.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handles all write requests first. So we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all ddl requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(_) => {
                    self.handle_truncate_request(ddl.region_id, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

    /// Handles periodical tasks such as region auto flush.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles region background requests.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Handles `set_region_role_gracefully`.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                region.set_role_state_gracefully(region_role_state).await;

                let last_entry_id = region.version_control.current().last_entry_id;
                let _ = sender.send(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(last_entry_id),
                ));
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker.
    async fn clean(&self) {
        // Closes remaining regions.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the whole group that a flush job is finished so other
    /// workers can handle stalled requests.
    fn notify_group(&mut self) {
        // Notifies all receivers.
        let _ = self.flush_sender.send(());
        // Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
        self.flush_receiver.borrow_and_update();
    }
}

/// Wrapper that only calls event listener in tests.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Flush is finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Engine is stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
        None
    }

    /// On later drop task is finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoid compiler warning.
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoid compiler warning.
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}