1use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
/// Bundle of optional service handlers carried together as one state value.
///
/// Every handler slot is an `Option`, so the derived `Default` yields a state
/// in which all three handlers are `None`. The derived `Clone` clones each
/// optional handler reference.
#[derive(Clone, Default)]
pub struct FunctionState {
    /// Optional handler for table mutations; `None` when no handler is set.
    pub table_mutation_handler: Option<TableMutationHandlerRef>,
    /// Optional handler for procedure services; `None` when no handler is set.
    pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
    /// Optional handler for flow services; `None` when no handler is set.
    pub flow_service_handler: Option<FlowServiceHandlerRef>,
}
28
29impl FunctionState {
30 #[cfg(any(test, feature = "testing"))]
32 pub fn mock() -> Self {
33 use std::sync::Arc;
34
35 use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36 use async_trait::async_trait;
37 use catalog::CatalogManagerRef;
38 use common_base::AffectedRows;
39 use common_meta::rpc::procedure::{
40 ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41 };
42 use common_query::Output;
43 use common_query::error::Result;
44 use session::context::QueryContextRef;
45 use store_api::storage::RegionId;
46 use table::requests::{
47 CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
48 };
49
50 use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
51 struct MockProcedureServiceHandler;
52 struct MockTableMutationHandler;
53 struct MockFlowServiceHandler;
54 const ROWS: usize = 42;
55
56 #[async_trait]
57 impl ProcedureServiceHandler for MockProcedureServiceHandler {
58 async fn migrate_region(
59 &self,
60 _request: MigrateRegionRequest,
61 ) -> Result<Option<String>> {
62 Ok(Some("test_pid".to_string()))
63 }
64
65 async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
66 Ok(Some("test_pid".to_string()))
67 }
68
69 async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
70 Ok(ProcedureStateResponse {
71 status: ProcedureStatus::Done.into(),
72 error: "OK".to_string(),
73 ..Default::default()
74 })
75 }
76
77 async fn manage_region_follower(
78 &self,
79 _request: ManageRegionFollowerRequest,
80 ) -> Result<()> {
81 Ok(())
82 }
83
84 fn catalog_manager(&self) -> &CatalogManagerRef {
85 unimplemented!()
86 }
87 }
88
89 #[async_trait]
90 impl TableMutationHandler for MockTableMutationHandler {
91 async fn insert(
92 &self,
93 _request: InsertRequest,
94 _ctx: QueryContextRef,
95 ) -> Result<Output> {
96 Ok(Output::new_with_affected_rows(ROWS))
97 }
98
99 async fn delete(
100 &self,
101 _request: DeleteRequest,
102 _ctx: QueryContextRef,
103 ) -> Result<AffectedRows> {
104 Ok(ROWS)
105 }
106
107 async fn flush(
108 &self,
109 _request: FlushTableRequest,
110 _ctx: QueryContextRef,
111 ) -> Result<AffectedRows> {
112 Ok(ROWS)
113 }
114
115 async fn compact(
116 &self,
117 _request: CompactTableRequest,
118 _ctx: QueryContextRef,
119 ) -> Result<AffectedRows> {
120 Ok(ROWS)
121 }
122
123 async fn flush_region(
124 &self,
125 _region_id: RegionId,
126 _ctx: QueryContextRef,
127 ) -> Result<AffectedRows> {
128 Ok(ROWS)
129 }
130
131 async fn compact_region(
132 &self,
133 _region_id: RegionId,
134 _ctx: QueryContextRef,
135 ) -> Result<AffectedRows> {
136 Ok(ROWS)
137 }
138 }
139
140 #[async_trait]
141 impl FlowServiceHandler for MockFlowServiceHandler {
142 async fn flush(
143 &self,
144 _catalog: &str,
145 _flow: &str,
146 _ctx: QueryContextRef,
147 ) -> Result<api::v1::flow::FlowResponse> {
148 todo!()
149 }
150 }
151
152 Self {
153 table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
154 procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
155 flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
156 }
157 }
158}