servers/mysql/
federated.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Uses regexes to recognize statements emitted by MySQL federated components (drivers,
//! GUI clients and dump tools) and to answer them with faked results.
//! Inspired by Databend's "[mysql_federated.rs](https://github.com/datafuselabs/databend/blob/ac706bf65845e6895141c96c0a10bad6fdc2d367/src/query/service/src/servers/mysql/mysql_federated.rs)".
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use common_query::Output;
22use common_recordbatch::RecordBatches;
23use common_time::timezone::system_timezone_name;
24use datatypes::prelude::ConcreteDataType;
25use datatypes::schema::{ColumnSchema, Schema};
26use datatypes::vectors::StringVector;
27use once_cell::sync::Lazy;
28use regex::Regex;
29use regex::bytes::RegexSet;
30use session::SessionRef;
31use session::context::QueryContextRef;
32
33static SELECT_VAR_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new("(?i)^(SELECT @@(.*))").unwrap());
34static MYSQL_CONN_JAVA_PATTERN: Lazy<Regex> =
35    Lazy::new(|| Regex::new("(?i)^(/\\* mysql-connector-j(.*))").unwrap());
36static SHOW_LOWER_CASE_PATTERN: Lazy<Regex> =
37    Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES LIKE 'lower_case_table_names'(.*))").unwrap());
38static SHOW_VARIABLES_LIKE_PATTERN: Lazy<Regex> =
39    Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES( LIKE (.*))?)").unwrap());
40
41// SELECT TIMEDIFF(NOW(), UTC_TIMESTAMP());
42static SELECT_TIME_DIFF_FUNC_PATTERN: Lazy<Regex> =
43    Lazy::new(|| Regex::new("(?i)^(SELECT TIMEDIFF\\(NOW\\(\\), UTC_TIMESTAMP\\(\\)\\))").unwrap());
44
45// sqlalchemy < 1.4.30
46static SHOW_SQL_MODE_PATTERN: Lazy<Regex> =
47    Lazy::new(|| Regex::new("(?i)^(SHOW VARIABLES LIKE 'sql_mode'(.*))").unwrap());
48
49static OTHER_NOT_SUPPORTED_STMT: Lazy<RegexSet> = Lazy::new(|| {
50    RegexSet::new([
51        // Txn.
52        "(?i)^(ROLLBACK(.*))",
53        "(?i)^(COMMIT(.*))",
54        "(?i)^(START(.*))",
55
56        // Set.
57        "(?i)^(SET NAMES(.*))",
58        "(?i)^(SET character_set_results(.*))",
59        "(?i)^(SET net_write_timeout(.*))",
60        "(?i)^(SET FOREIGN_KEY_CHECKS(.*))",
61        "(?i)^(SET AUTOCOMMIT(.*))",
62        "(?i)^(SET SQL_LOG_BIN(.*))",
63        "(?i)^(SET SESSION TRANSACTION(.*))",
64        "(?i)^(SET TRANSACTION(.*))",
65        "(?i)^(SET sql_mode(.*))",
66        "(?i)^(SET SQL_SELECT_LIMIT(.*))",
67        "(?i)^(SET PROFILING(.*))",
68
69        // mysqlclient.
70        "(?i)^(SELECT \\$\\$)",
71
72        // mysqldump.
73        "(?i)^(SET SQL_QUOTE_SHOW_CREATE(.*))",
74        "(?i)^(LOCK TABLES(.*))",
75        "(?i)^(UNLOCK TABLES(.*))",
76        "(?i)^(SELECT LOGFILE_GROUP_NAME, FILE_NAME, TOTAL_EXTENTS, INITIAL_SIZE, ENGINE, EXTRA FROM INFORMATION_SCHEMA.FILES(.*))",
77
78        // mydumper.
79        "(?i)^(/\\*!80003 SET(.*) \\*/)$",
80        "(?i)^(SHOW MASTER STATUS)",
81        "(?i)^(SHOW ALL SLAVES STATUS)",
82        "(?i)^(LOCK BINLOG FOR BACKUP)",
83        "(?i)^(LOCK TABLES FOR BACKUP)",
84        "(?i)^(UNLOCK BINLOG(.*))",
85        "(?i)^(/\\*!40101 SET(.*) \\*/)$",
86
87        // DBeaver.
88        "(?i)^(SHOW WARNINGS)",
89        "(?i)^(/\\* ApplicationName=(.*)SHOW WARNINGS)",
90        "(?i)^(/\\* ApplicationName=(.*)SHOW PLUGINS)",
91        "(?i)^(/\\* ApplicationName=(.*)SHOW ENGINES)",
92        "(?i)^(/\\* ApplicationName=(.*)SELECT @@(.*))",
93        "(?i)^(/\\* ApplicationName=(.*)SHOW @@(.*))",
94        "(?i)^(/\\* ApplicationName=(.*)SET net_write_timeout(.*))",
95        "(?i)^(/\\* ApplicationName=(.*)SET SQL_SELECT_LIMIT(.*))",
96        "(?i)^(/\\* ApplicationName=(.*)SHOW VARIABLES(.*))",
97
98        // pt-toolkit
99        "(?i)^(/\\*!40101 SET(.*) \\*/)$",
100
101        // mysqldump 5.7.16
102        "(?i)^(/\\*!40100 SET(.*) \\*/)$",
103        "(?i)^(/\\*!40103 SET(.*) \\*/)$",
104        "(?i)^(/\\*!40111 SET(.*) \\*/)$",
105        "(?i)^(/\\*!40101 SET(.*) \\*/)$",
106        "(?i)^(/\\*!40014 SET(.*) \\*/)$",
107        "(?i)^(/\\*!40000 SET(.*) \\*/)$",
108    ]).unwrap()
109});
110
111static VAR_VALUES: Lazy<HashMap<&str, &str>> = Lazy::new(|| {
112    HashMap::from([
113        ("tx_isolation", "REPEATABLE-READ"),
114        ("session.tx_isolation", "REPEATABLE-READ"),
115        ("transaction_isolation", "REPEATABLE-READ"),
116        ("session.transaction_isolation", "REPEATABLE-READ"),
117        ("session.transaction_read_only", "0"),
118        ("max_allowed_packet", "134217728"),
119        ("interactive_timeout", "31536000"),
120        ("wait_timeout", "31536000"),
121        ("net_write_timeout", "31536000"),
122        ("version_comment", "Greptime"),
123    ])
124});
125
126// Recordbatches for select function.
127// Format:
128// |function_name|
129// |value|
130fn select_function(name: &str, value: &str) -> RecordBatches {
131    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
132        name,
133        ConcreteDataType::string_datatype(),
134        true,
135    )]));
136    let columns = vec![Arc::new(StringVector::from(vec![value])) as _];
137    RecordBatches::try_from_columns(schema, columns)
138        // unwrap is safe because the schema and data are definitely able to form a recordbatch, they are all string type
139        .unwrap()
140}
141
142// Recordbatches for show variable statement.
143// Format is:
144// | Variable_name | Value |
145// | xx            | yy    |
146fn show_variables(name: &str, value: &str) -> RecordBatches {
147    let schema = Arc::new(Schema::new(vec![
148        ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), true),
149        ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
150    ]));
151    let columns = vec![
152        Arc::new(StringVector::from(vec![name])) as _,
153        Arc::new(StringVector::from(vec![value])) as _,
154    ];
155    RecordBatches::try_from_columns(schema, columns)
156        // unwrap is safe because the schema and data are definitely able to form a recordbatch, they are all string type
157        .unwrap()
158}
159
160fn select_variable(query: &str, query_context: QueryContextRef) -> Option<Output> {
161    let mut fields = vec![];
162    let mut values = vec![];
163
164    // query like "SELECT @@aa, @@bb as cc, @dd..."
165    let query = query.to_lowercase();
166    let vars: Vec<&str> = query.split("@@").collect();
167    if vars.len() <= 1 {
168        return None;
169    }
170
171    // skip the first "select"
172    for var in vars.iter().skip(1) {
173        let var = var.trim_matches(|c| c == ' ' || c == ',' || c == ';');
174        let var_as: Vec<&str> = var
175            .split(" as ")
176            .map(|x| {
177                x.trim_matches(|c| c == ' ')
178                    .split_whitespace()
179                    .next()
180                    .unwrap_or("")
181            })
182            .collect();
183
184        // get value of variables from known sources or fallback to defaults
185        let value = match var_as[0] {
186            "session.time_zone" | "time_zone" => query_context.timezone().to_string(),
187            "system_time_zone" => system_timezone_name(),
188            "max_execution_time" | "session.max_execution_time" => {
189                query_context.query_timeout_as_millis().to_string()
190            }
191            _ => VAR_VALUES
192                .get(var_as[0])
193                .map(|v| v.to_string())
194                .unwrap_or_else(|| "0".to_owned()),
195        };
196
197        values.push(Arc::new(StringVector::from(vec![value])) as _);
198        match var_as.len() {
199            1 => {
200                // @@aa
201                // field is '@@aa'
202                fields.push(ColumnSchema::new(
203                    format!("@@{}", var_as[0]),
204                    ConcreteDataType::string_datatype(),
205                    true,
206                ));
207            }
208            2 => {
209                // @@bb as cc:
210                // var is 'bb'.
211                // field is 'cc'.
212                fields.push(ColumnSchema::new(
213                    var_as[1],
214                    ConcreteDataType::string_datatype(),
215                    true,
216                ));
217            }
218            _ => return None,
219        }
220    }
221
222    let schema = Arc::new(Schema::new(fields));
223    // unwrap is safe because the schema and data are definitely able to form a recordbatch, they are all string type
224    let batches = RecordBatches::try_from_columns(schema, values).unwrap();
225    Some(Output::new_with_record_batches(batches))
226}
227
228fn check_select_variable(query: &str, query_context: QueryContextRef) -> Option<Output> {
229    if [&SELECT_VAR_PATTERN, &MYSQL_CONN_JAVA_PATTERN]
230        .iter()
231        .any(|r| r.is_match(query))
232    {
233        select_variable(query, query_context)
234    } else {
235        None
236    }
237}
238
239fn check_show_variables(query: &str) -> Option<Output> {
240    let recordbatches = if SHOW_SQL_MODE_PATTERN.is_match(query) {
241        Some(show_variables(
242            "sql_mode",
243            "ONLY_FULL_GROUP_BY STRICT_TRANS_TABLES NO_ZERO_IN_DATE NO_ZERO_DATE ERROR_FOR_DIVISION_BY_ZERO NO_ENGINE_SUBSTITUTION",
244        ))
245    } else if SHOW_LOWER_CASE_PATTERN.is_match(query) {
246        Some(show_variables("lower_case_table_names", "0"))
247    } else if SHOW_VARIABLES_LIKE_PATTERN.is_match(query) {
248        Some(show_variables("", ""))
249    } else {
250        None
251    };
252    recordbatches.map(Output::new_with_record_batches)
253}
254
255// Check for SET or others query, this is the final check of the federated query.
256fn check_others(query: &str, _query_ctx: QueryContextRef) -> Option<Output> {
257    if OTHER_NOT_SUPPORTED_STMT.is_match(query.as_bytes()) {
258        return Some(Output::new_with_record_batches(RecordBatches::empty()));
259    }
260
261    let recordbatches = if SELECT_TIME_DIFF_FUNC_PATTERN.is_match(query) {
262        Some(select_function(
263            "TIMEDIFF(NOW(), UTC_TIMESTAMP())",
264            "00:00:00",
265        ))
266    } else {
267        None
268    };
269    recordbatches.map(Output::new_with_record_batches)
270}
271
272// Check whether the query is a federated or driver setup command,
273// and return some faked results if there are any.
274pub(crate) fn check(
275    query: &str,
276    query_ctx: QueryContextRef,
277    _session: SessionRef,
278) -> Option<Output> {
279    // INSERT don't need MySQL federated check. We assume the query doesn't contain
280    // federated or driver setup command if it starts with a 'INSERT' statement.
281    let the_6th_index = query.char_indices().nth(6).map(|(i, _)| i);
282    if let Some(index) = the_6th_index
283        && query[..index].eq_ignore_ascii_case("INSERT")
284    {
285        return None;
286    }
287
288    // First to check the query is like "select @@variables".
289    check_select_variable(query, query_ctx.clone())
290        // Then to check "show variables like ...".
291        .or_else(|| check_show_variables(query))
292        // Last check
293        .or_else(|| check_others(query, query_ctx))
294}
295
#[cfg(test)]
mod test {

