#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing;
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{oneshot, Semaphore};
use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;
/// Name under which this engine registers itself (e.g. in engine routing).
pub const MITO_ENGINE_NAME: &str = "mito";
/// Region engine implementation ("mito").
///
/// Cheap to clone: all state is shared behind an `Arc`.
#[derive(Clone)]
pub struct MitoEngine {
    // Shared engine state: worker group, config and WAL entry reader.
    inner: Arc<EngineInner>,
}
impl MitoEngine {
pub async fn new<S: LogStore>(
data_home: &str,
mut config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;
Ok(MitoEngine {
inner: Arc::new(
EngineInner::new(
config,
log_store,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await?,
),
})
}
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}
pub fn is_region_opening(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_opening(region_id)
}
pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
self.inner
.workers
.get_region(region_id)
.map(|region| region.region_statistic())
}
pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
self.inner
.workers
.get_region(region_id)
.map(|r| r.primary_key_encoding())
}
#[tracing::instrument(skip_all)]
pub async fn scan_to_stream(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
}
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner()
}
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.scan_region(region_id, request)
}
pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
let _timer = HANDLE_REQUEST_ELAPSED
.with_label_values(&["edit_region"])
.start_timer();
ensure!(
is_valid_region_edit(&edit),
InvalidRequestSnafu {
region_id,
reason: "invalid region edit"
}
);
let (tx, rx) = oneshot::channel();
let request = WorkerRequest::EditRegion(RegionEditRequest {
region_id,
edit,
tx,
});
self.inner
.workers
.submit_to_worker(region_id, request)
.await?;
rx.await.context(RecvSnafu)?
}
#[cfg(test)]
pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
self.inner.workers.get_region(id)
}
}
/// Returns true when the edit is acceptable for a direct region edit.
///
/// An edit is valid only if it adds at least one file and changes nothing
/// else: no removals, no compaction time window, no flushed entry id, and
/// no flushed sequence.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    // Exhaustive destructuring: a new field added to `RegionEdit` forces this
    // check to be revisited at compile time.
    let RegionEdit {
        files_to_add,
        files_to_remove,
        compaction_time_window,
        flushed_entry_id,
        flushed_sequence,
    } = edit;
    !files_to_add.is_empty()
        && files_to_remove.is_empty()
        && compaction_time_window.is_none()
        && flushed_entry_id.is_none()
        && flushed_sequence.is_none()
}
/// Inner state of [MitoEngine], shared by all of its clones.
struct EngineInner {
    // Region worker group that actually executes requests.
    workers: WorkerGroup,
    // Engine config (sanitized at construction time).
    config: Arc<MitoConfig>,
    // Reader for raw WAL entries, used when opening Kafka-backed regions.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}
/// Open-region requests grouped by their WAL (Kafka) topic.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
fn prepare_batch_open_requests(
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
TopicGroupedRegionOpenRequests,
Vec<(RegionId, RegionOpenRequest)>,
)> {
let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
for (region_id, request) in requests {
let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
serde_json::from_str(options).context(SerdeJsonSnafu)?
} else {
WalOptions::RaftEngine
};
match options {
WalOptions::Kafka(options) => {
topic_to_regions
.entry(options.topic)
.or_default()
.push((region_id, request));
}
WalOptions::RaftEngine | WalOptions::Noop => {
remaining_regions.push((region_id, request));
}
}
}
Ok((topic_to_regions, remaining_regions))
}
impl EngineInner {
    /// Builds the inner engine: wraps the log store in a raw WAL entry reader
    /// and starts the region worker group.
    async fn new<S: LogStore>(
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<EngineInner> {
        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(EngineInner {
            workers: WorkerGroup::start(
                config.clone(),
                log_store,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await?,
            config,
            wal_raw_entry_reader,
        })
    }

    /// Stops the worker group (and with it, all regions).
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    /// Returns the metadata of the region, or `RegionNotFound` if absent.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        Ok(region.metadata())
    }

    /// Opens all regions that share one Kafka `topic`.
    ///
    /// A single WAL entry distributor reads the topic once and fans entries
    /// out to a dedicated receiver per region.
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        // One receiver per region id, in the same order as `region_ids`.
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );
        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }
        // Start distributing entries only after every open request is queued,
        // so all receivers exist before entries begin to flow.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        // Propagate both the join error and the distributor's own error.
        distribution.await.context(JoinSnafu)??;
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    /// Opens many regions in one batch.
    ///
    /// Kafka-backed regions are opened per topic (one WAL read per topic);
    /// the rest are opened individually. `parallelism` bounds concurrency for
    /// both groups through a shared semaphore.
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Bound concurrency; the semaphore is never closed, so
                    // acquire() cannot fail here.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            // NOTE: a whole-topic failure aborts the batch (try_join_all),
            // while per-region failures are carried inside the result values.
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }
        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Shares the same permit pool as the topic tasks.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);
                    self.workers.submit_to_worker(region_id, request).await?;
                    receiver.await.context(RecvSnafu)?
                })
            }
            // Individual failures stay per-region here (join_all, not try_).
            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }
        Ok(responses)
    }

    /// Forwards a region request to the region's worker and awaits the result.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        // Metadata may legitimately be absent (e.g. the region isn't open yet).
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;
        receiver.await.context(RecvSnafu)?
    }

    /// Returns the last committed sequence number of the region.
    fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version_ctrl = &region.version_control;
        let seq = Some(version_ctrl.committed_sequence());
        Ok(seq)
    }

    /// Prepares a [ScanRegion], wiring in caches, index toggles from the
    /// config, and the query start time (for latency metrics).
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version = region.version();
        let cache_manager = self.workers.cache_manager();
        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);
        Ok(scan_region)
    }

    /// Sets the role of the region; fails if the region is unknown.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        region.set_role(role);
        Ok(())
    }

    /// Asks the region's worker to change the role state gracefully and
    /// awaits its response.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;
        receiver.await.context(RecvSnafu)
    }

    /// Syncs the region up to the given manifest version.
    ///
    /// Only mito manifest info is accepted.
    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<ManifestVersion> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;
        receiver.await.context(RecvSnafu)?
    }

    /// Returns the role of the region, or `None` if the region is unknown.
    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}
