pub mod opener;
pub mod options;
pub(crate) mod version;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use common_telemetry::{error, info, warn};
use crossbeam_utils::atomic::AtomicCell;
use snafu::{ensure, OptionExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::error::{
FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result,
UpdateManifestSnafu,
};
use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::file_purger::FilePurgerRef;
use crate::time_provider::TimeProviderRef;
/// Ratio applied to the memtable usage of a region to estimate its WAL usage
/// (used by `MitoRegion::estimated_wal_usage`).
// NOTE(review): the origin of this value is not visible here; presumably an
// empirically measured ratio — confirm before tuning.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
/// Storage usage of a single region, split by component.
// NOTE(review): units are not established here; presumably bytes — confirm.
#[derive(Debug)]
pub struct RegionUsage {
    pub region_id: RegionId,
    pub wal_usage: u64,
    pub sst_usage: u64,
    pub manifest_usage: u64,
}

impl RegionUsage {
    /// Returns the sum of the WAL, SST and manifest usage.
    pub fn disk_usage(&self) -> u64 {
        let components = [self.wal_usage, self.sst_usage, self.manifest_usage];
        components.iter().sum()
    }
}
/// Sub-state of a region whose role is leader.
///
/// `Altering`, `Dropping`, `Truncating` and `Editing` are only entered from
/// `Writable` via the corresponding `MitoRegion::set_*` helpers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// The only state for which `MitoRegion::is_writable` returns true.
    Writable,
    /// Entered from `Writable` by `MitoRegion::set_altering`.
    Altering,
    /// Entered from `Writable` by `MitoRegion::set_dropping`.
    Dropping,
    /// Entered from `Writable` by `MitoRegion::set_truncating`.
    Truncating,
    /// Entered from `Writable` by `MitoRegion::set_editing`.
    Editing,
    /// Entered from `Writable` by `ManifestContext::set_role(RegionRole::DowngradingLeader)`.
    /// Still flushable (`MitoRegion::is_flushable`) but not writable.
    Downgrading,
}
/// Role of a region, carrying the leader sub-state when the region is a leader.
///
/// Stored in a `crossbeam_utils::atomic::AtomicCell`; the
/// `test_region_state_lock_free` test asserts that this cell is lock-free, so
/// the type must stay small and `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    /// Leader with the given sub-state.
    Leader(RegionLeaderState),
    /// Follower; see `MitoRegion::follower_region` / `RegionMap::follower_region`.
    Follower,
}
/// In-memory handle of a region managed by the mito engine.
#[derive(Debug)]
pub(crate) struct MitoRegion {
    /// Id of this region.
    pub(crate) region_id: RegionId,
    /// Version controller; `current()` provides the version read by
    /// `metadata()`, `version()` and `region_statistic()`.
    pub(crate) version_control: VersionControlRef,
    /// Access layer of the region; provides `region_dir()`.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest manager plus the region's role state.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// File purger of the region.
    // NOTE(review): presumably removes SST files no longer referenced — confirm.
    pub(crate) file_purger: FilePurgerRef,
    /// Log store provider of the region (`store_api::logstore::provider::Provider`).
    pub(crate) provider: Provider,
    /// Time (from `time_provider`, in milliseconds) of the last flush,
    /// updated by `update_flush_millis`.
    last_flush_millis: AtomicI64,
    /// Time (from `time_provider`, in milliseconds) of the last compaction,
    /// updated by `update_compaction_millis`.
    last_compaction_millis: AtomicI64,
    /// Clock used for the flush/compaction timestamps above.
    time_provider: TimeProviderRef,
    /// Memtable builder of the region.
    pub(crate) memtable_builder: MemtableBuilderRef,
    /// Manifest size/version counters reported by `region_statistic`.
    stats: ManifestStats,
}
/// Shared reference to a [`MitoRegion`].
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
impl MitoRegion {
    /// Stops the region's manifest manager.
    ///
    /// The manager is stopped while its write lock is held; the lock is
    /// released before the log line is emitted.
    pub(crate) async fn stop(&self) {
        {
            let mut manager = self.manifest_ctx.manifest_manager.write().await;
            manager.stop().await;
        }
        info!(
            "Stopped region manifest manager, region_id: {}",
            self.region_id
        );
    }

