mito2/worker/
handle_open.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling open request.
16
17use std::sync::Arc;
18
19use common_telemetry::info;
20use object_store::util::join_path;
21use snafu::{OptionExt, ResultExt};
22use store_api::logstore::LogStore;
23use store_api::region_request::RegionOpenRequest;
24use store_api::storage::RegionId;
25use table::requests::STORAGE_KEY;
26
27use crate::error::{
28    ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
29};
30use crate::region::opener::RegionOpener;
31use crate::request::OptionOutputTx;
32use crate::sst::location::region_dir_from_table_dir;
33use crate::wal::entry_distributor::WalEntryReceiver;
34use crate::worker::handle_drop::remove_region_dir_once;
35use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
36
37impl<S: LogStore> RegionWorkerLoop<S> {
38    async fn check_and_cleanup_region(
39        &self,
40        region_id: RegionId,
41        request: &RegionOpenRequest,
42    ) -> Result<()> {
43        let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
44            self.object_store_manager
45                .find(storage_name)
46                .context(ObjectStoreNotFoundSnafu {
47                    object_store: storage_name.to_string(),
48                })?
49        } else {
50            self.object_store_manager.default_object_store()
51        };
52        // Check if this region is pending drop. And clean the entire dir if so.
53        let region_dir =
54            region_dir_from_table_dir(&request.table_dir, region_id, request.path_type);
55        if !self.dropping_regions.is_region_exists(region_id)
56            && object_store
57                .exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
58                .await
59                .context(OpenDalSnafu)?
60        {
61            let result = remove_region_dir_once(&region_dir, object_store, true).await;
62            info!(
63                "Region {} is dropped, worker: {}, result: {:?}",
64                region_id, self.id, result
65            );
66            return RegionNotFoundSnafu { region_id }.fail();
67        }
68
69        Ok(())
70    }
71
72    pub(crate) async fn handle_open_request(
73        &mut self,
74        region_id: RegionId,
75        request: RegionOpenRequest,
76        wal_entry_receiver: Option<WalEntryReceiver>,
77        sender: OptionOutputTx,
78    ) {
79        if self.regions.is_region_exists(region_id) {
80            sender.send(Ok(0));
81            return;
82        }
83        let Some(sender) = self
84            .opening_regions
85            .wait_for_opening_region(region_id, sender)
86        else {
87            return;
88        };
89        if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
90            sender.send(Err(err));
91            return;
92        }
93        info!("Try to open region {}, worker: {}", region_id, self.id);
94
95        // Open region from specific region dir.
96        let opener = match RegionOpener::new(
97            region_id,
98            &request.table_dir,
99            request.path_type,
100            self.memtable_builder_provider.clone(),
101            self.object_store_manager.clone(),
102            self.purge_scheduler.clone(),
103            self.puffin_manager_factory.clone(),
104            self.intermediate_manager.clone(),
105            self.time_provider.clone(),
106        )
107        .skip_wal_replay(request.skip_wal_replay)
108        .cache(Some(self.cache_manager.clone()))
109        .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
110        .parse_options(request.options)
111        {
112            Ok(opener) => opener,
113            Err(err) => {
114                sender.send(Err(err));
115                return;
116            }
117        };
118
119        let regions = self.regions.clone();
120        let wal = self.wal.clone();
121        let config = self.config.clone();
122        let opening_regions = self.opening_regions.clone();
123        let region_count = self.region_count.clone();
124        let worker_id = self.id;
125        opening_regions.insert_sender(region_id, sender);
126        common_runtime::spawn_global(async move {
127            match opener.open(&config, &wal).await {
128                Ok(region) => {
129                    info!("Region {} is opened, worker: {}", region_id, worker_id);
130                    region_count.inc();
131
132                    // Insert the Region into the RegionMap.
133                    regions.insert_region(Arc::new(region));
134
135                    let senders = opening_regions.remove_sender(region_id);
136                    for sender in senders {
137                        sender.send(Ok(0));
138                    }
139                }
140                Err(err) => {
141                    let senders = opening_regions.remove_sender(region_id);
142                    let err = Arc::new(err);
143                    for sender in senders {
144                        sender.send(Err(err.clone()).context(OpenRegionSnafu));
145                    }
146                }
147            }
148        });
149    }
150}