#[async_trait]
impl RegionEngine for MitoEngine {
    /// Returns the engine name.
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    /// Opens regions in batch, converting each per-region result into a
    /// [RegionResponse] and boxing errors for the trait boundary.
    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        let results = self
            .inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map_err(BoxedError::new)?;

        let mut converted = Vec::with_capacity(results.len());
        for (region_id, result) in results {
            let response = match result {
                Ok(rows) => Ok(RegionResponse::new(rows)),
                Err(err) => Err(BoxedError::new(err)),
            };
            converted.push((region_id, response));
        }
        Ok(converted)
    }

    /// Handles a single region request, timing it by request type.
    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        match self.inner.handle_request(region_id, request).await {
            Ok(rows) => Ok(RegionResponse::new(rows)),
            Err(err) => Err(BoxedError::new(err)),
        }
    }

    /// Builds a region scanner for the query.
    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        let scan_region = self
            .scan_region(region_id, request)
            .map_err(BoxedError::new)?;
        scan_region.region_scanner().map_err(BoxedError::new)
    }

    /// Returns the last committed sequence number of the region.
    async fn get_last_seq_num(
        &self,
        region_id: RegionId,
    ) -> Result<Option<SequenceNumber>, BoxedError> {
        match self.inner.get_last_seq_num(region_id) {
            Ok(seq) => Ok(seq),
            Err(err) => Err(BoxedError::new(err)),
        }
    }

    /// Returns the metadata of the region.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stops the engine.
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        match self.inner.stop().await {
            Ok(()) => Ok(()),
            Err(err) => Err(BoxedError::new(err)),
        }
    }

    /// Returns the statistic of the region, if known.
    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    /// Sets the role of the region.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        if let Err(err) = self.inner.set_region_role(region_id, role) {
            return Err(BoxedError::new(err));
        }
        Ok(())
    }

    /// Changes the region role state gracefully, timing the operation.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        let result = self
            .inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await;
        result.map_err(BoxedError::new)
    }

    /// Syncs the region with the given manifest info, discarding the
    /// resulting manifest version.
    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(), BoxedError> {
        self.inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;
        Ok(())
    }

    /// Returns the role of the region, if known.
    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Creates an engine for tests, with injectable write buffer manager,
    /// event listener and time provider.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        let workers = WorkerGroup::start_for_test(
            config.clone(),
            log_store,
            object_store_manager,
            write_buffer_manager,
            listener,
            schema_metadata_manager,
            time_provider,
        )
        .await?;
        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
        };
        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }

    /// Returns the purge scheduler used by the worker group.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: adds a file and changes nothing else.
        let add_file_only = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(is_valid_region_edit(&add_file_only));

        // Invalid: adds no files at all.
        let no_files = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&no_files));

        // Invalid: removes files.
        let removes_files = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&removes_files));

        // Invalid: sets a compaction time window.
        let with_window = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&with_window));

        // Invalid: sets a flushed entry id.
        let with_entry_id = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&with_entry_id));

        // Invalid: sets a flushed sequence.
        let with_sequence = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
        };
        assert!(!is_valid_region_edit(&with_sequence));
    }
}