1use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17#[derive(Clone, Default)]
20pub struct FunctionState {
21 pub table_mutation_handler: Option<TableMutationHandlerRef>,
23 pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25 pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30 #[cfg(any(test, feature = "testing"))]
32 pub fn mock() -> Self {
33 use std::sync::Arc;
34
35 use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36 use async_trait::async_trait;
37 use catalog::CatalogManagerRef;
38 use common_base::AffectedRows;
39 use common_meta::rpc::procedure::{
40 AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41 RemoveRegionFollowerRequest,
42 };
43 use common_query::error::Result;
44 use common_query::Output;
45 use session::context::QueryContextRef;
46 use store_api::storage::RegionId;
47 use table::requests::{
48 CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
49 };
50
51 use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
52 struct MockProcedureServiceHandler;
53 struct MockTableMutationHandler;
54 struct MockFlowServiceHandler;
55 const ROWS: usize = 42;
56
57 #[async_trait]
58 impl ProcedureServiceHandler for MockProcedureServiceHandler {
59 async fn migrate_region(
60 &self,
61 _request: MigrateRegionRequest,
62 ) -> Result<Option<String>> {
63 Ok(Some("test_pid".to_string()))
64 }
65
66 async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
67 Ok(Some("test_pid".to_string()))
68 }
69
70 async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
71 Ok(ProcedureStateResponse {
72 status: ProcedureStatus::Done.into(),
73 error: "OK".to_string(),
74 ..Default::default()
75 })
76 }
77
78 async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
79 Ok(())
80 }
81
82 async fn remove_region_follower(
83 &self,
84 _request: RemoveRegionFollowerRequest,
85 ) -> Result<()> {
86 Ok(())
87 }
88
89 fn catalog_manager(&self) -> &CatalogManagerRef {
90 unimplemented!()
91 }
92 }
93
94 #[async_trait]
95 impl TableMutationHandler for MockTableMutationHandler {
96 async fn insert(
97 &self,
98 _request: InsertRequest,
99 _ctx: QueryContextRef,
100 ) -> Result<Output> {
101 Ok(Output::new_with_affected_rows(ROWS))
102 }
103
104 async fn delete(
105 &self,
106 _request: DeleteRequest,
107 _ctx: QueryContextRef,
108 ) -> Result<AffectedRows> {
109 Ok(ROWS)
110 }
111
112 async fn flush(
113 &self,
114 _request: FlushTableRequest,
115 _ctx: QueryContextRef,
116 ) -> Result<AffectedRows> {
117 Ok(ROWS)
118 }
119
120 async fn compact(
121 &self,
122 _request: CompactTableRequest,
123 _ctx: QueryContextRef,
124 ) -> Result<AffectedRows> {
125 Ok(ROWS)
126 }
127
128 async fn flush_region(
129 &self,
130 _region_id: RegionId,
131 _ctx: QueryContextRef,
132 ) -> Result<AffectedRows> {
133 Ok(ROWS)
134 }
135
136 async fn compact_region(
137 &self,
138 _region_id: RegionId,
139 _ctx: QueryContextRef,
140 ) -> Result<AffectedRows> {
141 Ok(ROWS)
142 }
143 }
144
145 #[async_trait]
146 impl FlowServiceHandler for MockFlowServiceHandler {
147 async fn flush(
148 &self,
149 _catalog: &str,
150 _flow: &str,
151 _ctx: QueryContextRef,
152 ) -> Result<api::v1::flow::FlowResponse> {
153 todo!()
154 }
155 }
156
157 Self {
158 table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
159 procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
160 flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
161 }
162 }
163}