    /// Returns the metadata of the region's current version.
    pub(crate) fn metadata(&self) -> RegionMetadataRef {
        self.version_control.current().version.metadata.clone()
    }

    /// Returns the primary key encoding recorded in the current metadata.
    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
        self.version_control
            .current()
            .version
            .metadata
            .primary_key_encoding
    }

    /// Returns the region's current version.
    pub(crate) fn version(&self) -> VersionRef {
        self.version_control.current().version
    }

    /// Returns the time (milliseconds, from `time_provider`) of the last flush.
    pub(crate) fn last_flush_millis(&self) -> i64 {
        let millis = &self.last_flush_millis;
        millis.load(Ordering::Relaxed)
    }

    /// Records "now" as the time of the last flush.
    pub(crate) fn update_flush_millis(&self) {
        let now_millis = self.time_provider.current_time_millis();
        self.last_flush_millis
            .store(now_millis, Ordering::Relaxed);
    }

    /// Returns the time (milliseconds, from `time_provider`) of the last compaction.
    pub(crate) fn last_compaction_millis(&self) -> i64 {
        let millis = &self.last_compaction_millis;
        millis.load(Ordering::Relaxed)
    }

    /// Records "now" as the time of the last compaction.
    pub(crate) fn update_compaction_millis(&self) {
        let now_millis = self.time_provider.current_time_millis();
        self.last_compaction_millis
            .store(now_millis, Ordering::Relaxed);
    }

    /// Returns the region directory reported by the access layer.
    pub(crate) fn region_dir(&self) -> &str {
        let layer = &self.access_layer;
        layer.region_dir()
    }

    /// True only in `Leader(Writable)`.
    pub(crate) fn is_writable(&self) -> bool {
        matches!(
            self.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        )
    }

    /// True in `Leader(Writable)` and `Leader(Downgrading)`.
    pub(crate) fn is_flushable(&self) -> bool {
        let current = self.state();
        matches!(
            current,
            RegionRoleState::Leader(RegionLeaderState::Writable)
                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
        )
    }

    /// True only in `Leader(Downgrading)`.
    pub(crate) fn is_downgrading(&self) -> bool {
        self.state() == RegionRoleState::Leader(RegionLeaderState::Downgrading)
    }

    /// True only in `Follower`.
    pub(crate) fn is_follower(&self) -> bool {
        matches!(self.state(), RegionRoleState::Follower)
    }

    /// Loads the current role state.
    pub(crate) fn state(&self) -> RegionRoleState {
        let cell = &self.manifest_ctx.state;
        cell.load()
    }

    /// Requests a role change; see `ManifestContext::set_role` for the rules.
    pub(crate) fn set_role(&self, next_role: RegionRole) {
        let region_id = self.region_id;
        self.manifest_ctx.set_role(next_role, region_id);
    }

    /// Moves `Leader(Writable)` to `Leader(Altering)`; errors otherwise.
    pub(crate) fn set_altering(&self) -> Result<()> {
        let next = RegionRoleState::Leader(RegionLeaderState::Altering);
        self.compare_exchange_state(RegionLeaderState::Writable, next)
    }

    /// Moves `Leader(Writable)` to `Leader(Dropping)`; errors otherwise.
    pub(crate) fn set_dropping(&self) -> Result<()> {
        let next = RegionRoleState::Leader(RegionLeaderState::Dropping);
        self.compare_exchange_state(RegionLeaderState::Writable, next)
    }

    /// Moves `Leader(Writable)` to `Leader(Truncating)`; errors otherwise.
    pub(crate) fn set_truncating(&self) -> Result<()> {
        let next = RegionRoleState::Leader(RegionLeaderState::Truncating);
        self.compare_exchange_state(RegionLeaderState::Writable, next)
    }

    /// Moves `Leader(Writable)` to `Leader(Editing)`; errors otherwise.
    pub(crate) fn set_editing(&self) -> Result<()> {
        let next = RegionRoleState::Leader(RegionLeaderState::Editing);
        self.compare_exchange_state(RegionLeaderState::Writable, next)
    }

    /// Changes the role while holding the manifest write lock, so the change
    /// cannot interleave with `ManifestContext::update_manifest`, which checks
    /// the state under the same lock.
    pub(crate) async fn set_role_state_gracefully(&self, state: SettableRegionRoleState) {
        // The guard must stay alive until the role has been set.
        let _guard = self.manifest_ctx.manifest_manager.write().await;
        let role: RegionRole = state.into();
        self.set_role(role);
    }

    /// Moves `Leader(expect)` back to `Leader(Writable)`; a failed transition is
    /// only logged.
    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
        let target = RegionRoleState::Leader(RegionLeaderState::Writable);
        if let Err(e) = self.compare_exchange_state(expect, target) {
            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
        }
    }

    /// Builds a statistics snapshot from the current version and manifest counters.
    pub(crate) fn region_statistic(&self) -> RegionStatistic {
        let version = self.version();
        let memtable_size =
            (version.memtables.mutable_usage() + version.memtables.immutables_usage()) as u64;
        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();

        RegionStatistic {
            num_rows,
            memtable_size,
            wal_size: self.estimated_wal_usage(memtable_size),
            manifest_size: self.stats.total_manifest_size(),
            sst_size: version.ssts.sst_usage(),
            index_size: version.ssts.index_usage(),
            manifest: RegionManifestInfo::Mito {
                manifest_version: self.stats.manifest_version(),
                flushed_entry_id: version.flushed_entry_id,
            },
        }
    }

    /// Estimates WAL usage as `memtable_usage * ESTIMATED_WAL_FACTOR`
    /// (computed in `f32`, truncated back to `u64`).
    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
        let scaled = memtable_usage as f32 * ESTIMATED_WAL_FACTOR;
        scaled as u64
    }

    /// Atomically replaces `Leader(expect)` with `state`.
    ///
    /// # Errors
    /// Returns an error built from `RegionStateSnafu`, carrying the state that
    /// was actually found, if the current state is not `Leader(expect)`.
    fn compare_exchange_state(
        &self,
        expect: RegionLeaderState,
        state: RegionRoleState,
    ) -> Result<()> {
        let expected = RegionRoleState::Leader(expect);
        match self.manifest_ctx.state.compare_exchange(expected, state) {
            Ok(_) => Ok(()),
            Err(actual) => Err(RegionStateSnafu {
                region_id: self.region_id,
                state: actual,
                expect: expected,
            }
            .build()),
        }
    }
}
/// Manifest manager of a region together with the region's role state.
///
/// Manifest updates take the async write lock on `manifest_manager` and check
/// `state` while holding it. Role changes that must not interleave with an
/// update (`MitoRegion::set_role_state_gracefully`) take the same lock.
/// `set_role` itself does not take the lock.
#[derive(Debug)]
pub(crate) struct ManifestContext {
    /// Manager of the region manifest, behind a tokio (async) `RwLock`.
    manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// Current role of the region; loaded and updated atomically, without
    /// taking `manifest_manager`'s lock.
    state: AtomicCell<RegionRoleState>,
}
impl ManifestContext {
    /// Creates a context owning `manager`, with initial role `state`.
    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
        ManifestContext {
            manifest_manager: tokio::sync::RwLock::new(manager),
            state: AtomicCell::new(state),
        }
    }

    /// Returns the version of the manifest currently held by the manager
    /// (takes the read lock).
    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
        self.manifest_manager
            .read()
            .await
            .manifest()
            .manifest_version
    }

    /// Delegates to `RegionManifestManager::has_update` under the read lock.
    // NOTE(review): presumably reports whether storage has a newer manifest
    // than the one loaded — confirm against `RegionManifestManager`.
    pub(crate) async fn has_update(&self) -> Result<bool> {
        self.manifest_manager.read().await.has_update().await
    }

    /// Installs the manifest up to `version` under the write lock and returns
    /// the manager's manifest afterwards.
    ///
    /// # Errors
    /// Propagates errors from `RegionManifestManager::install_manifest_to`.
    pub(crate) async fn install_manifest_to(
        &self,
        version: ManifestVersion,
    ) -> Result<Arc<RegionManifest>> {
        let mut manager = self.manifest_manager.write().await;
        manager.install_manifest_to(version).await?;
        Ok(manager.manifest())
    }

    /// Applies `action_list` to the manifest if the region is in an acceptable
    /// state and the edits are still applicable, returning the new manifest version.
    ///
    /// State rules, checked while holding the manifest write lock:
    /// - `expect_state != Downgrading`: current state must be `Leader(expect_state)`
    ///   or `Leader(Downgrading)` (a downgrading leader may still update the manifest).
    /// - `expect_state == Downgrading`: current state must be exactly `Leader(Downgrading)`.
    ///
    /// # Errors
    /// - `UpdateManifestSnafu` / `RegionStateSnafu` when the state rules above fail.
    /// - `RegionTruncatedSnafu` when the manifest has a `truncated_entry_id` and an
    ///   edit either has `flushed_entry_id <= truncated_entry_id` or removes a file
    ///   the manifest no longer contains.
    /// - Any error from `RegionManifestManager::update` (also logged).
    pub(crate) async fn update_manifest(
        &self,
        expect_state: RegionLeaderState,
        action_list: RegionMetaActionList,
    ) -> Result<ManifestVersion> {
        // Hold the write lock for the whole check-then-update sequence.
        let mut manager = self.manifest_manager.write().await;
        let manifest = manager.manifest();
        // State is read inside the lock, so a role change made through
        // `set_role_state_gracefully` cannot race with this check.
        let current_state = self.state.load();
        if expect_state != RegionLeaderState::Downgrading {
            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                info!(
                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                    manifest.metadata.region_id, expect_state
                );
            }
            ensure!(
                current_state == RegionRoleState::Leader(expect_state)
                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
                UpdateManifestSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                }
            );
        } else {
            ensure!(
                current_state == RegionRoleState::Leader(expect_state),
                RegionStateSnafu {
                    region_id: manifest.metadata.region_id,
                    state: current_state,
                    expect: RegionRoleState::Leader(expect_state),
                }
            );
        }
        // Only `Edit` actions are validated, and only when the region has been truncated.
        for action in &action_list.actions {
            let RegionMetaAction::Edit(edit) = &action else {
                continue;
            };
            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
                continue;
            };
            // Edits carrying a flushed entry id (presumably from flush) must be
            // newer than the truncation point.
            if let Some(flushed_entry_id) = edit.flushed_entry_id {
                ensure!(
                    truncated_entry_id < flushed_entry_id,
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }
            // Edits removing files (presumably from compaction) are rejected if any
            // input file is already gone from the manifest.
            if !edit.files_to_remove.is_empty() {
                for file in &edit.files_to_remove {
                    ensure!(
                        manifest.files.contains_key(&file.file_id),
                        RegionTruncatedSnafu {
                            region_id: manifest.metadata.region_id,
                        }
                    );
                }
            }
        }
        let version = manager.update(action_list).await.inspect_err(
            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
        )?;
        // `set_role` does not take the manifest lock, so the region may have
        // become a follower while the update was running; re-check and warn.
        if self.state.load() == RegionRoleState::Follower {
            warn!(
                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
                manifest.metadata.region_id
            );
        }
        Ok(version)
    }

    /// Atomically moves the region toward `next_role`, logging the outcome.
    ///
    /// Transitions:
    /// - `Follower`: any non-follower state becomes `Follower`.
    /// - `Leader`: `Follower` or `Leader(Downgrading)` becomes `Leader(Writable)`;
    ///   other states are left unchanged (warned unless already `Leader(Writable)`).
    /// - `DowngradingLeader`: only `Leader(Writable)` becomes `Leader(Downgrading)`;
    ///   other states are left unchanged (warned unless already `Leader(Downgrading)`).
    ///
    /// In every arm, `Ok` carries the previous state and `Err` the current
    /// (unchanged) state.
    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
        match next_role {
            RegionRole::Follower => {
                match self.state.fetch_update(|state| {
                    if !matches!(state, RegionRoleState::Follower) {
                        Some(RegionRoleState::Follower)
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to follower, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // NOTE(review): the closure only declines when the state is
                        // already `Follower`, so this warning is not expected to fire.
                        if state != RegionRoleState::Follower {
                            warn!(
                                "Failed to convert region {} to follower, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::Leader => {
                match self.state.fetch_update(|state| {
                    if matches!(
                        state,
                        RegionRoleState::Follower
                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
                    ) {
                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
                    } else {
                        None
                    }
                }) {
                    Ok(state) => info!(
                        "Convert region {} to leader, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already writable is a no-op; any other leader sub-state
                        // (e.g. Altering) is left alone and reported.
                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
                            warn!(
                                "Failed to convert region {} to leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
            RegionRole::DowngradingLeader => {
                match self.state.compare_exchange(
                    RegionRoleState::Leader(RegionLeaderState::Writable),
                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
                ) {
                    Ok(state) => info!(
                        "Convert region {} to downgrading region, previous role state: {:?}",
                        region_id, state
                    ),
                    Err(state) => {
                        // Already downgrading is a no-op; otherwise report the refusal.
                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
                            warn!(
                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
                                region_id, state
                            )
                        }
                    }
                }
            }
        }
    }
}
#[cfg(test)]
impl ManifestContext {
    /// Test helper: returns the manifest currently held by the manager
    /// (takes the read lock).
    pub(crate) async fn manifest(&self) -> Arc<RegionManifest> {
        let manager = self.manifest_manager.read().await;
        manager.manifest()
    }
}
/// Shared reference to a [`ManifestContext`].
pub(crate) type ManifestContextRef = Arc<ManifestContext>;
/// Thread-safe map from [`RegionId`] to [`MitoRegionRef`].
///
/// All methods panic if the inner `RwLock` is poisoned.
#[derive(Debug, Default)]
pub(crate) struct RegionMap {
    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
}

impl RegionMap {
    /// Returns true if a region with `region_id` is in the map.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Inserts `region`, replacing any region stored under the same id.
    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region.region_id, region);
    }

    /// Returns a clone of the region reference for `region_id`, if present.
    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).cloned()
    }

    /// Returns the region if it exists and is in `Leader(Writable)` state.
    ///
    /// # Errors
    /// - `RegionNotFoundSnafu` if the region is absent.
    /// - `RegionStateSnafu` if the region is not writable.
    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_writable(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
            }
        );
        Ok(region)
    }

    /// Returns the region if it exists and is a follower.
    ///
    /// # Errors
    /// - `RegionNotFoundSnafu` if the region is absent.
    /// - `RegionStateSnafu` if the region is not a follower.
    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_follower(),
            RegionStateSnafu {
                region_id,
                state: region.state(),
                expect: RegionRoleState::Follower,
            }
        );
        Ok(region)
    }

    /// Returns the region, or reports a not-found error to `cb` and returns `None`.
    pub(crate) fn get_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        self.get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
            .map_err(|e| cb.on_failure(e))
            .ok()
    }

    /// Like [`RegionMap::writable_region`], but reports the error to `cb` and
    /// returns `None` instead of returning `Err`.
    pub(crate) fn writable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        self.writable_region(region_id)
            .map_err(|e| cb.on_failure(e))
            .ok()
    }

    /// Returns the region if it exists and `MitoRegion::is_flushable` holds.
    ///
    /// # Errors
    /// - `RegionNotFoundSnafu` if the region is absent.
    /// - `FlushableRegionStateSnafu` if the region is not flushable.
    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        let region = self
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        ensure!(
            region.is_flushable(),
            FlushableRegionStateSnafu {
                region_id,
                state: region.state(),
            }
        );
        Ok(region)
    }

    /// Like `flushable_region`, but reports the error to `cb` and returns
    /// `None` instead of returning `Err`.
    pub(crate) fn flushable_region_or<F: OnFailure>(
        &self,
        region_id: RegionId,
        cb: &mut F,
    ) -> Option<MitoRegionRef> {
        self.flushable_region(region_id)
            .map_err(|e| cb.on_failure(e))
            .ok()
    }

    /// Removes the region with `region_id`; a no-op if it is absent.
    pub(crate) fn remove_region(&self, region_id: RegionId) {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id);
    }

    /// Returns clones of all region references (in `HashMap` iteration order).
    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
        let regions = self.regions.read().unwrap();
        regions.values().cloned().collect()
    }

    /// Removes every region from the map.
    pub(crate) fn clear(&self) {
        self.regions.write().unwrap().clear();
    }
}

