use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
#[derive(Clone, Default)]
pub struct FunctionState {
pub table_mutation_handler: Option<TableMutationHandlerRef>,
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
pub flow_service_handler: Option<FlowServiceHandlerRef>,
}
impl FunctionState {
#[cfg(test)]
pub fn mock() -> Self {
use std::sync::Arc;
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
struct MockFlowServiceHandler;
const ROWS: usize = 42;
#[async_trait]
impl ProcedureServiceHandler for MockProcedureServiceHandler {
async fn migrate_region(
&self,
_request: MigrateRegionRequest,
) -> Result<Option<String>> {
Ok(Some("test_pid".to_string()))
}
async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
Ok(ProcedureStateResponse {
status: ProcedureStatus::Done.into(),
error: "OK".to_string(),
..Default::default()
})
}
}
#[async_trait]
impl TableMutationHandler for MockTableMutationHandler {
async fn insert(
&self,
_request: InsertRequest,
_ctx: QueryContextRef,
) -> Result<Output> {
Ok(Output::new_with_affected_rows(ROWS))
}
async fn delete(
&self,
_request: DeleteRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn flush(
&self,
_request: FlushTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn compact(
&self,
_request: CompactTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn flush_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn compact_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
}
#[async_trait]
impl FlowServiceHandler for MockFlowServiceHandler {
async fn flush(
&self,
_catalog: &str,
_flow: &str,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
}
Self {
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
}
}
}