session/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod context;
16pub mod hints;
17pub mod session_config;
18pub mod table_name;
19
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::{Arc, RwLock};
23use std::time::Duration;
24
25use auth::UserInfoRef;
26use common_catalog::build_db_string;
27use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
28use common_recordbatch::cursor::RecordBatchStreamCursor;
29pub use common_session::ReadPreference;
30use common_time::timezone::get_timezone;
31use common_time::Timezone;
32use context::{ConfigurationVariables, QueryContextBuilder};
33use derive_more::Debug;
34
35use crate::context::{Channel, ConnInfo, QueryContextRef};
36
37/// Session for persistent connection such as MySQL, PostgreSQL etc.
38#[derive(Debug)]
39pub struct Session {
40    catalog: RwLock<String>,
41    mutable_inner: Arc<RwLock<MutableInner>>,
42    conn_info: ConnInfo,
43    configuration_variables: Arc<ConfigurationVariables>,
44}
45
46pub type SessionRef = Arc<Session>;
47
48/// A container for mutable items in query context
49#[derive(Debug)]
50pub(crate) struct MutableInner {
51    schema: String,
52    user_info: UserInfoRef,
53    timezone: Timezone,
54    query_timeout: Option<Duration>,
55    read_preference: ReadPreference,
56    #[debug(skip)]
57    pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
58}
59
60impl Default for MutableInner {
61    fn default() -> Self {
62        Self {
63            schema: DEFAULT_SCHEMA_NAME.into(),
64            user_info: auth::userinfo_by_name(None),
65            timezone: get_timezone(None).clone(),
66            query_timeout: None,
67            read_preference: ReadPreference::Leader,
68            cursors: HashMap::with_capacity(0),
69        }
70    }
71}
72
73impl Session {
74    pub fn new(
75        addr: Option<SocketAddr>,
76        channel: Channel,
77        configuration_variables: ConfigurationVariables,
78    ) -> Self {
79        Session {
80            catalog: RwLock::new(DEFAULT_CATALOG_NAME.into()),
81            conn_info: ConnInfo::new(addr, channel),
82            configuration_variables: Arc::new(configuration_variables),
83            mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
84        }
85    }
86
87    pub fn new_query_context(&self) -> QueryContextRef {
88        QueryContextBuilder::default()
89            // catalog is not allowed for update in query context so we use
90            // string here
91            .current_catalog(self.catalog.read().unwrap().clone())
92            .mutable_session_data(self.mutable_inner.clone())
93            .sql_dialect(self.conn_info.channel.dialect())
94            .configuration_parameter(self.configuration_variables.clone())
95            .channel(self.conn_info.channel)
96            .build()
97            .into()
98    }
99
100    pub fn conn_info(&self) -> &ConnInfo {
101        &self.conn_info
102    }
103
104    pub fn timezone(&self) -> Timezone {
105        self.mutable_inner.read().unwrap().timezone.clone()
106    }
107
108    pub fn read_preference(&self) -> ReadPreference {
109        self.mutable_inner.read().unwrap().read_preference
110    }
111
112    pub fn set_timezone(&self, tz: Timezone) {
113        let mut inner = self.mutable_inner.write().unwrap();
114        inner.timezone = tz;
115    }
116
117    pub fn set_read_preference(&self, read_preference: ReadPreference) {
118        self.mutable_inner.write().unwrap().read_preference = read_preference;
119    }
120
121    pub fn user_info(&self) -> UserInfoRef {
122        self.mutable_inner.read().unwrap().user_info.clone()
123    }
124
125    pub fn set_user_info(&self, user_info: UserInfoRef) {
126        self.mutable_inner.write().unwrap().user_info = user_info;
127    }
128
129    pub fn set_catalog(&self, catalog: String) {
130        *self.catalog.write().unwrap() = catalog;
131    }
132
133    pub fn catalog(&self) -> String {
134        self.catalog.read().unwrap().clone()
135    }
136
137    pub fn schema(&self) -> String {
138        self.mutable_inner.read().unwrap().schema.clone()
139    }
140
141    pub fn set_schema(&self, schema: String) {
142        self.mutable_inner.write().unwrap().schema = schema;
143    }
144
145    pub fn get_db_string(&self) -> String {
146        build_db_string(&self.catalog(), &self.schema())
147    }
148}