1use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17#[derive(Clone, Default)]
20pub struct FunctionState {
21 pub table_mutation_handler: Option<TableMutationHandlerRef>,
23 pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25 pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30 #[cfg(any(test, feature = "testing"))]
32 pub fn mock() -> Self {
33 use std::sync::Arc;
34
35 use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36 use async_trait::async_trait;
37 use catalog::CatalogManagerRef;
38 use common_base::AffectedRows;
39 use common_meta::rpc::procedure::{
40 GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
41 MigrateRegionRequest, ProcedureStateResponse,
42 };
43 use common_query::Output;
44 use common_query::error::Result;
45 use session::context::QueryContextRef;
46 use store_api::storage::RegionId;
47 use table::requests::{
48 BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
49 InsertRequest,
50 };
51
52 use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
53 struct MockProcedureServiceHandler;
54 struct MockTableMutationHandler;
55 struct MockFlowServiceHandler;
56 const ROWS: usize = 42;
57
58 #[async_trait]
59 impl ProcedureServiceHandler for MockProcedureServiceHandler {
60 async fn migrate_region(
61 &self,
62 _request: MigrateRegionRequest,
63 ) -> Result<Option<String>> {
64 Ok(Some("test_pid".to_string()))
65 }
66
67 async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
68 Ok(Some("test_pid".to_string()))
69 }
70
71 async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
72 Ok(ProcedureStateResponse {
73 status: ProcedureStatus::Done.into(),
74 error: "OK".to_string(),
75 ..Default::default()
76 })
77 }
78
79 async fn manage_region_follower(
80 &self,
81 _request: ManageRegionFollowerRequest,
82 ) -> Result<()> {
83 Ok(())
84 }
85
86 async fn gc_regions(&self, _request: GcRegionsRequest) -> Result<GcResponse> {
87 Ok(GcResponse {
88 processed_regions: 1,
89 need_retry_regions: vec![],
90 deleted_files: 0,
91 deleted_indexes: 0,
92 })
93 }
94
95 async fn gc_table(&self, _request: GcTableRequest) -> Result<GcResponse> {
96 Ok(GcResponse {
97 processed_regions: 1,
98 need_retry_regions: vec![],
99 deleted_files: 0,
100 deleted_indexes: 0,
101 })
102 }
103
104 fn catalog_manager(&self) -> &CatalogManagerRef {
105 unimplemented!()
106 }
107 }
108
109 #[async_trait]
110 impl TableMutationHandler for MockTableMutationHandler {
111 async fn insert(
112 &self,
113 _request: InsertRequest,
114 _ctx: QueryContextRef,
115 ) -> Result<Output> {
116 Ok(Output::new_with_affected_rows(ROWS))
117 }
118
119 async fn delete(
120 &self,
121 _request: DeleteRequest,
122 _ctx: QueryContextRef,
123 ) -> Result<AffectedRows> {
124 Ok(ROWS)
125 }
126
127 async fn flush(
128 &self,
129 _request: FlushTableRequest,
130 _ctx: QueryContextRef,
131 ) -> Result<AffectedRows> {
132 Ok(ROWS)
133 }
134
135 async fn compact(
136 &self,
137 _request: CompactTableRequest,
138 _ctx: QueryContextRef,
139 ) -> Result<AffectedRows> {
140 Ok(ROWS)
141 }
142
143 async fn build_index(
144 &self,
145 _request: BuildIndexTableRequest,
146 _ctx: QueryContextRef,
147 ) -> Result<AffectedRows> {
148 Ok(ROWS)
149 }
150
151 async fn flush_region(
152 &self,
153 _region_id: RegionId,
154 _ctx: QueryContextRef,
155 ) -> Result<AffectedRows> {
156 Ok(ROWS)
157 }
158
159 async fn compact_region(
160 &self,
161 _region_id: RegionId,
162 _ctx: QueryContextRef,
163 ) -> Result<AffectedRows> {
164 Ok(ROWS)
165 }
166 }
167
168 #[async_trait]
169 impl FlowServiceHandler for MockFlowServiceHandler {
170 async fn flush(
171 &self,
172 _catalog: &str,
173 _flow: &str,
174 _ctx: QueryContextRef,
175 ) -> Result<api::v1::flow::FlowResponse> {
176 todo!()
177 }
178 }
179
180 Self {
181 table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
182 procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
183 flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
184 }
185 }
186}