use std::sync::{Arc, RwLock};
use std::time::Duration;
use common_telemetry::info;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use crate::error::Result;
use crate::manifest::action::RegionEdit;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::{MemtableBuilderRef, MemtableId};
use crate::region::options::RegionOptions;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::{SstVersion, SstVersionRef};
use crate::wal::EntryId;
#[derive(Debug)]
pub(crate) struct VersionControl {
data: RwLock<VersionControlData>,
}
impl VersionControl {
pub(crate) fn new(version: Version) -> VersionControl {
let (flushed_sequence, flushed_entry_id) =
(version.flushed_sequence, version.flushed_entry_id);
VersionControl {
data: RwLock::new(VersionControlData {
version: Arc::new(version),
committed_sequence: flushed_sequence,
last_entry_id: flushed_entry_id,
is_dropped: false,
}),
}
}
pub(crate) fn current(&self) -> VersionControlData {
self.data.read().unwrap().clone()
}
pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
let mut data = self.data.write().unwrap();
data.committed_sequence = seq;
data.last_entry_id = entry_id;
}
pub(crate) fn committed_sequence(&self) -> SequenceNumber {
self.data.read().unwrap().committed_sequence
}
pub(crate) fn freeze_mutable(&self) -> Result<()> {
let version = self.current().version;
let time_window = version.compaction_time_window;
let Some(new_memtables) = version
.memtables
.freeze_mutable(&version.metadata, time_window)?
else {
return Ok(());
};
let new_version = Arc::new(
VersionBuilder::from_version(version)
.memtables(new_memtables)
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
Ok(())
}
pub(crate) fn alter_options(&self, options: RegionOptions) {
let version = self.current().version;
let new_version = Arc::new(
VersionBuilder::from_version(version)
.options(options)
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
}
pub(crate) fn apply_edit(
&self,
edit: RegionEdit,
memtables_to_remove: &[MemtableId],
purger: FilePurgerRef,
) {
let version = self.current().version;
let new_version = Arc::new(
VersionBuilder::from_version(version)
.apply_edit(edit, purger)
.remove_memtables(memtables_to_remove)
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
}
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
next_memtable_id,
part_duration,
));
let mut data = self.data.write().unwrap();
data.is_dropped = true;
data.version.ssts.mark_all_deleted();
let new_version =
Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
data.version = new_version;
}
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
metadata.clone(),
builder.clone(),
next_memtable_id,
part_duration,
));
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
VersionBuilder::from_version(version)
.metadata(metadata)
.memtables(MemtableVersion::new(new_mutable))
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
}
pub(crate) fn truncate(
&self,
truncated_entry_id: EntryId,
truncated_sequence: SequenceNumber,
memtable_builder: &MemtableBuilderRef,
) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
next_memtable_id,
part_duration,
));
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
.flushed_entry_id(truncated_entry_id)
.flushed_sequence(truncated_sequence)
.truncated_entry_id(Some(truncated_entry_id))
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version.ssts.mark_all_deleted();
version_data.version = new_version;
}
pub(crate) fn overwrite_current(&self, version: VersionRef) {
let mut version_data = self.data.write().unwrap();
version_data.version = version;
}
}
pub(crate) type VersionControlRef = Arc<VersionControl>;
#[derive(Debug, Clone)]
pub(crate) struct VersionControlData {
pub(crate) version: VersionRef,
pub(crate) committed_sequence: SequenceNumber,
pub(crate) last_entry_id: EntryId,
pub(crate) is_dropped: bool,
}
#[derive(Clone, Debug)]
pub(crate) struct Version {
pub(crate) metadata: RegionMetadataRef,
pub(crate) memtables: MemtableVersionRef,
pub(crate) ssts: SstVersionRef,
pub(crate) flushed_entry_id: EntryId,
pub(crate) flushed_sequence: SequenceNumber,
pub(crate) truncated_entry_id: Option<EntryId>,
pub(crate) compaction_time_window: Option<Duration>,
pub(crate) options: RegionOptions,
}
pub(crate) type VersionRef = Arc<Version>;
pub(crate) struct VersionBuilder {
metadata: RegionMetadataRef,
memtables: MemtableVersionRef,
ssts: SstVersionRef,
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
truncated_entry_id: Option<EntryId>,
compaction_time_window: Option<Duration>,
options: RegionOptions,
}
impl VersionBuilder {
pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
VersionBuilder {
metadata,
memtables: Arc::new(MemtableVersion::new(mutable)),
ssts: Arc::new(SstVersion::new()),
flushed_entry_id: 0,
flushed_sequence: 0,
truncated_entry_id: None,
compaction_time_window: None,
options: RegionOptions::default(),
}
}
pub(crate) fn from_version(version: VersionRef) -> Self {
VersionBuilder {
metadata: version.metadata.clone(),
memtables: version.memtables.clone(),
ssts: version.ssts.clone(),
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
truncated_entry_id: version.truncated_entry_id,
compaction_time_window: version.compaction_time_window,
options: version.options.clone(),
}
}
pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
self.memtables = Arc::new(memtables);
self
}
pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
self.metadata = metadata;
self
}
pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
self.flushed_entry_id = entry_id;
self
}
pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
self.flushed_sequence = sequence;
self
}
pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
self.truncated_entry_id = entry_id;
self
}
pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
self.compaction_time_window = window;
self
}
pub(crate) fn options(mut self, options: RegionOptions) -> Self {
self.options = options;
self
}
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
if let Some(entry_id) = edit.flushed_entry_id {
self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
}
if let Some(sequence) = edit.flushed_sequence {
self.flushed_sequence = self.flushed_sequence.max(sequence);
}
if let Some(window) = edit.compaction_time_window {
self.compaction_time_window = Some(window);
}
if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
let mut ssts = (*self.ssts).clone();
ssts.add_files(file_purger, edit.files_to_add.into_iter());
ssts.remove_files(edit.files_to_remove.into_iter());
self.ssts = Arc::new(ssts);
}
self
}
pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
if !ids.is_empty() {
let mut memtables = (*self.memtables).clone();
memtables.remove_memtables(ids);
self.memtables = Arc::new(memtables);
}
self
}
pub(crate) fn add_files(
mut self,
file_purger: FilePurgerRef,
files: impl Iterator<Item = FileMeta>,
) -> Self {
let mut ssts = (*self.ssts).clone();
ssts.add_files(file_purger, files);
self.ssts = Arc::new(ssts);
self
}
pub(crate) fn build(self) -> Version {
let compaction_time_window = self
.options
.compaction
.time_window()
.or(self.compaction_time_window);
if self.compaction_time_window.is_some()
&& compaction_time_window != self.compaction_time_window
{
info!(
"VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
self.compaction_time_window,
compaction_time_window,
self.metadata.region_id
);
}
Version {
metadata: self.metadata,
memtables: self.memtables,
ssts: self.ssts,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
truncated_entry_id: self.truncated_entry_id,
compaction_time_window,
options: self.options,
}
}
}