metric_engine/engine/drop.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Drop a metric region
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{
CloseMitoRegionSnafu, LogicalRegionNotFoundSnafu, PhysicalRegionBusySnafu, Result,
};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::utils;
impl MetricEngineInner {
pub async fn drop_region(
&self,
region_id: RegionId,
req: RegionDropRequest,
) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
let fast_path = req.fast_path;
// enclose the guard in a block to prevent the guard from polluting the async context
let (is_physical_region, is_physical_region_busy) = {
if let Some(state) = self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
{
(true, !state.logical_regions().is_empty())
} else {
// the second argument is not used, just pass in a dummy value
(false, true)
}
};
if is_physical_region {
// check if there is no logical region relates to this physical region
if is_physical_region_busy && !fast_path {
// reject if there is any present logical region
return Err(PhysicalRegionBusySnafu {
region_id: data_region_id,
}
.build());
}
return self.drop_physical_region(data_region_id).await;
}
if fast_path {
// for fast path, we don't delete the metadata in the metadata region.
// it only remove the logical region from the engine state.
//
// The drop database procedure will ensure the metadata region and data region are dropped eventually.
self.state
.write()
.unwrap()
.remove_logical_region(region_id)?;
Ok(0)
} else {
let metadata_region_id = self
.state
.read()
.unwrap()
.logical_regions()
.get(®ion_id)
.copied();
if let Some(metadata_region_id) = metadata_region_id {
self.drop_logical_region(region_id, metadata_region_id)
.await
} else {
Err(LogicalRegionNotFoundSnafu { region_id }.build())
}
}
}
async fn drop_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
let metadata_region_id = utils::to_metadata_region_id(region_id);
// Drop mito regions.
// Since the physical regions are going to be dropped, we don't need to
// update the contents in metadata region.
self.mito
.handle_request(
data_region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.with_context(|_| CloseMitoRegionSnafu { region_id })?;
self.mito
.handle_request(
metadata_region_id,
RegionRequest::Drop(RegionDropRequest { fast_path: false }),
)
.await
.with_context(|_| CloseMitoRegionSnafu { region_id })?;
PHYSICAL_REGION_COUNT.dec();
// Update engine state
self.state
.write()
.unwrap()
.remove_physical_region(data_region_id)?;
Ok(0)
}
async fn drop_logical_region(
&self,
logical_region_id: RegionId,
physical_region_id: RegionId,
) -> Result<AffectedRows> {
// Update metadata
self.metadata_region
.remove_logical_region(physical_region_id, logical_region_id)
.await?;
// Update engine state
self.state
.write()
.unwrap()
.remove_logical_region(logical_region_id)?;
Ok(0)
}
}