use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
use table::table_name::TableName;
use crate::flow_name::FlowName;
use crate::key::schema_name::SchemaName;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::{ClusterId, DatanodeId, FlownodeId};
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct RegionIdent {
pub cluster_id: ClusterId,
pub datanode_id: DatanodeId,
pub table_id: TableId,
pub region_number: RegionNumber,
pub engine: String,
}
impl RegionIdent {
pub fn get_region_id(&self) -> RegionId {
RegionId::new(self.table_id, self.region_number)
}
}
impl Display for RegionIdent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"RegionIdent(datanode_id='{}.{}', table_id={}, region_number={}, engine = {})",
self.cluster_id, self.datanode_id, self.table_id, self.region_number, self.engine
)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct DowngradeRegionReply {
pub last_entry_id: Option<u64>,
pub exists: bool,
pub error: Option<String>,
}
impl Display for DowngradeRegionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"(last_entry_id={:?}, exists={}, error={:?})",
self.last_entry_id, self.exists, self.error
)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
pub result: bool,
pub error: Option<String>,
}
impl Display for SimpleReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "(result={}, error={:?})", self.result, self.error)
}
}
impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"OpenRegion(region_ident={}, region_storage_path={})",
self.region_ident, self.region_storage_path
)
}
}
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
pub region_ident: RegionIdent,
pub region_storage_path: String,
pub region_options: HashMap<String, String>,
#[serde(default)]
#[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
pub region_wal_options: HashMap<RegionNumber, String>,
#[serde(default)]
pub skip_wal_replay: bool,
}
impl OpenRegion {
pub fn new(
region_ident: RegionIdent,
path: &str,
region_options: HashMap<String, String>,
region_wal_options: HashMap<RegionNumber, String>,
skip_wal_replay: bool,
) -> Self {
Self {
region_ident,
region_storage_path: path.to_string(),
region_options,
region_wal_options,
skip_wal_replay,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
pub region_id: RegionId,
#[serde(default)]
pub flush_timeout: Option<Duration>,
pub reject_write: bool,
}
impl Display for DowngradeRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"DowngradeRegion(region_id={}, flush_timeout={:?}, rejct_write={})",
self.region_id, self.flush_timeout, self.reject_write
)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UpgradeRegion {
pub region_id: RegionId,
pub last_entry_id: Option<u64>,
#[serde(with = "humantime_serde")]
pub replay_timeout: Option<Duration>,
#[serde(default)]
pub location_id: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CacheIdent {
FlowId(FlowId),
FlowName(FlowName),
TableId(TableId),
TableName(TableName),
SchemaName(SchemaName),
CreateFlow(CreateFlow),
DropFlow(DropFlow),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
pub source_table_ids: Vec<TableId>,
pub flownodes: Vec<Peer>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
pub source_table_ids: Vec<TableId>,
pub flownode_ids: Vec<FlownodeId>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
OpenRegion(OpenRegion),
CloseRegion(RegionIdent),
UpgradeRegion(UpgradeRegion),
DowngradeRegion(DowngradeRegion),
InvalidateCaches(Vec<CacheIdent>),
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct UpgradeRegionReply {
pub ready: bool,
pub exists: bool,
pub error: Option<String>,
}
impl Display for UpgradeRegionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"(ready={}, exists={}, error={:?})",
self.ready, self.exists, self.error
)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
OpenRegion(SimpleReply),
CloseRegion(SimpleReply),
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
}
impl Display for InstructionReply {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
Self::DowngradeRegion(reply) => {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize_instruction() {
let open_region = Instruction::OpenRegion(OpenRegion::new(
RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
},
"test/foo",
HashMap::new(),
HashMap::new(),
false,
));
let serialized = serde_json::to_string(&open_region).unwrap();
assert_eq!(
r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#,
serialized
);
let close_region = Instruction::CloseRegion(RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
});
let serialized = serde_json::to_string(&close_region).unwrap();
assert_eq!(
r#"{"CloseRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#,
serialized
);
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyOpenRegion {
region_ident: RegionIdent,
region_storage_path: String,
region_options: HashMap<String, String>,
}
#[test]
fn test_compatible_serialize_open_region() {
let region_ident = RegionIdent {
cluster_id: 1,
datanode_id: 2,
table_id: 1024,
region_number: 1,
engine: "mito2".to_string(),
};
let region_storage_path = "test/foo".to_string();
let region_options = HashMap::from([
("a".to_string(), "aa".to_string()),
("b".to_string(), "bb".to_string()),
]);
let legacy_open_region = LegacyOpenRegion {
region_ident: region_ident.clone(),
region_storage_path: region_storage_path.clone(),
region_options: region_options.clone(),
};
let serialized = serde_json::to_string(&legacy_open_region).unwrap();
let deserialized = serde_json::from_str(&serialized).unwrap();
let expected = OpenRegion {
region_ident,
region_storage_path,
region_options,
region_wal_options: HashMap::new(),
skip_wal_replay: false,
};
assert_eq!(expected, deserialized);
}
}