common_function/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17/// Shared state for SQL functions.
18/// The handlers in state may be `None` in cli command-line or test cases.
19#[derive(Clone, Default)]
20pub struct FunctionState {
21    // The table mutation handler
22    pub table_mutation_handler: Option<TableMutationHandlerRef>,
23    // The procedure service handler
24    pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25    // The flownode handler
26    pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30    /// Create a mock [`FunctionState`] for test.
31    #[cfg(any(test, feature = "testing"))]
32    pub fn mock() -> Self {
33        use std::sync::Arc;
34
35        use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36        use async_trait::async_trait;
37        use catalog::CatalogManagerRef;
38        use common_base::AffectedRows;
39        use common_meta::rpc::procedure::{
40            AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41            RemoveRegionFollowerRequest,
42        };
43        use common_query::error::Result;
44        use common_query::Output;
45        use session::context::QueryContextRef;
46        use store_api::storage::RegionId;
47        use table::requests::{
48            CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
49        };
50
51        use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
52        struct MockProcedureServiceHandler;
53        struct MockTableMutationHandler;
54        struct MockFlowServiceHandler;
55        const ROWS: usize = 42;
56
57        #[async_trait]
58        impl ProcedureServiceHandler for MockProcedureServiceHandler {
59            async fn migrate_region(
60                &self,
61                _request: MigrateRegionRequest,
62            ) -> Result<Option<String>> {
63                Ok(Some("test_pid".to_string()))
64            }
65
66            async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
67                Ok(Some("test_pid".to_string()))
68            }
69
70            async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
71                Ok(ProcedureStateResponse {
72                    status: ProcedureStatus::Done.into(),
73                    error: "OK".to_string(),
74                    ..Default::default()
75                })
76            }
77
78            async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
79                Ok(())
80            }
81
82            async fn remove_region_follower(
83                &self,
84                _request: RemoveRegionFollowerRequest,
85            ) -> Result<()> {
86                Ok(())
87            }
88
89            fn catalog_manager(&self) -> &CatalogManagerRef {
90                unimplemented!()
91            }
92        }
93
94        #[async_trait]
95        impl TableMutationHandler for MockTableMutationHandler {
96            async fn insert(
97                &self,
98                _request: InsertRequest,
99                _ctx: QueryContextRef,
100            ) -> Result<Output> {
101                Ok(Output::new_with_affected_rows(ROWS))
102            }
103
104            async fn delete(
105                &self,
106                _request: DeleteRequest,
107                _ctx: QueryContextRef,
108            ) -> Result<AffectedRows> {
109                Ok(ROWS)
110            }
111
112            async fn flush(
113                &self,
114                _request: FlushTableRequest,
115                _ctx: QueryContextRef,
116            ) -> Result<AffectedRows> {
117                Ok(ROWS)
118            }
119
120            async fn compact(
121                &self,
122                _request: CompactTableRequest,
123                _ctx: QueryContextRef,
124            ) -> Result<AffectedRows> {
125                Ok(ROWS)
126            }
127
128            async fn flush_region(
129                &self,
130                _region_id: RegionId,
131                _ctx: QueryContextRef,
132            ) -> Result<AffectedRows> {
133                Ok(ROWS)
134            }
135
136            async fn compact_region(
137                &self,
138                _region_id: RegionId,
139                _ctx: QueryContextRef,
140            ) -> Result<AffectedRows> {
141                Ok(ROWS)
142            }
143        }
144
145        #[async_trait]
146        impl FlowServiceHandler for MockFlowServiceHandler {
147            async fn flush(
148                &self,
149                _catalog: &str,
150                _flow: &str,
151                _ctx: QueryContextRef,
152            ) -> Result<api::v1::flow::FlowResponse> {
153                todo!()
154            }
155        }
156
157        Self {
158            table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
159            procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
160            flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
161        }
162    }
163}