1use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17#[derive(Clone, Default)]
20pub struct FunctionState {
21 pub table_mutation_handler: Option<TableMutationHandlerRef>,
23 pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25 pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30 #[cfg(any(test, feature = "testing"))]
32 pub fn mock() -> Self {
33 use std::sync::Arc;
34
35 use api::v1::meta::ProcedureStatus;
36 use async_trait::async_trait;
37 use catalog::CatalogManagerRef;
38 use common_base::AffectedRows;
39 use common_meta::rpc::procedure::{
40 AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41 RemoveRegionFollowerRequest,
42 };
43 use common_query::error::Result;
44 use common_query::Output;
45 use session::context::QueryContextRef;
46 use store_api::storage::RegionId;
47 use table::requests::{
48 CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
49 };
50
51 use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
52 struct MockProcedureServiceHandler;
53 struct MockTableMutationHandler;
54 struct MockFlowServiceHandler;
55 const ROWS: usize = 42;
56
57 #[async_trait]
58 impl ProcedureServiceHandler for MockProcedureServiceHandler {
59 async fn migrate_region(
60 &self,
61 _request: MigrateRegionRequest,
62 ) -> Result<Option<String>> {
63 Ok(Some("test_pid".to_string()))
64 }
65
66 async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
67 Ok(ProcedureStateResponse {
68 status: ProcedureStatus::Done.into(),
69 error: "OK".to_string(),
70 ..Default::default()
71 })
72 }
73
74 async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
75 Ok(())
76 }
77
78 async fn remove_region_follower(
79 &self,
80 _request: RemoveRegionFollowerRequest,
81 ) -> Result<()> {
82 Ok(())
83 }
84
85 fn catalog_manager(&self) -> &CatalogManagerRef {
86 unimplemented!()
87 }
88 }
89
90 #[async_trait]
91 impl TableMutationHandler for MockTableMutationHandler {
92 async fn insert(
93 &self,
94 _request: InsertRequest,
95 _ctx: QueryContextRef,
96 ) -> Result<Output> {
97 Ok(Output::new_with_affected_rows(ROWS))
98 }
99
100 async fn delete(
101 &self,
102 _request: DeleteRequest,
103 _ctx: QueryContextRef,
104 ) -> Result<AffectedRows> {
105 Ok(ROWS)
106 }
107
108 async fn flush(
109 &self,
110 _request: FlushTableRequest,
111 _ctx: QueryContextRef,
112 ) -> Result<AffectedRows> {
113 Ok(ROWS)
114 }
115
116 async fn compact(
117 &self,
118 _request: CompactTableRequest,
119 _ctx: QueryContextRef,
120 ) -> Result<AffectedRows> {
121 Ok(ROWS)
122 }
123
124 async fn flush_region(
125 &self,
126 _region_id: RegionId,
127 _ctx: QueryContextRef,
128 ) -> Result<AffectedRows> {
129 Ok(ROWS)
130 }
131
132 async fn compact_region(
133 &self,
134 _region_id: RegionId,
135 _ctx: QueryContextRef,
136 ) -> Result<AffectedRows> {
137 Ok(ROWS)
138 }
139 }
140
141 #[async_trait]
142 impl FlowServiceHandler for MockFlowServiceHandler {
143 async fn flush(
144 &self,
145 _catalog: &str,
146 _flow: &str,
147 _ctx: QueryContextRef,
148 ) -> Result<api::v1::flow::FlowResponse> {
149 todo!()
150 }
151 }
152
153 Self {
154 table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
155 procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
156 flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
157 }
158 }
159}