mito2/worker/
handle_close.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling close request.
16
17use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::logstore::provider::Provider;
20use store_api::region_request::RegionFlushRequest;
21use store_api::storage::RegionId;
22
23use crate::flush::FlushReason;
24use crate::request::OptionOutputTx;
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28    pub(crate) async fn handle_close_request(
29        &mut self,
30        region_id: RegionId,
31        sender: OptionOutputTx,
32    ) {
33        let Some(region) = self.regions.get_region(region_id) else {
34            sender.send(Ok(0));
35            return;
36        };
37
38        info!("Try to close region {}, worker: {}", region_id, self.id);
39
40        // If the region is using Noop WAL and has data in memtable,
41        // we should flush it before closing to ensure durability.
42        if region.provider == Provider::Noop
43            && !region
44                .version_control
45                .current()
46                .version
47                .memtables
48                .is_empty()
49        {
50            info!("Region {} has pending data, waiting for flush", region_id);
51            self.handle_flush_request(
52                region_id,
53                RegionFlushRequest {
54                    row_group_size: None,
55                },
56                Some(FlushReason::Closing),
57                sender,
58            );
59            return;
60        }
61
62        // WAL configured or memtable is empty, flush is not necessary.
63        self.remove_region(region_id).await;
64        info!("Region {} closed, worker: {}", region_id, self.id);
65        sender.send(Ok(0))
66    }
67
68    /// Remove a region and stop all related tasks.
69    pub(crate) async fn remove_region(&mut self, region_id: RegionId) {
70        let Some(region) = self.regions.remove_region(region_id) else {
71            return;
72        };
73        region.stop().await;
74        // Clean flush status.
75        self.flush_scheduler.on_region_closed(region_id);
76        // Clean compaction status.
77        self.compaction_scheduler.on_region_closed(region_id);
78        // clean index build status.
79        self.index_build_scheduler.on_region_closed(region_id).await;
80        self.region_count.dec();
81    }
82}