use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use common_telemetry::{debug, error, info, trace};
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};
use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
use crate::sst::parquet::WriteOptions;
use crate::worker::WorkerListener;
/// Global write buffer (memtable) manager.
///
/// Tracks the memory usage of memtables and decides when the engine should
/// flush memtables or stall write requests.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether memory usage is high enough that the engine should
    /// flush some memtables to release memory.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether the engine should stall incoming write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes that are being written into mutable memtables.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are no longer mutable (e.g. frozen
    /// for flushing) but are still occupied until the flush finishes.
    fn schedule_free_mem(&self, mem: usize);

    /// Tells the manager that `mem` bytes are fully freed (flush finished).
    fn free_mem(&self, mem: usize);

    /// Returns the total memory in bytes currently tracked by the manager.
    fn memory_usage(&self) -> usize;
}

/// Reference-counted [`WriteBufferManager`] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
/// Default [`WriteBufferManager`] implementation backed by atomic counters.
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    // Total size limit (bytes) shared by all write buffers of the engine.
    global_write_buffer_size: usize,
    // Size threshold (bytes) of mutable memtables that triggers a flush.
    mutable_limit: usize,
    // Bytes currently tracked overall (mutable plus not-yet-freed frozen data).
    memory_used: AtomicUsize,
    // Bytes held by mutable memtables only.
    memory_active: AtomicUsize,
    // Optional channel signalled whenever memory is freed (see `free_mem`).
    notifier: Option<watch::Sender<()>>,
}
impl WriteBufferManagerImpl {
    /// Creates a manager with the given global write buffer size limit.
    pub fn new(global_write_buffer_size: usize) -> Self {
        // Derive the mutable-memtable threshold from the global limit up front.
        let mutable_limit = Self::get_mutable_limit(global_write_buffer_size);
        Self {
            global_write_buffer_size,
            mutable_limit,
            memory_used: AtomicUsize::new(0),
            memory_active: AtomicUsize::new(0),
            notifier: None,
        }
    }

    /// Attaches a watch channel that is signalled whenever memory is freed.
    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
        self.notifier = Some(notifier);
        self
    }

    /// Returns the bytes currently held by mutable memtables.
    pub fn mutable_usage(&self) -> usize {
        self.memory_active.load(Ordering::Relaxed)
    }

    /// Computes the mutable memtable limit: half of the global buffer size.
    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        global_write_buffer_size / 2
    }
}
impl WriteBufferManager for WriteBufferManagerImpl {
    fn should_flush_engine(&self) -> bool {
        // Case 1: mutable memtables alone exceed the mutable limit.
        let mutable_usage = self.memory_active.load(Ordering::Relaxed);
        if mutable_usage > self.mutable_limit {
            debug!(
                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                mutable_usage,
                self.memory_usage(),
                self.mutable_limit,
                self.global_write_buffer_size,
            );
            return true;
        }

        // Below the global limit: no flush needed.
        let total_usage = self.memory_used.load(Ordering::Relaxed);
        if total_usage < self.global_write_buffer_size {
            return false;
        }

        // Case 2: over the global limit. Flushing only helps when a large
        // enough share of the memory sits in mutable memtables.
        if mutable_usage >= self.global_write_buffer_size / 2 {
            debug!(
                "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                total_usage, self.global_write_buffer_size, mutable_usage
            );
            true
        } else {
            trace!(
                "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
                total_usage,
                self.global_write_buffer_size,
                mutable_usage
            );
            false
        }
    }

    fn should_stall(&self) -> bool {
        self.memory_usage() >= self.global_write_buffer_size
    }

    fn reserve_mem(&self, mem: usize) {
        // Reserved bytes count as both total and mutable usage.
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }

