Skip to main content

mito2/worker/
handle_close.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling close request.
16
17use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::logstore::provider::Provider;
20use store_api::region_request::RegionFlushRequest;
21use store_api::storage::RegionId;
22
23use crate::flush::FlushReason;
24use crate::request::OptionOutputTx;
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28    pub(crate) async fn handle_close_request(
29        &mut self,
30        region_id: RegionId,
31        sender: OptionOutputTx,
32    ) {
33        let Some(region) = self.regions.get_region(region_id) else {
34            sender.send(Ok(0));
35            return;
36        };
37
38        info!("Try to close region {}, worker: {}", region_id, self.id);
39
40        // If the region is using Noop WAL and has data in memtable,
41        // we should flush it before closing to ensure durability.
42        if region.provider == Provider::Noop
43            && !region
44                .version_control
45                .current()
46                .version
47                .memtables
48                .is_empty()
49        {
50            info!("Region {} has pending data, waiting for flush", region_id);
51            self.handle_flush_request(
52                region_id,
53                RegionFlushRequest::default(),
54                Some(FlushReason::Closing),
55                sender,
56            );
57            return;
58        }
59
60        // WAL configured or memtable is empty, flush is not necessary.
61        self.remove_region(region_id).await;
62        info!("Region {} closed, worker: {}", region_id, self.id);
63        sender.send(Ok(0))
64    }
65
66    /// Remove a region and stop all related tasks.
67    pub(crate) async fn remove_region(&mut self, region_id: RegionId) {
68        let Some(region) = self.regions.remove_region(region_id) else {
69            return;
70        };
71        region.stop().await;
72        self.fail_region_stalled_requests_as_not_found(&region_id);
73        self.reject_region_edit_queue_as_not_found(region_id);
74        // Clean flush status.
75        self.flush_scheduler.on_region_closed(region_id);
76        // Clean compaction status.
77        self.compaction_scheduler.on_region_closed(region_id);
78        // clean index build status.
79        self.index_build_scheduler.on_region_closed(region_id).await;
80        self.region_count.dec();
81    }
82}