use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::{Arc, Mutex};
use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use common_time::Timestamp;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::{RegionOpenRequest, RegionRequest};
use crate::storage::{RegionId, ScanRequest};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
Follower,
DowngradingLeader,
}
impl From<SettableRegionRoleState> for RegionRole {
fn from(value: SettableRegionRoleState) -> Self {
match value {
SettableRegionRoleState::Follower => RegionRole::Follower,
SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct SetRegionRoleStateRequest {
region_id: RegionId,
region_role_state: SettableRegionRoleState,
}
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateResponse {
Success {
last_entry_id: Option<entry::Id>,
},
NotFound,
}
impl SetRegionRoleStateResponse {
pub fn success(last_entry_id: Option<entry::Id>) -> Self {
Self::Success { last_entry_id }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GrantedRegion {
pub region_id: RegionId,
pub region_role: RegionRole,
}
impl GrantedRegion {
pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
Self {
region_id,
region_role,
}
}
}
impl From<GrantedRegion> for PbGrantedRegion {
fn from(value: GrantedRegion) -> Self {
PbGrantedRegion {
region_id: value.region_id.as_u64(),
role: PbRegionRole::from(value.region_role).into(),
}
}
}
impl From<PbGrantedRegion> for GrantedRegion {
fn from(value: PbGrantedRegion) -> Self {
GrantedRegion {
region_id: RegionId::from_u64(value.region_id),
region_role: value.role().into(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionRole {
Follower,
Leader,
DowngradingLeader,
}
impl Display for RegionRole {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RegionRole::Follower => write!(f, "Follower"),
RegionRole::Leader => write!(f, "Leader"),
RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
}
}
}
impl RegionRole {
pub fn writable(&self) -> bool {
matches!(self, RegionRole::Leader)
}
}
impl From<RegionRole> for PbRegionRole {
fn from(value: RegionRole) -> Self {
match value {
RegionRole::Follower => PbRegionRole::Follower,
RegionRole::Leader => PbRegionRole::Leader,
RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
}
}
}
impl From<PbRegionRole> for RegionRole {
fn from(value: PbRegionRole) -> Self {
match value {
PbRegionRole::Leader => RegionRole::Leader,
PbRegionRole::Follower => RegionRole::Follower,
PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
}
}
}
#[derive(Debug)]
pub enum ScannerPartitioning {
Unknown(usize),
}
impl ScannerPartitioning {
pub fn num_partitions(&self) -> usize {
match self {
ScannerPartitioning::Unknown(num_partitions) => *num_partitions,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
pub start: Timestamp,
pub end: Timestamp,
pub num_rows: usize,
pub identifier: usize,
}
#[derive(Debug, Default)]
pub struct ScannerProperties {
pub partitions: Vec<Vec<PartitionRange>>,
append_mode: bool,
total_rows: usize,
pub distinguish_partition_range: bool,
target_partitions: usize,
}
impl ScannerProperties {
pub fn with_append_mode(mut self, append_mode: bool) -> Self {
self.append_mode = append_mode;
self
}
pub fn with_total_rows(mut self, total_rows: usize) -> Self {
self.total_rows = total_rows;
self
}
pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
Self {
partitions,
append_mode,
total_rows,
distinguish_partition_range: false,
target_partitions: 0,
}
}
pub fn prepare(&mut self, request: PrepareRequest) {
if let Some(ranges) = request.ranges {
self.partitions = ranges;
}
if let Some(distinguish_partition_range) = request.distinguish_partition_range {
self.distinguish_partition_range = distinguish_partition_range;
}
if let Some(target_partitions) = request.target_partitions {
self.target_partitions = target_partitions;
}
}
pub fn num_partitions(&self) -> usize {
self.partitions.len()
}
pub fn append_mode(&self) -> bool {
self.append_mode
}
pub fn total_rows(&self) -> usize {
self.total_rows
}
pub fn target_partitions(&self) -> usize {
if self.target_partitions == 0 {
self.num_partitions()
} else {
self.target_partitions
}
}
}
#[derive(Default)]
pub struct PrepareRequest {
pub ranges: Option<Vec<Vec<PartitionRange>>>,
pub distinguish_partition_range: Option<bool>,
pub target_partitions: Option<usize>,
}
impl PrepareRequest {
pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
self.ranges = Some(ranges);
self
}
pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
self.distinguish_partition_range = Some(distinguish_partition_range);
self
}
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.target_partitions = Some(target_partitions);
self
}
}
pub trait RegionScanner: Debug + DisplayAs + Send {
fn properties(&self) -> &ScannerProperties;
fn schema(&self) -> SchemaRef;
fn metadata(&self) -> RegionMetadataRef;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError>;
fn has_predicate(&self) -> bool;
}
pub type RegionScannerRef = Box<dyn RegionScanner>;
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {
#[serde(default)]
pub num_rows: u64,
pub memtable_size: u64,
pub wal_size: u64,
pub manifest_size: u64,
pub sst_size: u64,
#[serde(default)]
pub index_size: u64,
}
impl RegionStatistic {
pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
serde_json::from_slice(value).ok()
}
pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
serde_json::to_vec(self).ok()
}
}
impl RegionStatistic {
pub fn estimated_disk_size(&self) -> u64 {
self.wal_size + self.sst_size + self.manifest_size + self.index_size
}
}
#[async_trait]
pub trait RegionEngine: Send + Sync {
fn name(&self) -> &str;
async fn handle_batch_open_requests(
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<BatchResponses, BoxedError> {
let semaphore = Arc::new(Semaphore::new(parallelism));
let mut tasks = Vec::with_capacity(requests.len());
for (region_id, request) in requests {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let result = self
.handle_request(region_id, RegionRequest::Open(request))
.await;
(region_id, result)
});
}
Ok(join_all(tasks).await)
}
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<RegionResponse, BoxedError>;
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError>;
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
async fn stop(&self) -> Result<(), BoxedError>;
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
async fn set_region_role_state_gracefully(
&self,
region_id: RegionId,
region_role_state: SettableRegionRoleState,
) -> Result<SetRegionRoleStateResponse, BoxedError>;
fn role(&self, region_id: RegionId) -> Option<RegionRole>;
fn as_any(&self) -> &dyn Any;
}
pub type RegionEngineRef = Arc<dyn RegionEngine>;
pub struct SinglePartitionScanner {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
properties: ScannerProperties,
metadata: RegionMetadataRef,
}
impl SinglePartitionScanner {
pub fn new(
stream: SendableRecordBatchStream,
append_mode: bool,
metadata: RegionMetadataRef,
) -> Self {
let schema = stream.schema();
Self {
stream: Mutex::new(Some(stream)),
schema,
properties: ScannerProperties::default().with_append_mode(append_mode),
metadata,
}
}
}
impl Debug for SinglePartitionScanner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
}
}
impl RegionScanner for SinglePartitionScanner {
fn properties(&self) -> &ScannerProperties {
&self.properties
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
Ok(())
}
fn scan_partition(&self, _partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
let mut stream = self.stream.lock().unwrap();
stream.take().ok_or_else(|| {
BoxedError::new(PlainError::new(
"Not expected to run ExecutionPlan more than once".to_string(),
StatusCode::Unexpected,
))
})
}
fn has_predicate(&self) -> bool {
false
}
fn metadata(&self) -> RegionMetadataRef {
self.metadata.clone()
}
}
impl DisplayAs for SinglePartitionScanner {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}