    fn schedule_free_mem(&self, mem: usize) {
        // Frozen data stops counting as mutable but still occupies memory
        // until the flush finishes and `free_mem` is called.
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }

    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        // Wake up any listener waiting for memory to be released.
        if let Some(notifier) = &self.notifier {
            // Ignore send errors: there may be no receiver alive.
            let _ = notifier.send(());
        }
    }

    fn memory_usage(&self) -> usize {
        self.memory_used.load(Ordering::Relaxed)
    }
}
/// Reason a region flush was triggered.
///
/// The variant name (via `IntoStaticStr`) is used as a metric label value.
#[derive(Debug, IntoStaticStr)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// The engine-wide write buffer is full.
    EngineFull,
    /// Manually requested flush.
    Manual,
    /// Flush before altering the region.
    Alter,
    /// Periodic flush.
    Periodically,
    /// Flush while the region is downgrading to follower
    /// (see the `RegionLeaderState::Downgrading` handling in `flush_memtables`).
    Downgrading,
}
impl FlushReason {
    /// Returns the static string name of this reason (provided by the
    /// `IntoStaticStr` derive); used as a metric label value.
    fn as_str(&self) -> &'static str {
        self.into()
    }
}
/// Task that flushes a region's immutable memtables into SST files.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Why the flush was triggered (also used as a metric label).
    pub(crate) reason: FlushReason,
    /// Senders notified when the flush completes (success or failure).
    pub(crate) senders: Vec<OutputTx>,
    /// Channel used to send the flush result back to the region worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
    /// Access layer used to write SST files.
    pub(crate) access_layer: AccessLayerRef,
    /// Listener notified when the flush begins.
    pub(crate) listener: WorkerListener,
    /// Engine configuration (buffer sizes, index configs, ...).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Optional override for the SST row group size.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager handed to the SST writer.
    pub(crate) cache_manager: CacheManagerRef,
    /// Context used to update the region manifest after writing SSTs.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Index build options for the new SST files.
    pub(crate) index_options: IndexOptions,
}
impl RegionFlushTask {
    /// Registers an additional sender to notify when the flush completes.
    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
        if let Some(sender) = sender.take_inner() {
            self.senders.push(sender);
        }
    }

    /// Consumes the task and notifies all waiting senders of success.
    fn on_success(self) {
        for sender in self.senders {
            // A flush affects 0 rows from the caller's perspective.
            sender.send(Ok(0));
        }
    }

    /// Sends the flush error to all waiting senders and clears them.
    fn on_failure(&mut self, err: Arc<Error>) {
        for sender in self.senders.drain(..) {
            sender.send(Err(err.clone()).context(FlushRegionSnafu {
                region_id: self.region_id,
            }));
        }
    }

    /// Converts the task into a background [Job] that flushes the region.
    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Snapshot the version data outside the async block so the whole
        // flush works on one consistent view of the memtables.
        let version_data = version_control.current();
        Box::pin(async move {
            INFLIGHT_FLUSH_COUNT.inc();
            self.do_flush(version_data).await;
            INFLIGHT_FLUSH_COUNT.dec();
        })
    }

    /// Runs the flush and reports the outcome to the region worker.
    async fn do_flush(&mut self, version_data: VersionControlData) {
        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
        self.listener.on_flush_begin(self.region_id).await;
        let worker_request = match self.flush_memtables(&version_data).await {
            Ok(edit) => {
                // Ids of the immutable memtables that were flushed and can
                // now be removed from the version.
                let memtables_to_remove = version_data
                    .version
                    .memtables
                    .immutables()
                    .iter()
                    .map(|m| m.id())
                    .collect();
                let flush_finished = FlushFinished {
                    region_id: self.region_id,
                    flushed_entry_id: version_data.last_entry_id,
                    // Hand the senders to the worker so callers are notified
                    // once the flush result is applied.
                    senders: std::mem::take(&mut self.senders),
                    _timer: timer,
                    edit,
                    memtables_to_remove,
                };
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFinished(flush_finished),
                }
            }
            Err(e) => {
                error!(e; "Failed to flush region {}", self.region_id);
                // Don't record elapsed time for failed flushes.
                timer.stop_and_discard();
                let err = Arc::new(e);
                self.on_failure(err.clone());
                WorkerRequest::Background {
                    region_id: self.region_id,
                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
                }
            }
        };
        self.send_worker_request(worker_request).await;
    }

    /// Flushes all non-empty immutable memtables to level-0 SST files,
    /// persists the change to the manifest, and returns the resulting
    /// [RegionEdit].
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        let version = &version_data.version;
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();
        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        // Allow the task to override the default row group size.
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }
        let memtables = version.memtables.immutables();
        let mut file_metas = Vec::with_capacity(memtables.len());
        let mut flushed_bytes = 0;
        for mem in memtables {
            if mem.is_empty() {
                // Skip empty memtables.
                continue;
            }
            let max_sequence = mem.stats().max_sequence();
            let iter = mem.iter(None, None, None)?;
            let source = Source::Iter(iter);
            let write_request = SstWriteRequest {
                op_type: OperationType::Flush,
                metadata: version.metadata.clone(),
                source,
                cache_manager: self.cache_manager.clone(),
                storage: version.options.storage.clone(),
                max_sequence: Some(max_sequence),
                index_options: self.index_options.clone(),
                inverted_index_config: self.engine_config.inverted_index.clone(),
                fulltext_index_config: self.engine_config.fulltext_index.clone(),
                bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
            };
            let ssts_written = self
                .access_layer
                .write_sst(write_request, &write_opts)
                .await?;
            if ssts_written.is_empty() {
                // A memtable may produce no SST output; skip it.
                continue;
            }
            file_metas.extend(ssts_written.into_iter().map(|sst_info| {
                flushed_bytes += sst_info.file_size;
                FileMeta {
                    region_id: self.region_id,
                    file_id: sst_info.file_id,
                    time_range: sst_info.time_range,
                    // Flushed files always land in level 0.
                    level: 0,
                    file_size: sst_info.file_size,
                    available_indexes: sst_info.index_metadata.build_available_indexes(),
                    index_file_size: sst_info.index_metadata.file_size,
                    num_rows: sst_info.num_rows as u64,
                    num_row_groups: sst_info.num_row_groups,
                    sequence: NonZeroU64::new(max_sequence),
                }
            }));
        }
        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }
        let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, cost: {:?}s",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            timer.stop_and_record(),
        );
        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            compaction_time_window: None,
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
        };
        info!("Applying {edit:?} to region {}", self.region_id);
        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
        // A flush triggered by downgrading runs while the region is in the
        // downgrading state; otherwise the region must still be writable.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            RegionLeaderState::Writable
        };
        // Persist the edit to the manifest before notifying the worker.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, reason: {}",
            self.region_id,
            self.reason.as_str()
        );
        Ok(edit)
    }

    /// Sends the flush job status to the region worker; only logs on failure
    /// (the worker side may already be gone).
    async fn send_worker_request(&self, request: WorkerRequest) {
        if let Err(e) = self.request_sender.send(request).await {
            error!(
                "Failed to notify flush job status for region {}, request: {:?}",
                self.region_id, e.0
            );
        }
    }

    /// Merges another flush task for the same region into this task.
    fn merge(&mut self, mut other: RegionFlushTask) {
        assert_eq!(self.region_id, other.region_id);
        // Only one flush task is kept per region; accumulate the waiters.
        self.senders.append(&mut other.senders);
    }
}
/// Schedules and tracks flush jobs for regions.
///
/// At most one flush job runs per region at a time; additional flush requests
/// for the same region are merged and queued.
pub(crate) struct FlushScheduler {
    // Per-region flush state; a region is present only while it has a running
    // or pending flush.
    region_status: HashMap<RegionId, FlushStatus>,
    // Background scheduler that actually runs the flush jobs.
    scheduler: SchedulerRef,
}
impl FlushScheduler {
    /// Creates a flush scheduler that submits flush jobs to `scheduler`.
    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
        FlushScheduler {
            region_status: HashMap::new(),
            scheduler,
        }
    }

    /// Returns whether the region already has a flush (running or pending).
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        // Fix: `&region_id` was corrupted to `®ion_id` (mojibake) in the
        // previous revision; restored here and below.
        self.region_status.contains_key(&region_id)
    }

    /// Schedules a flush task for the region.
    ///
    /// If the region has no memtable data the task succeeds immediately. If a
    /// flush is already running or queued for the region, the new task is
    /// merged into it. Otherwise the mutable memtable is frozen and a flush
    /// job is submitted to the background scheduler.
    pub(crate) fn schedule_flush(
        &mut self,
        region_id: RegionId,
        version_control: &VersionControlRef,
        task: RegionFlushTask,
    ) -> Result<()> {
        debug_assert_eq!(region_id, task.region_id);
        let version = version_control.current().version;
        if version.memtables.is_empty() {
            // Nothing to flush: finish the task directly.
            debug_assert!(!self.region_status.contains_key(&region_id));
            task.on_success();
            return Ok(());
        }
        FLUSH_REQUESTS_TOTAL
            .with_label_values(&[task.reason.as_str()])
            .inc();
        let flush_status = self
            .region_status
            .entry(region_id)
            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
        if flush_status.flushing {
            // A flush job is already running; merge the task so it completes
            // with the region's next flush.
            flush_status.merge_task(task);
            return Ok(());
        }
        if flush_status.pending_task.is_some() {
            // A task is already queued; merge into it.
            flush_status.merge_task(task);
            return Ok(());
        }
        // Freeze the mutable memtable so the job flushes a stable set of
        // immutable memtables.
        if let Err(e) = version_control.freeze_mutable() {
            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
            self.region_status.remove(&region_id);
            return Err(e);
        }
        // Submit the flush job to the background scheduler.
        let job = task.into_flush_job(version_control);
        if let Err(e) = self.scheduler.schedule(job) {
            // Clean up the freshly inserted entry so future flushes can run.
            error!(e; "Failed to schedule flush job for region {}", region_id);
            self.region_status.remove(&region_id);
            return Err(e);
        }
        flush_status.flushing = true;
        Ok(())
    }

    /// Notifies the scheduler that a flush job of the region has succeeded.
    ///
    /// Returns the region's queued DDL and write requests when the region has
    /// no more data waiting to be flushed, so the worker can process them.
    pub(crate) fn on_flush_success(
        &mut self,
        region_id: RegionId,
    ) -> Option<(Vec<SenderDdlRequest>, Vec<SenderWriteRequest>)> {
        let flush_status = self.region_status.get_mut(&region_id)?;
        flush_status.flushing = false;
        let pending_requests = if flush_status.pending_task.is_none() {
            // No pending flush for the region: release its status and hand
            // back the queued requests.
            let flush_status = self.region_status.remove(&region_id).unwrap();
            Some((flush_status.pending_ddls, flush_status.pending_writes))
        } else {
            let version_data = flush_status.version_control.current();
            if version_data.version.memtables.is_empty() {
                // Nothing left to flush: finish the pending task directly.
                let task = flush_status.pending_task.take().unwrap();
                task.on_success();
                let flush_status = self.region_status.remove(&region_id).unwrap();
                Some((flush_status.pending_ddls, flush_status.pending_writes))
            } else {
                // The region still has data to flush; keep its requests
                // pending until the next flush completes.
                None
            }
        };
        // Give another pending task a chance to run.
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
        }
        pending_requests
    }

    /// Notifies the scheduler that a flush job has failed; cancels all
    /// pending tasks and requests of the region with `err`.
    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
        FLUSH_ERRORS_TOTAL.inc();
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };
        flush_status.on_failure(err);
        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
        }
    }

    /// Notifies the scheduler that the region has been dropped.
    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionDroppedSnafu { region_id }.build()),
        );
    }

    /// Notifies the scheduler that the region has been closed.
    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
    }

    /// Notifies the scheduler that the region has been truncated.
    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
        self.remove_region_on_failure(
            region_id,
            Arc::new(RegionTruncatedSnafu { region_id }.build()),
        );
    }

    /// Removes the region's status and fails all of its pending work with `err`.
    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(flush_status) = self.region_status.remove(&region_id) else {
            return;
        };
        flush_status.on_failure(err);
    }

    /// Queues a DDL request until the region's flush completes.
    ///
    /// # Panics
    /// Panics if the region has no flush status; callers must only queue
    /// requests while a flush is requested for the region.
    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
        let status = self.region_status.get_mut(&request.region_id).unwrap();
        status.pending_ddls.push(request);
    }

    /// Queues a write request until the region's flush completes.
    ///
    /// # Panics
    /// Panics if the region has no flush status.
    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
        let status = self
            .region_status
            .get_mut(&request.request.region_id)
            .unwrap();
        status.pending_writes.push(request);
    }

    /// Returns whether the region has queued DDL requests.
    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
        self.region_status
            .get(&region_id)
            .map(|status| !status.pending_ddls.is_empty())
            .unwrap_or(false)
    }

    /// Schedules the next pending flush task, if any.
    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
        // Invariant: every tracked region is flushing or has a pending task.
        debug_assert!(self
            .region_status
            .values()
            .all(|status| status.flushing || status.pending_task.is_some()));
        let Some(flush_status) = self
            .region_status
            .values_mut()
            .find(|status| status.pending_task.is_some())
        else {
            return Ok(());
        };
        // NOTE(review): a region that is currently flushing can also hold a
        // pending task (merged via `merge_task` while `flushing` is true), so
        // `find` above may select a flushing region and trip this assert in
        // debug builds — confirm whether the search should also require
        // `!status.flushing`.
        debug_assert!(!flush_status.flushing);
        let task = flush_status.pending_task.take().unwrap();
        let region_id = flush_status.region_id;
        let version_control = flush_status.version_control.clone();
        self.schedule_flush(region_id, &version_control, task)
    }
}
impl Drop for FlushScheduler {
    fn drop(&mut self) {
        // Fail all remaining tasks and queued requests so their waiters are
        // notified instead of hanging when the scheduler shuts down.
        for (region_id, flush_status) in self.region_status.drain() {
            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
        }
    }
}
/// Flush-related state of a region tracked by the [FlushScheduler].
///
/// Per the scheduler's invariant (see `schedule_next_flush`), a tracked
/// region always has a running flush or a pending task.
struct FlushStatus {
    // Id of the region.
    region_id: RegionId,
    // Version control of the region.
    version_control: VersionControlRef,
    // Whether a flush job is currently running for the region.
    flushing: bool,
    // Task waiting for the next flush of this region.
    pending_task: Option<RegionFlushTask>,
    // DDL requests delayed until the flush completes.
    pending_ddls: Vec<SenderDdlRequest>,
    // Write requests delayed until the flush completes.
    pending_writes: Vec<SenderWriteRequest>,
}
impl FlushStatus {
    /// Creates a status for a region that has no flush running yet.
    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
        FlushStatus {
            region_id,
            version_control,
            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
        }
    }

    /// Merges `task` into the pending task, or stores it if none is pending.
    fn merge_task(&mut self, task: RegionFlushTask) {
        match self.pending_task.as_mut() {
            Some(pending) => pending.merge(task),
            None => self.pending_task = Some(task),
        }
    }

    /// Fails the pending flush task and every queued request with `err`.
    fn on_failure(self, err: Arc<Error>) {
        let region_id = self.region_id;
        if let Some(mut task) = self.pending_task {
            task.on_failure(err.clone());
        }
        for ddl in self.pending_ddls {
            ddl.sender
                .send(Err(err.clone()).context(FlushRegionSnafu { region_id }));
        }
        for write_req in self.pending_writes {
            write_req
                .sender
                .send(Err(err.clone()).context(FlushRegionSnafu { region_id }));
        }
    }
}
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;
    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
    use crate::test_util::version_util::{write_rows_to_version, VersionControlBuilder};

    #[test]
    fn test_get_mutable_limit() {
        // The mutable limit is half of the global write buffer size.
        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
    }

    #[test]
    fn test_over_mutable_limit() {
        // Global limit 1000 -> mutable limit 500.
        let manager = WriteBufferManagerImpl::new(1000);
        manager.reserve_mem(400);
        assert!(!manager.should_flush_engine());
        assert!(!manager.should_stall());
        // 800 bytes mutable exceeds the 500-byte mutable limit.
        manager.reserve_mem(400);
        assert!(manager.should_flush_engine());
        // Freezing moves bytes out of the mutable counter only.
        manager.schedule_free_mem(400);
        assert!(!manager.should_flush_engine());
        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
        // Finishing the flush releases the total counter.
        manager.free_mem(400);
        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
    }

    #[test]
    fn test_over_global() {
        let manager = WriteBufferManagerImpl::new(1000);
        // Total usage 1100 exceeds the global limit of 1000.
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // mutable 900, total 1100: over total limit with large mutable share.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        // mutable 450 < 500: mutable share too small to trigger a flush.
        manager.schedule_free_mem(450);
        assert!(!manager.should_flush_engine());
        // mutable 500 >= 500: triggers the over-total-limit flush again.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        // mutable 600 > 500: over the mutable limit as well.
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }

    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        // Reserving and freezing must not notify.
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Only fully freeing memory sends a notification.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }

    #[tokio::test]
    async fn test_schedule_empty() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let (output_tx, output_rx) = oneshot::channel();
        let mut task = RegionFlushTask {
            region_id: builder.region_id(),
            reason: FlushReason::Others,
            senders: Vec::new(),
            request_sender: tx,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            engine_config: Arc::new(MitoConfig::default()),
            row_group_size: None,
            cache_manager: Arc::new(CacheManager::default()),
            manifest_ctx: env
                .mock_manifest_context(version_control.current().version.metadata.clone())
                .await,
            index_options: IndexOptions::default(),
        };
        task.push_sender(OptionOutputTx::from(output_tx));
        // A region with no memtable data finishes immediately and the
        // scheduler keeps no status for it.
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert!(scheduler.region_status.is_empty());
        let output = output_rx.await.unwrap().unwrap();
        assert_eq!(output, 0);
        assert!(scheduler.region_status.is_empty());
    }

    #[tokio::test]
    async fn test_schedule_pending_request() {
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Put some data into the mutable memtable so a flush is required.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
            })
            .collect();
        // The first task actually submits a flush job.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // The mutable memtable has been frozen into an immutable one.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // The remaining tasks are merged into the running flush.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Simulate the flush result being applied to the version.
        version_control.apply_edit(
            RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            },
            &[0],
            builder.file_purger(),
        );
        // On success the merged tasks complete without a second flush job.
        scheduler.on_flush_success(builder.region_id());
        assert_eq!(1, job_scheduler.num_jobs());
        assert!(scheduler.region_status.is_empty());
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
}