common_function/
state.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};
16
17/// Shared state for SQL functions.
18/// The handlers in state may be `None` in cli command-line or test cases.
19#[derive(Clone, Default)]
20pub struct FunctionState {
21    // The table mutation handler
22    pub table_mutation_handler: Option<TableMutationHandlerRef>,
23    // The procedure service handler
24    pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
25    // The flownode handler
26    pub flow_service_handler: Option<FlowServiceHandlerRef>,
27}
28
29impl FunctionState {
30    /// Create a mock [`FunctionState`] for test.
31    #[cfg(any(test, feature = "testing"))]
32    pub fn mock() -> Self {
33        use std::sync::Arc;
34
35        use api::v1::meta::{ProcedureStatus, ReconcileRequest};
36        use async_trait::async_trait;
37        use catalog::CatalogManagerRef;
38        use common_base::AffectedRows;
39        use common_meta::rpc::procedure::{
40            ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
41        };
42        use common_query::Output;
43        use common_query::error::Result;
44        use session::context::QueryContextRef;
45        use store_api::storage::RegionId;
46        use table::requests::{
47            CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
48        };
49
50        use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
51        struct MockProcedureServiceHandler;
52        struct MockTableMutationHandler;
53        struct MockFlowServiceHandler;
54        const ROWS: usize = 42;
55
56        #[async_trait]
57        impl ProcedureServiceHandler for MockProcedureServiceHandler {
58            async fn migrate_region(
59                &self,
60                _request: MigrateRegionRequest,
61            ) -> Result<Option<String>> {
62                Ok(Some("test_pid".to_string()))
63            }
64
65            async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
66                Ok(Some("test_pid".to_string()))
67            }
68
69            async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
70                Ok(ProcedureStateResponse {
71                    status: ProcedureStatus::Done.into(),
72                    error: "OK".to_string(),
73                    ..Default::default()
74                })
75            }
76
77            async fn manage_region_follower(
78                &self,
79                _request: ManageRegionFollowerRequest,
80            ) -> Result<()> {
81                Ok(())
82            }
83
84            fn catalog_manager(&self) -> &CatalogManagerRef {
85                unimplemented!()
86            }
87        }
88
89        #[async_trait]
90        impl TableMutationHandler for MockTableMutationHandler {
91            async fn insert(
92                &self,
93                _request: InsertRequest,
94                _ctx: QueryContextRef,
95            ) -> Result<Output> {
96                Ok(Output::new_with_affected_rows(ROWS))
97            }
98
99            async fn delete(
100                &self,
101                _request: DeleteRequest,
102                _ctx: QueryContextRef,
103            ) -> Result<AffectedRows> {
104                Ok(ROWS)
105            }
106
107            async fn flush(
108                &self,
109                _request: FlushTableRequest,
110                _ctx: QueryContextRef,
111            ) -> Result<AffectedRows> {
112                Ok(ROWS)
113            }
114
115            async fn compact(
116                &self,
117                _request: CompactTableRequest,
118                _ctx: QueryContextRef,
119            ) -> Result<AffectedRows> {
120                Ok(ROWS)
121            }
122
123            async fn flush_region(
124                &self,
125                _region_id: RegionId,
126                _ctx: QueryContextRef,
127            ) -> Result<AffectedRows> {
128                Ok(ROWS)
129            }
130
131            async fn compact_region(
132                &self,
133                _region_id: RegionId,
134                _ctx: QueryContextRef,
135            ) -> Result<AffectedRows> {
136                Ok(ROWS)
137            }
138        }
139
140        #[async_trait]
141        impl FlowServiceHandler for MockFlowServiceHandler {
142            async fn flush(
143                &self,
144                _catalog: &str,
145                _flow: &str,
146                _ctx: QueryContextRef,
147            ) -> Result<api::v1::flow::FlowResponse> {
148                todo!()
149            }
150        }
151
152        Self {
153            table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
154            procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
155            flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
156        }
157    }
158}