mito2/worker/handle_open.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling open requests.

use std::sync::Arc;
use std::time::Instant;

use common_telemetry::info;
use object_store::util::join_path;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use table::requests::STORAGE_KEY;

use crate::error::{
    ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
};
use crate::region::opener::RegionOpener;
use crate::request::OptionOutputTx;
use crate::sst::location::region_dir_from_table_dir;
use crate::wal::entry_distributor::WalEntryReceiver;
use crate::worker::handle_drop::remove_region_dir_once;
use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};

impl<S: LogStore> RegionWorkerLoop<S> {
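    /// Checks whether the region was left pending drop and, if so, removes its
    /// leftover directory and fails the open with a `RegionNotFound` error.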
    async fn check_and_cleanup_region(
        &self,
        region_id: RegionId,
        request: &RegionOpenRequest,
    ) -> Result<()> {
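        // Resolve the object store for this region: the open request may name a
        // custom storage backend via its options; otherwise use the default store.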
        let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
            self.object_store_manager
                .find(storage_name)
                .with_context(|| ObjectStoreNotFoundSnafu {
                    object_store: storage_name.clone(),
                })?
        } else {
            self.object_store_manager.default_object_store()
        };
        // Check whether this region is pending drop, and clean up the entire dir if so.
        let region_dir =
            region_dir_from_table_dir(&request.table_dir, region_id, request.path_type);
        if !self.dropping_regions.is_region_exists(region_id)
            && object_store
                .exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
                .await
                .context(OpenDalSnafu)?
        {
            let result = remove_region_dir_once(&region_dir, object_store, true).await;
            info!(
                "Region {} is dropped, worker: {}, result: {:?}",
                region_id, self.id, result
            );
            return RegionNotFoundSnafu { region_id }.fail();
        }

        Ok(())
    }

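    /// Handles a request to open the region `region_id`.
    ///
    /// Replies with 0 affected rows if the region is already open, attaches the
    /// sender to any in-flight open of the same region, and otherwise opens the
    /// region on the global runtime so the worker loop is not blocked.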
    pub(crate) async fn handle_open_request(
        &mut self,
        region_id: RegionId,
        request: RegionOpenRequest,
        wal_entry_receiver: Option<WalEntryReceiver>,
        sender: OptionOutputTx,
    ) {
        // The region is already open; report success immediately.
        if self.regions.is_region_exists(region_id) {
            sender.send(Ok(0));
            return;
        }
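        // If another request is already opening this region, queue the sender on
        // that in-flight open and return; it is notified once the open finishes.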
        let Some(sender) = self
            .opening_regions
            .wait_for_opening_region(region_id, sender)
        else {
            return;
        };
        if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
            sender.send(Err(err));
            return;
        }
        info!("Try to open region {}, worker: {}", region_id, self.id);

        // Open region from specific region dir.
        let opener = match RegionOpener::new(
            region_id,
            &request.table_dir,
            request.path_type,
            self.memtable_builder_provider.clone(),
            self.object_store_manager.clone(),
            self.purge_scheduler.clone(),
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
            self.time_provider.clone(),
            self.file_ref_manager.clone(),
            self.partition_expr_fetcher.clone(),
        )
        .skip_wal_replay(request.skip_wal_replay)
        .cache(Some(self.cache_manager.clone()))
        .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
        .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id))
        .parse_options(request.options)
        {
            Ok(opener) => opener,
            Err(err) => {
                sender.send(Err(err));
                return;
            }
        };

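        // Clone the state the background task needs and register the reply sender,
        // then run the open itself off the worker loop so a slow open (e.g. a long
        // WAL replay) does not stall other regions on this worker.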
        let now = Instant::now();
        let regions = self.regions.clone();
        let wal = self.wal.clone();
        let config = self.config.clone();
        let opening_regions = self.opening_regions.clone();
        let region_count = self.region_count.clone();
        let worker_id = self.id;
        opening_regions.insert_sender(region_id, sender);
        common_runtime::spawn_global(async move {
            match opener.open(&config, &wal).await {
                Ok(region) => {
                    info!(
                        "Region {} is opened, worker: {}, elapsed: {:?}",
                        region_id,
                        worker_id,
                        now.elapsed()
                    );
                    region_count.inc();

                    // Insert the Region into the RegionMap.
                    regions.insert_region(region);

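                    // Notify every request that queued on this in-flight open.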
                    let senders = opening_regions.remove_sender(region_id);
                    for sender in senders {
                        sender.send(Ok(0));
                    }
                }
                Err(err) => {
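                    // The error itself is not cloneable, so share one instance
                    // behind an `Arc` and fail every queued waiter with it.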
                    let senders = opening_regions.remove_sender(region_id);
                    let err = Arc::new(err);
                    for sender in senders {
                        sender.send(Err(err.clone()).context(OpenRegionSnafu));
                    }
                }
            }
        });
    }
}