mito2/worker.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs and utilities for writing regions.

mod handle_alter;
mod handle_bulk_insert;
mod handle_catchup;
mod handle_close;
mod handle_compaction;
mod handle_create;
mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_rebuild_index;
mod handle_truncate;
mod handle_write;

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::{Histogram, IntGauge};
use rand::{Rng, rng};
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::region_engine::{
    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
};
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};

use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::opener::PartitionExprFetcherRef;
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
    BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
use crate::worker::handle_manifest::RegionEditQueues;

/// Identifier for a worker.
pub(crate) type WorkerId = u32;

pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";

/// Interval to check whether regions should flush.
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Max initial delay before a worker checks periodical tasks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
///
/// A worker group binds each region to a specific [RegionWorker] and sends
/// requests to the region's dedicated worker.
///
/// ```mermaid
/// graph LR
///
/// RegionRequest -- Route by region id --> Worker0 & Worker1
///
/// subgraph MitoEngine
///     subgraph WorkerGroup
///         Worker0["RegionWorker 0"]
///         Worker1["RegionWorker 1"]
///     end
/// end
///
/// Chan0[" Request channel 0"]
/// Chan1[" Request channel 1"]
/// WorkerThread1["RegionWorkerLoop 1"]
///
/// subgraph WorkerThread0["RegionWorkerLoop 0"]
///     subgraph RegionMap["RegionMap (regions bound to worker 0)"]
///         Region0["Region 0"]
///         Region2["Region 2"]
///     end
///     Buffer0["RequestBuffer"]
///
///     Buffer0 -- modify regions --> RegionMap
/// end
///
/// Worker0 --> Chan0
/// Worker1 --> Chan1
/// Chan0 --> Buffer0
/// Chan1 --> WorkerThread1
/// ```
pub(crate) struct WorkerGroup {
    /// Workers of the group.
    workers: Vec<RegionWorker>,
    /// Flush background job pool.
    flush_job_pool: SchedulerRef,
    /// Compaction background job pool.
    compact_job_pool: SchedulerRef,
    /// Scheduler for index build jobs.
    index_build_job_pool: SchedulerRef,
    /// Scheduler for file purgers.
    purge_scheduler: SchedulerRef,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// File reference manager.
    file_ref_manager: FileReferenceManagerRef,
}

impl WorkerGroup {
    /// Starts a worker group.
    ///
    /// The number of workers should be a power of two.
    pub(crate) async fn start<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = Arc::new(
            WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                .with_notifier(flush_sender.clone()),
        );
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        // We use another scheduler to avoid purge jobs blocking other jobs.
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .index_metadata_size(config.index.metadata_cache_size.as_bytes())
                .index_content_size(config.index.content_cache_size.as_bytes())
                .index_content_page_size(config.index.content_cache_page_size.as_bytes())
                .index_result_cache_size(config.index.result_cache_size.as_bytes())
                .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let time_provider = Arc::new(StdTimeProvider);

        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::default(),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: plugins.clone(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

    /// Stops the worker group.
    pub(crate) async fn stop(&self) -> Result<()> {
        info!("Stop region worker group");

        // TODO(yingwen): Do we need to stop gracefully?
        // Stops the compaction scheduler gracefully.
        self.compact_job_pool.stop(true).await?;
        // Stops the flush scheduler gracefully.
        self.flush_job_pool.stop(true).await?;
        // Stops the purge scheduler gracefully.
        self.purge_scheduler.stop(true).await?;
        // Stops the index build job pool gracefully.
        self.index_build_job_pool.stop(true).await?;

        try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;

        Ok(())
    }

    /// Submits a request to a worker in the group.
    pub(crate) async fn submit_to_worker(
        &self,
        region_id: RegionId,
        request: WorkerRequest,
    ) -> Result<()> {
        self.worker(region_id).submit_request(request).await
    }

    /// Returns true if the specific region exists.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub(crate) fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.worker(region_id).is_region_opening(region_id)
    }

    /// Returns the region of the specific `region_id`.
    ///
    /// This method should not be public.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.worker(region_id).get_region(region_id)
    }

    /// Returns the cache of the group.
    pub(crate) fn cache_manager(&self) -> CacheManagerRef {
        self.cache_manager.clone()
    }

    pub(crate) fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.file_ref_manager.clone()
    }

    /// Gets the worker for a specific `region_id`.
    pub(crate) fn worker(&self, region_id: RegionId) -> &RegionWorker {
        let index = region_id_to_index(region_id, self.workers.len());

        &self.workers[index]
    }

    pub(crate) fn all_regions(&self) -> impl Iterator<Item = MitoRegionRef> + use<'_> {
        self.workers
            .iter()
            .flat_map(|worker| worker.regions.list_regions())
    }
}

// Test methods.
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
    /// Starts a worker group with `write_buffer_manager` and `listener` for tests.
    ///
    /// The number of workers should be a power of two.
    #[allow(clippy::too_many_arguments)]
    pub(crate) async fn start_for_test<S: LogStore>(
        config: Arc<MitoConfig>,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        time_provider: TimeProviderRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<WorkerGroup> {
        let (flush_sender, flush_receiver) = watch::channel(());
        let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
            Arc::new(
                WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
                    .with_notifier(flush_sender.clone()),
            )
        });
        let index_build_job_pool =
            Arc::new(LocalScheduler::new(config.max_background_index_builds));
        let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
        let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
        let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_flushes));
        let puffin_manager_factory = PuffinManagerFactory::new(
            &config.index.aux_path,
            config.index.staging_size.as_bytes(),
            Some(config.index.write_buffer_size.as_bytes() as _),
            config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
            .await?
            .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
        let write_cache = write_cache_from_config(
            &config,
            puffin_manager_factory.clone(),
            intermediate_manager.clone(),
        )
        .await?;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
                .vector_cache_size(config.vector_cache_size.as_bytes())
                .page_cache_size(config.page_cache_size.as_bytes())
                .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
                .write_cache(write_cache)
                .build(),
        );
        let workers = (0..config.num_workers)
            .map(|id| {
                WorkerStarter {
                    id: id as WorkerId,
                    config: config.clone(),
                    log_store: log_store.clone(),
                    object_store_manager: object_store_manager.clone(),
                    write_buffer_manager: write_buffer_manager.clone(),
                    index_build_job_pool: index_build_job_pool.clone(),
                    flush_job_pool: flush_job_pool.clone(),
                    compact_job_pool: compact_job_pool.clone(),
                    purge_scheduler: purge_scheduler.clone(),
                    listener: WorkerListener::new(listener.clone()),
                    cache_manager: cache_manager.clone(),
                    puffin_manager_factory: puffin_manager_factory.clone(),
                    intermediate_manager: intermediate_manager.clone(),
                    time_provider: time_provider.clone(),
                    flush_sender: flush_sender.clone(),
                    flush_receiver: flush_receiver.clone(),
                    plugins: Plugins::new(),
                    schema_metadata_manager: schema_metadata_manager.clone(),
                    file_ref_manager: file_ref_manager.clone(),
                    partition_expr_fetcher: partition_expr_fetcher.clone(),
                    flush_semaphore: flush_semaphore.clone(),
                }
                .start()
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(WorkerGroup {
            workers,
            flush_job_pool,
            compact_job_pool,
            index_build_job_pool,
            purge_scheduler,
            cache_manager,
            file_ref_manager,
        })
    }

    /// Returns the purge scheduler.
    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
        &self.purge_scheduler
    }
}

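/// Maps a region to the index of the worker that handles it.
///
/// The result is `((table_id % num_workers) + (region_number % num_workers)) % num_workers`,
/// so a given region always routes to the same worker. For example, with 4 workers,
/// `RegionId::new(1, 2)` maps to worker `((1 % 4) + (2 % 4)) % 4 = 3`.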
fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
    ((id.table_id() as usize % num_workers) + (id.region_number() as usize % num_workers))
        % num_workers
}

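/// Builds the write cache from the config, or returns `None` when the write
/// cache is disabled. The cache directory is created if it does not exist.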
async fn write_cache_from_config(
    config: &MitoConfig,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
    if !config.enable_write_cache {
        return Ok(None);
    }

    tokio::fs::create_dir_all(Path::new(&config.write_cache_path))
        .await
        .context(CreateDirSnafu {
            dir: &config.write_cache_path,
        })?;

    let cache = WriteCache::new_fs(
        &config.write_cache_path,
        config.write_cache_size,
        config.write_cache_ttl,
        puffin_manager_factory,
        intermediate_manager,
    )
    .await?;
    Ok(Some(Arc::new(cache)))
}

/// Computes an initial check delay for a worker.
pub(crate) fn worker_init_check_delay() -> Duration {
    let init_check_delay = rng().random_range(0..MAX_INITIAL_CHECK_DELAY_SECS);
    Duration::from_secs(init_check_delay)
}

/// Worker start config.
struct WorkerStarter<S> {
    id: WorkerId,
    config: Arc<MitoConfig>,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    write_buffer_manager: WriteBufferManagerRef,
    compact_job_pool: SchedulerRef,
    index_build_job_pool: SchedulerRef,
    flush_job_pool: SchedulerRef,
    purge_scheduler: SchedulerRef,
    listener: WorkerListener,
    cache_manager: CacheManagerRef,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    plugins: Plugins,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> WorkerStarter<S> {
    /// Starts a region worker and its background thread.
    fn start(self) -> Result<RegionWorker> {
        let regions = Arc::new(RegionMap::default());
        let opening_regions = Arc::new(OpeningRegions::default());
        let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

        let running = Arc::new(AtomicBool::new(true));
        let now = self.time_provider.current_time_millis();
        let id_string = self.id.to_string();
        let mut worker_thread = RegionWorkerLoop {
            id: self.id,
            config: self.config.clone(),
            regions: regions.clone(),
            dropping_regions: Arc::new(RegionMap::default()),
            opening_regions: opening_regions.clone(),
            sender: sender.clone(),
            receiver,
            wal: Wal::new(self.log_store),
            object_store_manager: self.object_store_manager.clone(),
            running: running.clone(),
            memtable_builder_provider: MemtableBuilderProvider::new(
                Some(self.write_buffer_manager.clone()),
                self.config.clone(),
            ),
            purge_scheduler: self.purge_scheduler.clone(),
            write_buffer_manager: self.write_buffer_manager,
            index_build_scheduler: IndexBuildScheduler::new(self.index_build_job_pool),
            flush_scheduler: FlushScheduler::new(self.flush_job_pool),
            compaction_scheduler: CompactionScheduler::new(
                self.compact_job_pool,
                sender.clone(),
                self.cache_manager.clone(),
                self.config,
                self.listener.clone(),
                self.plugins.clone(),
            ),
            stalled_requests: StalledRequests::default(),
            listener: self.listener,
            cache_manager: self.cache_manager,
            puffin_manager_factory: self.puffin_manager_factory,
            intermediate_manager: self.intermediate_manager,
            time_provider: self.time_provider,
            last_periodical_check_millis: now,
            flush_sender: self.flush_sender,
            flush_receiver: self.flush_receiver,
            stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
            region_count: REGION_COUNT.with_label_values(&[&id_string]),
            request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
            region_edit_queues: RegionEditQueues::default(),
            schema_metadata_manager: self.schema_metadata_manager,
            file_ref_manager: self.file_ref_manager.clone(),
            partition_expr_fetcher: self.partition_expr_fetcher,
            flush_semaphore: self.flush_semaphore,
        };
        let handle = common_runtime::spawn_global(async move {
            worker_thread.run().await;
        });

        Ok(RegionWorker {
            id: self.id,
            regions,
            opening_regions,
            sender,
            handle: Mutex::new(Some(handle)),
            running,
        })
    }
}

/// Worker to write and alter regions bound to it.
pub(crate) struct RegionWorker {
    /// Id of the worker.
    id: WorkerId,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// The opening regions.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Handle to the worker thread.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Whether to run the worker thread.
    running: Arc<AtomicBool>,
}

impl RegionWorker {
    /// Submits a request to the background worker thread.
    async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
        ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
        let request_with_time = WorkerRequestWithTime::new(request);
        if self.sender.send(request_with_time).await.is_err() {
            warn!(
                "Worker {} is already exited but the running flag is still true",
                self.id
            );
            // Manually set the running flag to false to avoid printing more warning logs.
            self.set_running(false);
            return WorkerStoppedSnafu { id: self.id }.fail();
        }

        Ok(())
    }

    /// Stops the worker.
    ///
    /// This method waits until the worker thread exits.
    async fn stop(&self) -> Result<()> {
        let handle = self.handle.lock().await.take();
        if let Some(handle) = handle {
            info!("Stop region worker {}", self.id);

            self.set_running(false);
            if self
                .sender
                .send(WorkerRequestWithTime::new(WorkerRequest::Stop))
                .await
                .is_err()
            {
                warn!("Worker {} is already exited before stop", self.id);
            }

            handle.await.context(JoinSnafu)?;
        }

        Ok(())
    }

    /// Returns true if the worker is still running.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Sets whether the worker is still running.
    fn set_running(&self, value: bool) {
        self.running.store(value, Ordering::Relaxed)
    }

    /// Returns true if the worker contains the specific region.
    fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.regions.is_region_exists(region_id)
    }

    /// Returns true if the region is opening.
    fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.opening_regions.is_region_exists(region_id)
    }

    /// Returns the region of the specific `region_id`.
    fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.regions.get_region(region_id)
    }

    #[cfg(test)]
    /// Returns the [OpeningRegionsRef].
    pub(crate) fn opening_regions(&self) -> &OpeningRegionsRef {
        &self.opening_regions
    }
}

impl Drop for RegionWorker {
    fn drop(&mut self) {
        if self.is_running() {
            self.set_running(false);
            // Once we drop the sender, the worker thread will receive a disconnected error.
        }
    }
}

type RequestBuffer = Vec<WorkerRequest>;

/// Buffer for stalled write requests.
///
/// Maintains stalled write requests and their estimated size.
#[derive(Default)]
pub(crate) struct StalledRequests {
    /// Stalled requests.
    /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests
    /// instead of `StalledRequests::requests.len()`.
    ///
    /// Key: RegionId
    /// Value: (estimated size, write requests, bulk requests)
    pub(crate) requests:
        HashMap<RegionId, (usize, Vec<SenderWriteRequest>, Vec<SenderBulkRequest>)>,
    /// Estimated size of all stalled requests.
    pub(crate) estimated_size: usize,
}

impl StalledRequests {
    /// Appends stalled requests.
    pub(crate) fn append(
        &mut self,
        requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for req in requests.drain(..) {
            self.push(req);
        }
        for req in bulk_requests.drain(..) {
            self.push_bulk(req);
        }
    }

    /// Pushes a stalled request to the buffer.
    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
        let (size, requests, _) = self.requests.entry(req.request.region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    pub(crate) fn push_bulk(&mut self, req: SenderBulkRequest) {
        let region_id = req.region_id;
        let (size, _, requests) = self.requests.entry(region_id).or_default();
        let req_size = req.request.estimated_size();
        *size += req_size;
        self.estimated_size += req_size;
        requests.push(req);
    }

    /// Removes stalled requests of a specific region.
    pub(crate) fn remove(
        &mut self,
        region_id: &RegionId,
    ) -> (Vec<SenderWriteRequest>, Vec<SenderBulkRequest>) {
        if let Some((size, write_reqs, bulk_reqs)) = self.requests.remove(region_id) {
            self.estimated_size -= size;
            (write_reqs, bulk_reqs)
        } else {
            (vec![], vec![])
        }
    }

    /// Returns the total number of all stalled requests.
    pub(crate) fn stalled_count(&self) -> usize {
        self.requests
            .values()
            .map(|(_, reqs, bulk_reqs)| reqs.len() + bulk_reqs.len())
            .sum()
    }
}

/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
    /// Id of the worker.
    id: WorkerId,
    /// Engine config.
    config: Arc<MitoConfig>,
    /// Regions bound to the worker.
    regions: RegionMapRef,
    /// Regions that are not yet fully dropped.
    dropping_regions: RegionMapRef,
    /// Regions that are opening.
    opening_regions: OpeningRegionsRef,
    /// Request sender.
    sender: Sender<WorkerRequestWithTime>,
    /// Request receiver.
    receiver: Receiver<WorkerRequestWithTime>,
    /// WAL of the engine.
    wal: Wal<S>,
    /// Manages object stores for manifest and SSTs.
    object_store_manager: ObjectStoreManagerRef,
    /// Whether the worker thread is still running.
    running: Arc<AtomicBool>,
    /// Memtable builder provider for each region.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Background purge job scheduler.
    purge_scheduler: SchedulerRef,
    /// Engine write buffer manager.
    write_buffer_manager: WriteBufferManagerRef,
    /// Scheduler for index build task.
    index_build_scheduler: IndexBuildScheduler,
    /// Schedules background flush requests.
    flush_scheduler: FlushScheduler,
    /// Scheduler for compaction tasks.
    compaction_scheduler: CompactionScheduler,
    /// Stalled write requests.
    stalled_requests: StalledRequests,
    /// Event listener for tests.
    listener: WorkerListener,
    /// Cache.
    cache_manager: CacheManagerRef,
    /// Puffin manager factory for index.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for inverted index.
    intermediate_manager: IntermediateManager,
    /// Provider to get current time.
    time_provider: TimeProviderRef,
    /// Last time to check regions periodically.
    last_periodical_check_millis: i64,
    /// Watch channel sender to notify workers to handle stalled requests.
    flush_sender: watch::Sender<()>,
    /// Watch channel receiver to wait for background flush job.
    flush_receiver: watch::Receiver<()>,
    /// Gauge of stalling request count.
    stalling_count: IntGauge,
    /// Gauge of regions in the worker.
    region_count: IntGauge,
    /// Histogram of request wait time for this worker.
    request_wait_time: Histogram,
    /// Queues for region edit requests.
    region_edit_queues: RegionEditQueues,
    /// Database level metadata manager.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Datanode level file references manager.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expr fetcher used to backfill partition expr on open for compatibility.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Semaphore to control flush concurrency.
    flush_semaphore: Arc<Semaphore>,
}

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Starts the worker loop.
    async fn run(&mut self) {
        let init_check_delay = worker_init_check_delay();
        info!(
            "Start region worker thread {}, init_check_delay: {:?}",
            self.id, init_check_delay
        );
        self.last_periodical_check_millis += init_check_delay.as_millis() as i64;

        // Buffer to retrieve requests from receiver.
        let mut write_req_buffer: Vec<SenderWriteRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut bulk_req_buffer: Vec<SenderBulkRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut ddl_req_buffer: Vec<SenderDdlRequest> =
            Vec::with_capacity(self.config.worker_request_batch_size);
        let mut general_req_buffer: Vec<WorkerRequest> =
            RequestBuffer::with_capacity(self.config.worker_request_batch_size);

        while self.running.load(Ordering::Relaxed) {
            // Clear the buffer before handling next batch of requests.
            write_req_buffer.clear();
            ddl_req_buffer.clear();
            general_req_buffer.clear();

            let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
            let sleep = tokio::time::sleep(max_wait_time);
            tokio::pin!(sleep);

            tokio::select! {
                request_opt = self.receiver.recv() => {
                    match request_opt {
                        Some(request_with_time) => {
                            // Observe the wait time
                            let wait_time = request_with_time.created_at.elapsed();
                            self.request_wait_time.observe(wait_time.as_secs_f64());

                            match request_with_time.request {
                                WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                                WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                                req => general_req_buffer.push(req),
                            }
                        },
                        // The channel is disconnected.
                        None => break,
                    }
                }
                recv_res = self.flush_receiver.changed() => {
                    if recv_res.is_err() {
                        // The channel is disconnected.
                        break;
                    } else {
                        // Also flush this worker if other workers trigger a flush, as this worker may
                        // have a large memtable to flush. We may not have a chance to flush that memtable
                        // if we never write to this worker, so flushing only the other workers may not
                        // release enough memory.
                        self.maybe_flush_worker();
                        // A flush job is finished, handles stalled requests.
                        self.handle_stalled_requests().await;
                        continue;
                    }
                }
                _ = &mut sleep => {
                    // Timeout. Checks periodical tasks.
                    self.handle_periodical_tasks();
                    continue;
                }
            }

            if self.flush_receiver.has_changed().unwrap_or(false) {
                // Always checks whether we could process stalled requests to avoid a request
                // hanging too long.
                // If the channel is closed, do nothing.
                self.handle_stalled_requests().await;
            }

            // Try to recv more requests from the channel.
            for _ in 1..self.config.worker_request_batch_size {
                // We have already received one request, so we start from 1.
                match self.receiver.try_recv() {
                    Ok(request_with_time) => {
                        // Observe the wait time
                        let wait_time = request_with_time.created_at.elapsed();
                        self.request_wait_time.observe(wait_time.as_secs_f64());

                        match request_with_time.request {
                            WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
                            WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
                            req => general_req_buffer.push(req),
                        }
                    }
                    // We still need to handle remaining requests.
                    Err(_) => break,
                }
            }

            self.listener.on_recv_requests(
                write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
            );

            self.handle_requests(
                &mut write_req_buffer,
                &mut ddl_req_buffer,
                &mut general_req_buffer,
                &mut bulk_req_buffer,
            )
            .await;

            self.handle_periodical_tasks();
        }

        self.clean().await;

        info!("Exit region worker thread {}", self.id);
    }

    /// Dispatches and processes requests.
    ///
    /// Takes and handles all requests in the buffers.
    async fn handle_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        ddl_requests: &mut Vec<SenderDdlRequest>,
        general_requests: &mut Vec<WorkerRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        for worker_req in general_requests.drain(..) {
            match worker_req {
                WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
                    // These requests are categorized into write_requests and ddl_requests.
                    continue;
                }
                WorkerRequest::Background { region_id, notify } => {
                    // For background notify, we handle it directly.
                    self.handle_background_notify(region_id, notify).await;
                }
                WorkerRequest::SetRegionRoleStateGracefully {
                    region_id,
                    region_role_state,
                    sender,
                } => {
                    self.set_role_state_gracefully(region_id, region_role_state, sender)
                        .await;
                }
                WorkerRequest::EditRegion(request) => {
                    self.handle_region_edit(request).await;
                }
                WorkerRequest::BuildIndexRegion(request) => {
                    self.handle_rebuild_index(request).await;
                }
                WorkerRequest::Stop => {
                    debug_assert!(!self.running.load(Ordering::Relaxed));
                }
                WorkerRequest::SyncRegion(req) => {
                    self.handle_region_sync(req).await;
                }
                WorkerRequest::BulkInserts {
                    metadata,
                    request,
                    sender,
                } => {
                    if let Some(region_metadata) = metadata {
                        self.handle_bulk_insert_batch(
                            region_metadata,
                            request,
                            bulk_requests,
                            sender,
                        )
                        .await;
                    } else {
                        error!("Cannot find region metadata for {}", request.region_id);
                        sender.send(
                            error::RegionNotFoundSnafu {
                                region_id: request.region_id,
                            }
                            .fail(),
                        );
                    }
                }
            }
        }

        // Handles all write requests first so we can alter regions without
        // considering existing write requests.
        self.handle_write_requests(write_requests, bulk_requests, true)
            .await;

        self.handle_ddl_requests(ddl_requests).await;
    }

    /// Takes and handles all ddl requests.
    async fn handle_ddl_requests(&mut self, ddl_requests: &mut Vec<SenderDdlRequest>) {
        if ddl_requests.is_empty() {
            return;
        }

        for ddl in ddl_requests.drain(..) {
            let res = match ddl.request {
                DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await,
                DdlRequest::Drop => self.handle_drop_request(ddl.region_id).await,
                DdlRequest::Open((req, wal_entry_receiver)) => {
                    self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await,
                DdlRequest::Alter(req) => {
                    self.handle_alter_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Flush(req) => {
                    self.handle_flush_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Compact(req) => {
                    self.handle_compaction_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Truncate(req) => {
                    self.handle_truncate_request(ddl.region_id, req, ddl.sender)
                        .await;
                    continue;
                }
                DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await,
            };

            ddl.sender.send(res);
        }
    }

    /// Handles periodical tasks such as region auto flush.
    fn handle_periodical_tasks(&mut self) {
        let interval = CHECK_REGION_INTERVAL.as_millis() as i64;
        if self
            .time_provider
            .elapsed_since(self.last_periodical_check_millis)
            < interval
        {
            return;
        }

        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }

    /// Handles a region background notification.
    async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
        match notify {
            BackgroundNotify::FlushFinished(req) => {
                self.handle_flush_finished(region_id, req).await
            }
            BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
            BackgroundNotify::IndexBuildFinished(req) => {
                self.handle_index_build_finished(region_id, req).await
            }
            BackgroundNotify::IndexBuildFailed(req) => {
                self.handle_index_build_failed(region_id, req).await
            }
            BackgroundNotify::CompactionFinished(req) => {
                self.handle_compaction_finished(region_id, req).await
            }
            BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
            BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
            BackgroundNotify::RegionChange(req) => {
                self.handle_manifest_region_change_result(req).await
            }
            BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
        }
    }

    /// Handles `set_region_role_gracefully`.
    async fn set_role_state_gracefully(
        &mut self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
        sender: oneshot::Sender<SetRegionRoleStateResponse>,
    ) {
        if let Some(region) = self.regions.get_region(region_id) {
            // We need to do this in the background as we need the manifest lock.
            common_runtime::spawn_global(async move {
                match region.set_role_state_gracefully(region_role_state).await {
                    Ok(()) => {
                        let last_entry_id = region.version_control.current().last_entry_id;
                        let _ = sender.send(SetRegionRoleStateResponse::success(
                            SetRegionRoleStateSuccess::mito(last_entry_id),
                        ));
                    }
                    Err(e) => {
                        error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
                        let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
                            BoxedError::new(e),
                        ));
                    }
                }
            });
        } else {
            let _ = sender.send(SetRegionRoleStateResponse::NotFound);
        }
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Cleans up the worker.
    async fn clean(&self) {
        // Closes remaining regions.
        let regions = self.regions.list_regions();
        for region in regions {
            region.stop().await;
        }

        self.regions.clear();
    }

    /// Notifies the whole group that a flush job is finished so other
    /// workers can handle stalled requests.
    fn notify_group(&mut self) {
        // Notifies all receivers.
        let _ = self.flush_sender.send(());
        // Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
        self.flush_receiver.borrow_and_update();
    }
}

/// Wrapper that only calls the event listener in tests.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
    #[cfg(any(test, feature = "test"))]
    listener: Option<crate::engine::listener::EventListenerRef>,
}

impl WorkerListener {
    #[cfg(any(test, feature = "test"))]
    pub(crate) fn new(
        listener: Option<crate::engine::listener::EventListenerRef>,
    ) -> WorkerListener {
        WorkerListener { listener }
    }

    /// Flush is finished successfully.
    pub(crate) fn on_flush_success(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_success(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    /// Engine is stalled.
    pub(crate) fn on_write_stall(&self) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_write_stall();
        }
    }

    pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_flush_begin(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            return listener.on_later_drop_begin(region_id);
        }
        // Avoid compiler warning.
        let _ = region_id;
        None
    }

    /// The later drop task is finished.
    pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_later_drop_end(region_id, removed);
        }
        // Avoid compiler warning.
        let _ = region_id;
        let _ = removed;
    }

    pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_merge_ssts_finished(region_id).await;
        }
        // Avoid compiler warning.
        let _ = region_id;
    }

    pub(crate) fn on_recv_requests(&self, request_num: usize) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_recv_requests(request_num);
        }
        // Avoid compiler warning.
        let _ = request_num;
    }

    pub(crate) fn on_file_cache_filled(&self, _file_id: FileId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_file_cache_filled(_file_id);
        }
    }

    pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener.on_compaction_scheduled(_region_id);
        }
    }

    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
        #[cfg(any(test, feature = "test"))]
        if let Some(listener) = &self.listener {
            listener
                .on_notify_region_change_result_begin(_region_id)
                .await;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_region_id_to_index() {
        let num_workers = 4;

        let region_id = RegionId::new(1, 2);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 3);

        let region_id = RegionId::new(2, 3);
        let index = region_id_to_index(region_id, num_workers);
        assert_eq!(index, 1);
    }
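
    #[test]
    fn test_worker_init_check_delay() {
        // Sanity check: the randomized initial delay spreads the first
        // periodical check across workers, and `worker_init_check_delay`
        // draws from `0..MAX_INITIAL_CHECK_DELAY_SECS`, so the delay must
        // always stay below the maximum.
        for _ in 0..10 {
            let delay = worker_init_check_delay();
            assert!(delay < Duration::from_secs(MAX_INITIAL_CHECK_DELAY_SECS));
        }
    }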

    #[tokio::test]
    async fn test_worker_group_start_stop() {
        let env = TestEnv::with_prefix("group-stop").await;
        let group = env
            .create_worker_group(MitoConfig {
                num_workers: 4,
                ..Default::default()
            })
            .await;

        group.stop().await.unwrap();
    }
}