    use common_query::OutputData;
    use common_time::timezone::set_default_timezone;
    use session::Session;
    use session::context::{Channel, QueryContext};

    use super::*;

    /// Non-ASCII input must not panic (the INSERT fast path in `check` slices the query by
    /// character position) and must not be recognized as a federated statement.
    #[test]
    fn test_check_abnormal() {
        let session = Arc::new(Session::new(None, Channel::Mysql, Default::default(), 0));
        let query = "🫣一点不正常的东西🫣";
        let output = check(query, QueryContext::arc(), session.clone());

        assert!(output.is_none());
    }

    #[test]
    fn test_check() {
        // Ordinary queries are not federated statements and fall through (`None`).
        let session = Arc::new(Session::new(None, Channel::Mysql, Default::default(), 0));
        let query = "select 1";
        let result = check(query, QueryContext::arc(), session.clone());
        assert!(result.is_none());

        let query = "select version";
        let output = check(query, QueryContext::arc(), session.clone());
        assert!(output.is_none());

        // Asserts that `check` answers `query` with record batches whose pretty-printed
        // form equals `expected`.
        fn test(query: &str, expected: &str) {
            let session = Arc::new(Session::new(None, Channel::Mysql, Default::default(), 0));
            let output = check(query, QueryContext::arc(), session.clone());
            match output.unwrap().data {
                OutputData::RecordBatches(r) => {
                    assert_eq!(&r.pretty_print().unwrap(), expected)
                }
                _ => unreachable!(),
            }
        }

        // A single known variable; trailing clauses such as `LIMIT 1` are ignored.
        let query = "SELECT @@version_comment LIMIT 1";
        let expected = "\
+-------------------+
| @@version_comment |
+-------------------+
| Greptime          |
+-------------------+";
        test(query, expected);

        // variables
        let query = "select @@tx_isolation, @@session.tx_isolation";
        let expected = "\
+-----------------+------------------------+
| @@tx_isolation  | @@session.tx_isolation |
+-----------------+------------------------+
| REPEATABLE-READ | REPEATABLE-READ        |
+-----------------+------------------------+";
        test(query, expected);

        // Set the default timezone. The `system_time_zone` and `time_zone` values expected
        // below are Asia/Shanghai, presumably because of this call, so it must run before
        // the complex query.
        set_default_timezone(Some("Asia/Shanghai")).unwrap();
        // Complex variables in the form sent by mysql-connector-java: every variable is
        // aliased with `AS`, and unknown variables fall back to "0".
        let query = "/* mysql-connector-java-8.0.17 (Revision: 16a712ddb3f826a1933ab42b0039f7fb9eebc6ec) */SELECT  @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@collation_server AS collation_server, @@collation_connection AS collation_connection, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_write_timeout AS net_write_timeout, @@performance_schema AS performance_schema, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@transaction_isolation AS transaction_isolation, @@wait_timeout AS wait_timeout;";
        let expected = "\
+--------------------------+----------------------+--------------------------+-----------------------+----------------------+------------------+----------------------+--------------+---------------------+---------+------------------------+--------------------+-------------------+--------------------+----------+------------------+---------------+-----------------------+--------------+
| auto_increment_increment | character_set_client | character_set_connection | character_set_results | character_set_server | collation_server | collation_connection | init_connect | interactive_timeout | license | lower_case_table_names | max_allowed_packet | net_write_timeout | performance_schema | sql_mode | system_time_zone | time_zone     | transaction_isolation | wait_timeout |
+--------------------------+----------------------+--------------------------+-----------------------+----------------------+------------------+----------------------+--------------+---------------------+---------+------------------------+--------------------+-------------------+--------------------+----------+------------------+---------------+-----------------------+--------------+
| 0                        | 0                    | 0                        | 0                     | 0                    | 0                | 0                    | 0            | 31536000            | 0       | 0                      | 134217728          | 31536000          | 0                  | 0        | Asia/Shanghai    | Asia/Shanghai | REPEATABLE-READ       | 31536000     |
+--------------------------+----------------------+--------------------------+-----------------------+----------------------+------------------+----------------------+--------------+---------------------+---------+------------------------+--------------------+-------------------+--------------------+----------+------------------+---------------+-----------------------+--------------+";
        test(query, expected);

        // A bare `SHOW VARIABLES` is answered with a single empty row.
        let query = "show variables";
        let expected = "\
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
|               |       |
+---------------+-------+";
        test(query, expected);

        // `lower_case_table_names` is reported with a fixed value of 0.
        let query = "show variables like 'lower_case_table_names'";
        let expected = "\
+------------------------+-------+
| Variable_name          | Value |
+------------------------+-------+
| lower_case_table_names | 0     |
+------------------------+-------+";
        test(query, expected);

        // TIMEDIFF(NOW(), UTC_TIMESTAMP()) is faked as 00:00:00.
        let query = "SELECT TIMEDIFF(NOW(), UTC_TIMESTAMP())";
        let expected = "\
+----------------------------------+
| TIMEDIFF(NOW(), UTC_TIMESTAMP()) |
+----------------------------------+
| 00:00:00                         |
+----------------------------------+";
        test(query, expected);
    }
}