mito2/worker.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and utilities for writing regions.

mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, watch, Mutex};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;

pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max initial delay before a worker runs its periodical region checks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed-size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to the region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool.
    compact_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
}

impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// The number of workers should be a power of two.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

    /// Stops the worker group.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the compaction job pool gracefully.
        self.compact_job_pool.stop(true).await?;
        // Stops the flush job pool gracefully.
        self.flush_job_pool.stop(true).await?;
        // Stops the purge scheduler gracefully.
        self.purge_scheduler.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group.
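    ///
    /// The target worker is chosen from the region id (see `region_id_to_index`), so all
    /// requests for the same region are handled by the same worker.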
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns the region of the specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the cache manager of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Gets the worker for the specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}

// Test methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// The number of workers should be a power of two.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

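/// Routes a region to a worker index by summing `table_id % num_workers` and
/// `region_number % num_workers`, then taking the result modulo `num_workers`.
/// For example, with 4 workers, `RegionId::new(1, 2)` maps to index
/// `(1 % 4 + 2 % 4) % 4 = 3` (see the unit test below).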
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

/// Computes an initial check delay for a worker.
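///
/// The delay is drawn uniformly from `[0, MAX_INITIAL_CHECK_DELAY_SECS)`, so workers
/// stagger their first periodical check instead of all running it at the same time.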
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

/// Worker start config.
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
}

impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}

/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
    /// Submits a request to the background worker thread.
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} has already exited but the running flag is still true",
                self.id
            );
            // Manually set the running flag to false to avoid printing more warning logs.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker.
    ///
    /// This method waits until the worker thread exits.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} has already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets whether the worker is still running.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains the specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is opening.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns the region of the specific `region_id`.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    /// Returns the [OpeningRegionsRef].
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}

type RequestBuffer = Vec<WorkerRequest>;

/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
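///
/// Each pushed request adds its estimated size to both the per-region entry and the
/// global `estimated_size`; removing a region's requests subtracts the per-region total.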
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, stalled write requests, stalled bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    /// Appends stalled requests.
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

    /// Pushes a stalled request to the buffer.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

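    /// Pushes a stalled bulk request to the buffer.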
    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes stalled requests of the specific region.
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

    /// Returns the total number of all stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Datanode level file references manager.
    file_ref_manager: FileReferenceManagerRef,
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
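        // Delays the first periodical check by the random offset so that workers do not
        // all run their periodical tasks at the same time.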
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffers to retrieve requests from the receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clears the buffers before handling the next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observes the wait time of the request.
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker if other workers trigger flush as this worker may have
                        // a large memtable to flush. We may not have a chance to flush that memtable if we
                        // never write to this worker. So only flushing other workers may not release enough
                        // memory.
                        self.maybe_flush_worker();
                        // A flush job has finished, so handle stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always checks whether we can process stalled requests to avoid a request
                // hanging for too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Tries to receive more requests from the channel.
            // We have already received one request, so the batch loop starts from 1.
            for _ in 1..self.config.worker_request_batch_size {
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observes the wait time of the request.
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes requests.
    ///
    /// `buffer` should be empty.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handles all write requests first so that we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all DDL requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

    /// Handles periodical tasks such as region auto flush.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles a region background notification.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Handles `SetRegionRoleStateGracefully` requests.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in the background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker.
    async fn clean(&self) {
        // Closes remaining regions.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the whole group that a flush job is finished so other
    /// workers can handle stalled requests.
    fn notify_group(&mut self) {
        // Notifies all receivers.
        let _ = self.flush_sender.send(());
        // Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
        self.flush_receiver.borrow_and_update();
    }
}

/// Wrapper that only calls the event listener in tests.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Called when a flush is finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Called when the engine is stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
        None
    }

    /// Called when a later drop task is finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoid compiler warning.
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoid compiler warning.
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }

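    // Sanity check: `worker_init_check_delay` samples from
    // `[0, MAX_INITIAL_CHECK_DELAY_SECS)`, so the returned delay is always below the
    // configured maximum.
    #[test]
    fn test_worker_init_check_delay_in_range() {
        for _ in 0..32 {
            let delay = worker_init_check_delay();
            assert!(delay < Duration::from_secs(MAX_INITIAL_CHECK_DELAY_SECS));
        }
    }
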
    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}