servers/postgres/
fixtures.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::stream;
19use once_cell::sync::Lazy;
20use pgwire::api::Type;
21use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
22use pgwire::error::PgWireResult;
23use pgwire::messages::data::DataRow;
24use regex::Regex;
25use session::context::QueryContextRef;
26
27fn build_string_data_rows(
28    schema: Arc<Vec<FieldInfo>>,
29    rows: Vec<Vec<String>>,
30) -> Vec<PgWireResult<DataRow>> {
31    rows.iter()
32        .map(|row| {
33            let mut encoder = DataRowEncoder::new(schema.clone());
34            for value in row {
35                encoder.encode_field(&Some(value))?;
36            }
37            encoder.finish()
38        })
39        .collect()
40}
41
42static VAR_VALUES: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
43    HashMap::from([
44        ("default_transaction_isolation", "read committed"),
45        ("transaction isolation level", "read committed"),
46        ("standard_conforming_strings", "on"),
47        ("client_encoding", "UTF8"),
48    ])
49});
50
51static SHOW_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new("(?i)^SHOW (.*?);?$").unwrap());
52static SET_TRANSACTION_PATTERN: Lazy<Regex> =
53    Lazy::new(|| Regex::new("(?i)^SET TRANSACTION (.*?);?$").unwrap());
54static START_TRANSACTION_PATTERN: Lazy<Regex> =
55    Lazy::new(|| Regex::new("(?i)^(START TRANSACTION.*|BEGIN);?").unwrap());
56static COMMIT_TRANSACTION_PATTERN: Lazy<Regex> =
57    Lazy::new(|| Regex::new("(?i)^(COMMIT TRANSACTION|COMMIT);?").unwrap());
58static ABORT_TRANSACTION_PATTERN: Lazy<Regex> =
59    Lazy::new(|| Regex::new("(?i)^(ABORT TRANSACTION|ROLLBACK);?").unwrap());
60
61/// Test if given query statement matches the patterns
62pub(crate) fn matches(query: &str) -> bool {
63    START_TRANSACTION_PATTERN.is_match(query)
64        || COMMIT_TRANSACTION_PATTERN.is_match(query)
65        || ABORT_TRANSACTION_PATTERN.is_match(query)
66        || SHOW_PATTERN.captures(query).is_some()
67        || SET_TRANSACTION_PATTERN.is_match(query)
68}
69
70fn set_transaction_warning(query_ctx: QueryContextRef) {
71    query_ctx.set_warning("Please note transaction is not supported in GreptimeDB.".to_string());
72}
73
74/// Process unsupported SQL and return fixed result as a compatibility solution
75pub(crate) fn process(query: &str, query_ctx: QueryContextRef) -> Option<Vec<Response>> {
76    // Transaction directives:
77    if START_TRANSACTION_PATTERN.is_match(query) {
78        set_transaction_warning(query_ctx);
79        if query.to_lowercase().starts_with("begin") {
80            Some(vec![Response::TransactionStart(Tag::new("BEGIN"))])
81        } else {
82            Some(vec![Response::TransactionStart(Tag::new(
83                "START TRANSACTION",
84            ))])
85        }
86    } else if ABORT_TRANSACTION_PATTERN.is_match(query) {
87        Some(vec![Response::TransactionEnd(Tag::new("ROLLBACK"))])
88    } else if COMMIT_TRANSACTION_PATTERN.is_match(query) {
89        Some(vec![Response::TransactionEnd(Tag::new("COMMIT"))])
90    } else if let Some(show_var) = SHOW_PATTERN.captures(query) {
91        let show_var = show_var[1].to_lowercase();
92        if let Some(value) = VAR_VALUES.get(&show_var.as_ref()) {
93            let f1 = FieldInfo::new(
94                show_var.clone(),
95                None,
96                None,
97                Type::VARCHAR,
98                FieldFormat::Text,
99            );
100            let schema = Arc::new(vec![f1]);
101            let data = stream::iter(build_string_data_rows(
102                schema.clone(),
103                vec![vec![value.to_string()]],
104            ));
105
106            Some(vec![Response::Query(QueryResponse::new(schema, data))])
107        } else {
108            None
109        }
110    } else if SET_TRANSACTION_PATTERN.is_match(query) {
111        Some(vec![Response::Execution(Tag::new("SET"))])
112    } else {
113        None
114    }
115}
116
117#[cfg(test)]
118mod test {
119    use session::context::{QueryContext, QueryContextRef};
120
121    use super::*;
122
123    fn assert_tag(q: &str, t: &str, query_context: QueryContextRef) {
124        if let Response::Execution(tag)
125        | Response::TransactionStart(tag)
126        | Response::TransactionEnd(tag) = process(q, query_context.clone())
127            .unwrap_or_else(|| panic!("fail to match {}", q))
128            .remove(0)
129        {
130            assert_eq!(Tag::new(t), tag);
131        } else {
132            panic!("Invalid response");
133        }
134    }
135
136    fn get_data(q: &str, query_context: QueryContextRef) -> QueryResponse {
137        if let Response::Query(resp) = process(q, query_context.clone())
138            .unwrap_or_else(|| panic!("fail to match {}", q))
139            .remove(0)
140        {
141            resp
142        } else {
143            panic!("Invalid response");
144        }
145    }
146
147    #[test]
148    fn test_process() {
149        let query_context = QueryContext::arc();
150
151        assert_tag("BEGIN", "BEGIN", query_context.clone());
152        assert_tag("BEGIN;", "BEGIN", query_context.clone());
153        assert_tag("begin;", "BEGIN", query_context.clone());
154        assert_tag("ROLLBACK", "ROLLBACK", query_context.clone());
155        assert_tag("ROLLBACK;", "ROLLBACK", query_context.clone());
156        assert_tag("rollback;", "ROLLBACK", query_context.clone());
157        assert_tag("COMMIT", "COMMIT", query_context.clone());
158        assert_tag("COMMIT;", "COMMIT", query_context.clone());
159        assert_tag("commit;", "COMMIT", query_context.clone());
160        assert_tag(
161            "SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
162            "SET",
163            query_context.clone(),
164        );
165        assert_tag(
166            "SET TRANSACTION ISOLATION LEVEL READ COMMITTED;",
167            "SET",
168            query_context.clone(),
169        );
170        assert_tag(
171            "SET transaction isolation level READ COMMITTED;",
172            "SET",
173            query_context.clone(),
174        );
175        assert_tag(
176            "START TRANSACTION isolation level READ COMMITTED;",
177            "START TRANSACTION",
178            query_context.clone(),
179        );
180        assert_tag(
181            "start transaction isolation level READ COMMITTED;",
182            "START TRANSACTION",
183            query_context.clone(),
184        );
185        assert_tag("abort transaction;", "ROLLBACK", query_context.clone());
186        assert_tag("commit transaction;", "COMMIT", query_context.clone());
187        assert_tag("COMMIT transaction;", "COMMIT", query_context.clone());
188
189        let resp = get_data("SHOW transaction isolation level", query_context.clone());
190        assert_eq!(1, resp.row_schema().len());
191        let resp = get_data("show client_encoding;", query_context.clone());
192        assert_eq!(1, resp.row_schema().len());
193        let resp = get_data("show standard_conforming_strings;", query_context.clone());
194        assert_eq!(1, resp.row_schema().len());
195        let resp = get_data("show default_transaction_isolation", query_context.clone());
196        assert_eq!(1, resp.row_schema().len());
197
198        assert!(process("SELECT 1", query_context.clone()).is_none());
199        assert!(process("SHOW TABLES ", query_context.clone()).is_none());
200        assert!(process("SET TIME_ZONE=utc ", query_context.clone()).is_none());
201    }
202}