1use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17#[derive(Clone, Default)]
20pub struct FunctionState {
21 pub table_mutation_handler: Option<TableMutationHandlerRef>,
23 pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25 pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30 #[cfg(any(test, feature = "testing"))]
32 pub fn mock() -> Self {
33 use std::sync::Arc;
34
35 use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36 use async_trait::async_trait;
37 use catalog::CatalogManagerRef;
38 use common_base::AffectedRows;
39 use common_meta::rpc::procedure::{
40 ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41 };
42 use common_query::Output;
43 use common_query::error::Result;
44 use session::context::QueryContextRef;
45 use store_api::storage::RegionId;
46 use table::requests::{
47 BuildIndexTableRequest, CompactTableRequest, DeleteRequest, FlushTableRequest,
48 InsertRequest,
49 };
50
51 use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
52 struct MockProcedureServiceHandler;
53 struct MockTableMutationHandler;
54 struct MockFlowServiceHandler;
55 const ROWS: usize = 42;
56
57 #[async_trait]
58 impl ProcedureServiceHandler for MockProcedureServiceHandler {
59 async fn migrate_region(
60 &self,
61 _request: MigrateRegionRequest,
62 ) -> Result<Option<String>> {
63 Ok(Some("test_pid".to_string()))
64 }
65
66 async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
67 Ok(Some("test_pid".to_string()))
68 }
69
70 async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
71 Ok(ProcedureStateResponse {
72 status: ProcedureStatus::Done.into(),
73 error: "OK".to_string(),
74 ..Default::default()
75 })
76 }
77
78 async fn manage_region_follower(
79 &self,
80 _request: ManageRegionFollowerRequest,
81 ) -> Result<()> {
82 Ok(())
83 }
84
85 fn catalog_manager(&self) -> &CatalogManagerRef {
86 unimplemented!()
87 }
88 }
89
90 #[async_trait]
91 impl TableMutationHandler for MockTableMutationHandler {
92 async fn insert(
93 &self,
94 _request: InsertRequest,
95 _ctx: QueryContextRef,
96 ) -> Result<Output> {
97 Ok(Output::new_with_affected_rows(ROWS))
98 }
99
100 async fn delete(
101 &self,
102 _request: DeleteRequest,
103 _ctx: QueryContextRef,
104 ) -> Result<AffectedRows> {
105 Ok(ROWS)
106 }
107
108 async fn flush(
109 &self,
110 _request: FlushTableRequest,
111 _ctx: QueryContextRef,
112 ) -> Result<AffectedRows> {
113 Ok(ROWS)
114 }
115
116 async fn compact(
117 &self,
118 _request: CompactTableRequest,
119 _ctx: QueryContextRef,
120 ) -> Result<AffectedRows> {
121 Ok(ROWS)
122 }
123
124 async fn build_index(
125 &self,
126 _request: BuildIndexTableRequest,
127 _ctx: QueryContextRef,
128 ) -> Result<AffectedRows> {
129 Ok(ROWS)
130 }
131
132 async fn flush_region(
133 &self,
134 _region_id: RegionId,
135 _ctx: QueryContextRef,
136 ) -> Result<AffectedRows> {
137 Ok(ROWS)
138 }
139
140 async fn compact_region(
141 &self,
142 _region_id: RegionId,
143 _ctx: QueryContextRef,
144 ) -> Result<AffectedRows> {
145 Ok(ROWS)
146 }
147 }
148
149 #[async_trait]
150 impl FlowServiceHandler for MockFlowServiceHandler {
151 async fn flush(
152 &self,
153 _catalog: &str,
154 _flow: &str,
155 _ctx: QueryContextRef,
156 ) -> Result<api::v1::flow::FlowResponse> {
157 todo!()
158 }
159 }
160
161 Self {
162 table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
163 procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
164 flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
165 }
166 }
167}