/// Shared reference to a [`RegionMap`].
pub(crate) type RegionMapRef = Arc<RegionMap>;
/// Regions whose open is in progress, each with the senders waiting for it.
///
/// All methods panic if the inner `RwLock` is poisoned.
#[derive(Debug, Default)]
pub(crate) struct OpeningRegions {
    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
}

impl OpeningRegions {
    /// Queues `sender` behind an in-progress open of `region_id`, if there is one.
    ///
    /// Returns `None` when the sender was queued. Returns `Some(sender)` when no
    /// open is in progress; in that case nothing is inserted, and the caller
    /// must register it (e.g. via [`OpeningRegions::insert_sender`]).
    pub(crate) fn wait_for_opening_region(
        &self,
        region_id: RegionId,
        sender: OptionOutputTx,
    ) -> Option<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        match regions.entry(region_id) {
            Entry::Occupied(mut senders) => {
                senders.get_mut().push(sender);
                None
            }
            Entry::Vacant(_) => Some(sender),
        }
    }

    /// Returns true if `region_id` currently has an open in progress.
    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
        let regions = self.regions.read().unwrap();
        regions.contains_key(&region_id)
    }

    /// Registers `sender` as the only waiter of `region`.
    ///
    /// Any senders previously queued for `region` are replaced and dropped.
    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
        let mut regions = self.regions.write().unwrap();
        regions.insert(region, vec![sender]);
    }

    /// Removes `region_id` and returns its queued senders (empty if none).
    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
        let mut regions = self.regions.write().unwrap();
        regions.remove(&region_id).unwrap_or_default()
    }

    /// Test helper: number of senders queued for `region_id`.
    #[cfg(test)]
    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
        let regions = self.regions.read().unwrap();
        regions.get(&region_id).map_or(0, Vec::len)
    }
}

