servers/postgres/
fixtures.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::stream;
19use once_cell::sync::Lazy;
20use pgwire::api::Type;
21use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
22use pgwire::error::PgWireResult;
23use pgwire::messages::data::DataRow;
24use regex::Regex;
25use session::context::QueryContextRef;
26
27fn build_string_data_rows(
28 schema: Arc<Vec<FieldInfo>>,
29 rows: Vec<Vec<String>>,
30) -> Vec<PgWireResult<DataRow>> {
31 rows.iter()
32 .map(|row| {
33 let mut encoder = DataRowEncoder::new(schema.clone());
34 for value in row {
35 encoder.encode_field(&Some(value))?;
36 }
37 encoder.finish()
38 })
39 .collect()
40}
41
42static VAR_VALUES: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
43 HashMap::from([
44 ("default_transaction_isolation", "read committed"),
45 ("transaction isolation level", "read committed"),
46 ("standard_conforming_strings", "on"),
47 ("client_encoding", "UTF8"),
48 ])
49});
50
51static SHOW_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new("(?i)^SHOW (.*?);?$").unwrap());
52static SET_TRANSACTION_PATTERN: Lazy<Regex> =
53 Lazy::new(|| Regex::new("(?i)^SET TRANSACTION (.*?);?$").unwrap());
54static START_TRANSACTION_PATTERN: Lazy<Regex> =
55 Lazy::new(|| Regex::new("(?i)^(START TRANSACTION.*|BEGIN);?").unwrap());
56static COMMIT_TRANSACTION_PATTERN: Lazy<Regex> =
57 Lazy::new(|| Regex::new("(?i)^(COMMIT TRANSACTION|COMMIT);?").unwrap());
58static ABORT_TRANSACTION_PATTERN: Lazy<Regex> =
59 Lazy::new(|| Regex::new("(?i)^(ABORT TRANSACTION|ROLLBACK);?").unwrap());
60
61pub(crate) fn matches(query: &str) -> bool {
63 START_TRANSACTION_PATTERN.is_match(query)
64 || COMMIT_TRANSACTION_PATTERN.is_match(query)
65 || ABORT_TRANSACTION_PATTERN.is_match(query)
66 || SHOW_PATTERN.captures(query).is_some()
67 || SET_TRANSACTION_PATTERN.is_match(query)
68}
69
70fn set_transaction_warning(query_ctx: QueryContextRef) {
71 query_ctx.set_warning("Please note transaction is not supported in GreptimeDB.".to_string());
72}
73
74pub(crate) fn process(query: &str, query_ctx: QueryContextRef) -> Option<Vec<Response>> {
76 if START_TRANSACTION_PATTERN.is_match(query) {
78 set_transaction_warning(query_ctx);
79 if query.to_lowercase().starts_with("begin") {
80 Some(vec![Response::TransactionStart(Tag::new("BEGIN"))])
81 } else {
82 Some(vec![Response::TransactionStart(Tag::new(
83 "START TRANSACTION",
84 ))])
85 }
86 } else if ABORT_TRANSACTION_PATTERN.is_match(query) {
87 Some(vec![Response::TransactionEnd(Tag::new("ROLLBACK"))])
88 } else if COMMIT_TRANSACTION_PATTERN.is_match(query) {
89 Some(vec![Response::TransactionEnd(Tag::new("COMMIT"))])
90 } else if let Some(show_var) = SHOW_PATTERN.captures(query) {
91 let show_var = show_var[1].to_lowercase();
92 if let Some(value) = VAR_VALUES.get(&show_var.as_ref()) {
93 let f1 = FieldInfo::new(
94 show_var.clone(),
95 None,
96 None,
97 Type::VARCHAR,
98 FieldFormat::Text,
99 );
100 let schema = Arc::new(vec![f1]);
101 let data = stream::iter(build_string_data_rows(
102 schema.clone(),
103 vec![vec![value.to_string()]],
104 ));
105
106 Some(vec![Response::Query(QueryResponse::new(schema, data))])
107 } else {
108 None
109 }
110 } else if SET_TRANSACTION_PATTERN.is_match(query) {
111 Some(vec![Response::Execution(Tag::new("SET"))])
112 } else {
113 None
114 }
115}
116
#[cfg(test)]
mod test {
    use session::context::{QueryContext, QueryContextRef};

    use super::*;

    /// Asserts that `q` is handled by `process` and that its first response is an
    /// execution / transaction response carrying the command tag `t`.
    fn assert_tag(q: &str, t: &str, query_context: QueryContextRef) {
        let first = process(q, query_context)
            .unwrap_or_else(|| panic!("fail to match {}", q))
            .remove(0);
        match first {
            Response::Execution(tag)
            | Response::TransactionStart(tag)
            | Response::TransactionEnd(tag) => assert_eq!(Tag::new(t), tag),
            _ => panic!("Invalid response"),
        }
    }

    /// Runs `q` through `process` and returns its first response, which must be a
    /// query response.
    fn get_data(q: &str, query_context: QueryContextRef) -> QueryResponse {
        let first = process(q, query_context)
            .unwrap_or_else(|| panic!("fail to match {}", q))
            .remove(0);
        match first {
            Response::Query(resp) => resp,
            _ => panic!("Invalid response"),
        }
    }

    #[test]
    fn test_process() {
        let query_context = QueryContext::arc();

        // (query, expected command tag)
        let tagged = [
            ("BEGIN", "BEGIN"),
            ("BEGIN;", "BEGIN"),
            ("begin;", "BEGIN"),
            ("ROLLBACK", "ROLLBACK"),
            ("ROLLBACK;", "ROLLBACK"),
            ("rollback;", "ROLLBACK"),
            ("COMMIT", "COMMIT"),
            ("COMMIT;", "COMMIT"),
            ("commit;", "COMMIT"),
            ("SET TRANSACTION ISOLATION LEVEL READ COMMITTED", "SET"),
            ("SET TRANSACTION ISOLATION LEVEL READ COMMITTED;", "SET"),
            ("SET transaction isolation level READ COMMITTED;", "SET"),
            (
                "START TRANSACTION isolation level READ COMMITTED;",
                "START TRANSACTION",
            ),
            (
                "start transaction isolation level READ COMMITTED;",
                "START TRANSACTION",
            ),
            ("abort transaction;", "ROLLBACK"),
            ("commit transaction;", "COMMIT"),
            ("COMMIT transaction;", "COMMIT"),
        ];
        for &(query, tag) in &tagged {
            assert_tag(query, tag, query_context.clone());
        }

        // Known variables produce a single-column result.
        let shown = [
            "SHOW transaction isolation level",
            "show client_encoding;",
            "show standard_conforming_strings;",
            "show default_transaction_isolation",
        ];
        for &query in &shown {
            let resp = get_data(query, query_context.clone());
            assert_eq!(1, resp.row_schema().len());
        }

        // Statements outside the fixtures are left to the regular query path.
        assert!(process("SELECT 1", query_context.clone()).is_none());
        assert!(process("SHOW TABLES ", query_context.clone()).is_none());
        assert!(process("SET TIME_ZONE=utc ", query_context).is_none());
    }

    #[test]
    fn test_matches() {
        let matching = [
            "BEGIN;",
            "start transaction isolation level READ COMMITTED;",
            "commit",
            "ROLLBACK;",
            "abort transaction;",
            "show client_encoding;",
            "SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
        ];
        for &query in &matching {
            assert!(matches(query), "expected a match for {}", query);
        }

        let not_matching = ["SELECT 1", "SET TIME_ZONE=utc "];
        for &query in &not_matching {
            assert!(!matches(query), "expected no match for {}", query);
        }
    }
}