mito2/worker/
handle_open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling open request.
16
17use std::sync::Arc;
18
19use common_telemetry::info;
20use object_store::util::join_path;
21use snafu::{OptionExt, ResultExt};
22use store_api::logstore::LogStore;
23use store_api::region_request::RegionOpenRequest;
24use store_api::storage::RegionId;
25use table::requests::STORAGE_KEY;
26
27use crate::error::{
28    ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
29};
30use crate::region::opener::RegionOpener;
31use crate::request::OptionOutputTx;
32use crate::wal::entry_distributor::WalEntryReceiver;
33use crate::worker::handle_drop::remove_region_dir_once;
34use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
35
36impl<S: LogStore> RegionWorkerLoop<S> {
37    async fn check_and_cleanup_region(
38        &self,
39        region_id: RegionId,
40        request: &RegionOpenRequest,
41    ) -> Result<()> {
42        let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
43            self.object_store_manager
44                .find(storage_name)
45                .context(ObjectStoreNotFoundSnafu {
46                    object_store: storage_name.to_string(),
47                })?
48        } else {
49            self.object_store_manager.default_object_store()
50        };
51        // Check if this region is pending drop. And clean the entire dir if so.
52        if !self.dropping_regions.is_region_exists(region_id)
53            && object_store
54                .exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
55                .await
56                .context(OpenDalSnafu)?
57        {
58            let result = remove_region_dir_once(&request.region_dir, object_store, true).await;
59            info!(
60                "Region {} is dropped, worker: {}, result: {:?}",
61                region_id, self.id, result
62            );
63            return RegionNotFoundSnafu { region_id }.fail();
64        }
65
66        Ok(())
67    }
68
69    pub(crate) async fn handle_open_request(
70        &mut self,
71        region_id: RegionId,
72        request: RegionOpenRequest,
73        wal_entry_receiver: Option<WalEntryReceiver>,
74        sender: OptionOutputTx,
75    ) {
76        if self.regions.is_region_exists(region_id) {
77            sender.send(Ok(0));
78            return;
79        }
80        let Some(sender) = self
81            .opening_regions
82            .wait_for_opening_region(region_id, sender)
83        else {
84            return;
85        };
86        if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
87            sender.send(Err(err));
88            return;
89        }
90        info!("Try to open region {}, worker: {}", region_id, self.id);
91
92        // Open region from specific region dir.
93        let opener = match RegionOpener::new(
94            region_id,
95            &request.region_dir,
96            self.memtable_builder_provider.clone(),
97            self.object_store_manager.clone(),
98            self.purge_scheduler.clone(),
99            self.puffin_manager_factory.clone(),
100            self.intermediate_manager.clone(),
101            self.time_provider.clone(),
102        )
103        .skip_wal_replay(request.skip_wal_replay)
104        .cache(Some(self.cache_manager.clone()))
105        .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
106        .parse_options(request.options)
107        {
108            Ok(opener) => opener,
109            Err(err) => {
110                sender.send(Err(err));
111                return;
112            }
113        };
114
115        let regions = self.regions.clone();
116        let wal = self.wal.clone();
117        let config = self.config.clone();
118        let opening_regions = self.opening_regions.clone();
119        let region_count = self.region_count.clone();
120        let worker_id = self.id;
121        opening_regions.insert_sender(region_id, sender);
122        common_runtime::spawn_global(async move {
123            match opener.open(&config, &wal).await {
124                Ok(region) => {
125                    info!("Region {} is opened, worker: {}", region_id, worker_id);
126                    region_count.inc();
127
128                    // Insert the Region into the RegionMap.
129                    regions.insert_region(Arc::new(region));
130
131                    let senders = opening_regions.remove_sender(region_id);
132                    for sender in senders {
133                        sender.send(Ok(0));
134                    }
135                }
136                Err(err) => {
137                    let senders = opening_regions.remove_sender(region_id);
138                    let err = Arc::new(err);
139                    for sender in senders {
140                        sender.send(Err(err.clone()).context(OpenRegionSnafu));
141                    }
142                }
143            }
144        });
145    }
146}