/// Shared reference to [`OpeningRegions`].
pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
/// Manifest counters of a region.
///
/// Each counter sits behind an [`Arc`], so clones of `ManifestStats` observe
/// the same values.
#[derive(Default, Debug, Clone)]
pub(crate) struct ManifestStats {
    total_manifest_size: Arc<AtomicU64>,
    manifest_version: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Returns the recorded total manifest size (relaxed load).
    fn total_manifest_size(&self) -> u64 {
        let counter: &AtomicU64 = &self.total_manifest_size;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the recorded manifest version (relaxed load).
    fn manifest_version(&self) -> u64 {
        let counter: &AtomicU64 = &self.manifest_version;
        counter.load(Ordering::Relaxed)
    }
}
/// Unit tests for region role-state handling.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use crossbeam_utils::atomic::AtomicCell;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use crate::region::{RegionLeaderState, RegionRoleState};
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;

    /// `ManifestContext::state` relies on `AtomicCell<RegionRoleState>` being lock-free.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Walks `ManifestContext::set_role` through its allowed and rejected transitions.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        // NOTE(review): initial role state comes from the mock; not asserted here.
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;
        let region_id = RegionId::new(1024, 0);
        // -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
        // Follower -> Leader(Writable).
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        // Leader(Writable) -> Leader(Downgrading).
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );
        // Leader(Downgrading) -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
        // Downgrading is only allowed from Leader(Writable): a follower stays a follower.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
        // Follower -> Leader(Writable) -> Leader(Downgrading).
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );
        // Leader(Downgrading) -> Leader(Writable).
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }
}