common_meta/key/
datanode_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::key::{
27    MetadataKey, MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN,
28    DATANODE_TABLE_KEY_PREFIX,
29};
30use crate::kv_backend::txn::{Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
33use crate::rpc::router::region_distribution;
34use crate::rpc::store::{BatchGetRequest, RangeRequest};
35use crate::rpc::KeyValue;
36use crate::DatanodeId;
37
38#[serde_with::serde_as]
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
40/// RegionInfo
41/// For compatible reason, DON'T modify the field name.
42pub struct RegionInfo {
43    #[serde(default)]
44    /// The table engine, it SHOULD be immutable after created.
45    pub engine: String,
46    /// The region storage path, it SHOULD be immutable after created.
47    #[serde(default)]
48    pub region_storage_path: String,
49    /// The region options.
50    #[serde(default)]
51    pub region_options: HashMap<String, String>,
52    /// The per-region wal options.
53    /// Key: region number. Value: the encoded wal options of the region.
54    #[serde(default)]
55    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
56    pub region_wal_options: HashMap<RegionNumber, String>,
57}
58
59/// The key mapping {datanode_id} to {table_id}
60///
61/// The layout: `__dn_table/{datanode_id}/{table_id}`.
62#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
63pub struct DatanodeTableKey {
64    pub datanode_id: DatanodeId,
65    pub table_id: TableId,
66}
67
68impl DatanodeTableKey {
69    pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
70        Self {
71            datanode_id,
72            table_id,
73        }
74    }
75
76    pub fn prefix(datanode_id: DatanodeId) -> String {
77        format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
78    }
79}
80
81impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
82    fn to_bytes(&self) -> Vec<u8> {
83        self.to_string().into_bytes()
84    }
85
86    fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
87        let key = std::str::from_utf8(bytes).map_err(|e| {
88            InvalidMetadataSnafu {
89                err_msg: format!(
90                    "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
91                    String::from_utf8_lossy(bytes)
92                ),
93            }
94            .build()
95        })?;
96        let captures = DATANODE_TABLE_KEY_PATTERN
97            .captures(key)
98            .context(InvalidMetadataSnafu {
99                err_msg: format!("Invalid DatanodeTableKey '{key}'"),
100            })?;
101        // Safety: pass the regex check above
102        let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
103        let table_id = captures[2].parse::<TableId>().unwrap();
104        Ok(DatanodeTableKey {
105            datanode_id,
106            table_id,
107        })
108    }
109}
110
111impl Display for DatanodeTableKey {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
114    }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub struct DatanodeTableValue {
119    pub table_id: TableId,
120    pub regions: Vec<RegionNumber>,
121    #[serde(flatten)]
122    pub region_info: RegionInfo,
123    version: u64,
124}
125
126impl DatanodeTableValue {
127    pub fn new(table_id: TableId, regions: Vec<RegionNumber>, region_info: RegionInfo) -> Self {
128        Self {
129            table_id,
130            regions,
131            region_info,
132            version: 0,
133        }
134    }
135}
136
137/// Decodes `KeyValue` to ((),`DatanodeTableValue`)
138pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
139    DatanodeTableValue::try_from_raw_value(&kv.value)
140}
141
/// Reads [`DatanodeTableValue`]s from a key-value backend and builds the
/// transactions that create, update and delete them.
pub struct DatanodeTableManager {
    /// The backend used by the lookups of this manager.
    kv_backend: KvBackendRef,
}
145
146impl DatanodeTableManager {
147    pub fn new(kv_backend: KvBackendRef) -> Self {
148        Self { kv_backend }
149    }
150
151    pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
152        self.kv_backend
153            .get(&key.to_bytes())
154            .await?
155            .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
156            .transpose()
157    }
158
159    pub fn tables(
160        &self,
161        datanode_id: DatanodeId,
162    ) -> BoxStream<'static, Result<DatanodeTableValue>> {
163        let start_key = DatanodeTableKey::prefix(datanode_id);
164        let req = RangeRequest::new().with_prefix(start_key.as_bytes());
165
166        let stream = PaginationStream::new(
167            self.kv_backend.clone(),
168            req,
169            DEFAULT_PAGE_SIZE,
170            datanode_table_value_decoder,
171        )
172        .into_stream();
173
174        Box::pin(stream)
175    }
176
177    /// Find the [DatanodeTableValue]s for the given [TableId] and [PhysicalTableRouteValue].
178    pub async fn regions(
179        &self,
180        table_id: TableId,
181        table_routes: &PhysicalTableRouteValue,
182    ) -> Result<Vec<DatanodeTableValue>> {
183        let keys = region_distribution(&table_routes.region_routes)
184            .into_keys()
185            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
186            .collect::<Vec<_>>();
187        let req = BatchGetRequest {
188            keys: keys.iter().map(|k| k.to_bytes()).collect(),
189        };
190        let resp = self.kv_backend.batch_get(req).await?;
191        resp.kvs
192            .into_iter()
193            .map(datanode_table_value_decoder)
194            .collect()
195    }
196
197    /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes.
198    pub fn build_create_txn(
199        &self,
200        table_id: TableId,
201        engine: &str,
202        region_storage_path: &str,
203        region_options: HashMap<String, String>,
204        region_wal_options: HashMap<RegionNumber, String>,
205        distribution: RegionDistribution,
206    ) -> Result<Txn> {
207        let txns = distribution
208            .into_iter()
209            .map(|(datanode_id, regions)| {
210                let key = DatanodeTableKey::new(datanode_id, table_id);
211                let val = DatanodeTableValue::new(
212                    table_id,
213                    regions,
214                    RegionInfo {
215                        engine: engine.to_string(),
216                        region_storage_path: region_storage_path.to_string(),
217                        region_options: region_options.clone(),
218                        // FIXME(weny): Before we store all region wal options into table metadata or somewhere,
219                        // We must store all region wal options.
220                        region_wal_options: region_wal_options.clone(),
221                    },
222                );
223
224                Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
225            })
226            .collect::<Result<Vec<_>>>()?;
227
228        let txn = Txn::new().and_then(txns);
229
230        Ok(txn)
231    }
232
233    /// Builds a transaction to updates the redundant table options (including WAL options)
234    /// for given table id, if provided.
235    ///
236    /// Note that the provided `new_region_options` must be a
237    /// complete set of all options rather than incremental changes.
238    pub(crate) async fn build_update_table_options_txn(
239        &self,
240        table_id: TableId,
241        region_distribution: RegionDistribution,
242        new_region_options: HashMap<String, String>,
243    ) -> Result<Txn> {
244        assert!(!region_distribution.is_empty());
245        // safety: region_distribution must not be empty
246        let (any_datanode, _) = region_distribution.first_key_value().unwrap();
247
248        let mut region_info = self
249            .kv_backend
250            .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
251            .await
252            .transpose()
253            .context(DatanodeTableInfoNotFoundSnafu {
254                datanode_id: *any_datanode,
255                table_id,
256            })?
257            .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
258            .region_info;
259        // substitute region options only.
260        region_info.region_options = new_region_options;
261
262        let mut txns = Vec::with_capacity(region_distribution.len());
263
264        for (datanode, regions) in region_distribution.into_iter() {
265            let key = DatanodeTableKey::new(datanode, table_id);
266            let key_bytes = key.to_bytes();
267            let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
268                .try_as_raw_value()?;
269            txns.push(TxnOp::Put(key_bytes, value_bytes));
270        }
271
272        let txn = Txn::new().and_then(txns);
273        Ok(txn)
274    }
275
276    /// Builds the update datanode table transactions. It only executes while the primary keys comparing successes.
277    pub(crate) fn build_update_txn(
278        &self,
279        table_id: TableId,
280        region_info: RegionInfo,
281        current_region_distribution: RegionDistribution,
282        new_region_distribution: RegionDistribution,
283        new_region_options: &HashMap<String, String>,
284        new_region_wal_options: &HashMap<RegionNumber, String>,
285    ) -> Result<Txn> {
286        let mut opts = Vec::new();
287
288        // Removes the old datanode table key value pairs
289        for current_datanode in current_region_distribution.keys() {
290            if !new_region_distribution.contains_key(current_datanode) {
291                let key = DatanodeTableKey::new(*current_datanode, table_id);
292                let raw_key = key.to_bytes();
293                opts.push(TxnOp::Delete(raw_key))
294            }
295        }
296
297        let need_update_options = region_info.region_options != *new_region_options;
298        let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
299
300        for (datanode, regions) in new_region_distribution.into_iter() {
301            let need_update =
302                if let Some(current_region) = current_region_distribution.get(&datanode) {
303                    // Updates if need.
304                    *current_region != regions || need_update_options || need_update_wal_options
305                } else {
306                    true
307                };
308            if need_update {
309                let key = DatanodeTableKey::new(datanode, table_id);
310                let raw_key = key.to_bytes();
311                // FIXME(weny): add unit tests.
312                let mut new_region_info = region_info.clone();
313                if need_update_options {
314                    new_region_info
315                        .region_options
316                        .clone_from(new_region_options);
317                }
318                if need_update_wal_options {
319                    new_region_info
320                        .region_wal_options
321                        .clone_from(new_region_wal_options);
322                }
323                let val = DatanodeTableValue::new(table_id, regions, new_region_info)
324                    .try_as_raw_value()?;
325                opts.push(TxnOp::Put(raw_key, val));
326            }
327        }
328
329        let txn = Txn::new().and_then(opts);
330        Ok(txn)
331    }
332
333    /// Builds the delete datanode table transactions. It only executes while the primary keys comparing successes.
334    pub fn build_delete_txn(
335        &self,
336        table_id: TableId,
337        distribution: RegionDistribution,
338    ) -> Result<Txn> {
339        let txns = distribution
340            .into_keys()
341            .map(|datanode_id| {
342                let key = DatanodeTableKey::new(datanode_id, table_id);
343                let raw_key = key.to_bytes();
344
345                Ok(TxnOp::Delete(raw_key))
346            })
347            .collect::<Result<Vec<_>>>()?;
348
349        let txn = Txn::new().and_then(txns);
350
351        Ok(txn)
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrapper around a map with `String` keys, using the default serde encoding.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct StringHashMap {
        inner: HashMap<String, String>,
    }

    /// Wrapper around a map with integer keys that are (de)serialized through
    /// their string form.
    #[serde_with::serde_as]
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct IntegerHashMap {
        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
        inner: HashMap<u32, String>,
    }

    /// The sample map keyed by `"1"`, `"2"` and `"3"`.
    fn string_keyed() -> StringHashMap {
        StringHashMap {
            inner: HashMap::from([
                ("1".to_string(), "aaa".to_string()),
                ("2".to_string(), "bbb".to_string()),
                ("3".to_string(), "ccc".to_string()),
            ]),
        }
    }

    /// The sample map keyed by `1`, `2` and `3`, with the same values as [`string_keyed`].
    fn integer_keyed() -> IntegerHashMap {
        IntegerHashMap {
            inner: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        }
    }

    #[test]
    fn test_serialization() {
        // The key is rendered as `__dn_table/{datanode_id}/{table_id}`.
        let raw_key = DatanodeTableKey::new(1, 2).to_bytes();
        assert_eq!(raw_key, b"__dn_table/1/2");

        // The fields of `RegionInfo` are flattened into the value's JSON object.
        let expected = DatanodeTableValue {
            table_id: 42,
            regions: vec![1, 2, 3],
            region_info: RegionInfo::default(),
            version: 1,
        };
        let json = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;

        assert_eq!(expected.try_as_raw_value().unwrap(), json);
        assert_eq!(
            DatanodeTableValue::try_from_raw_value(json).unwrap(),
            expected
        );

        // test serde default: the `RegionInfo` fields may be absent.
        let without_region_info = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
        assert!(DatanodeTableValue::try_from_raw_value(without_region_info).is_ok());
    }

    #[test]
    fn test_serde_with_integer_hash_map() {
        // String keys -> integer keys.
        let encoded = serde_json::to_string(&string_keyed()).unwrap();
        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(integer_keyed(), decoded);

        // Integer keys -> string keys.
        let encoded = serde_json::to_string(&integer_keyed()).unwrap();
        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(string_keyed(), decoded);
    }

    // This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str`
    // and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`.
    // Warning: per the original note, this test would fail if the keys of
    // `region_wal_options` were serialized as a non-String type.
    #[test]
    fn test_serde_with_region_info() {
        let table_value = DatanodeTableValue {
            table_id: 1,
            regions: vec![],
            region_info: RegionInfo {
                engine: "test_engine".to_string(),
                region_storage_path: "test_storage_path".to_string(),
                region_options: HashMap::from([
                    ("a".to_string(), "aa".to_string()),
                    ("b".to_string(), "bb".to_string()),
                    ("c".to_string(), "cc".to_string()),
                ]),
                region_wal_options: HashMap::from([
                    (1, "aaa".to_string()),
                    (2, "bbb".to_string()),
                    (3, "ccc".to_string()),
                ]),
            },
            version: 1,
        };

        // Round trip through a `String`.
        let as_string = serde_json::to_string(&table_value).unwrap();
        let from_string: DatanodeTableValue = serde_json::from_str(&as_string).unwrap();
        assert_eq!(table_value, from_string);

        // Round trip through a `Vec<u8>`.
        let as_bytes = serde_json::to_vec(&table_value).unwrap();
        let from_bytes: DatanodeTableValue = serde_json::from_slice(&as_bytes).unwrap();
        assert_eq!(table_value, from_bytes);
    }

    #[test]
    fn test_deserialization() {
        let invalid_keys: [&[u8]; 8] = [
            b"",
            &[0u8, 159, 146, 150], // invalid UTF8 string
            b"invalid_prefix/1/2",
            b"__dn_table/",
            b"__dn_table/invalid_len_1",
            b"__dn_table/invalid_len_3/1/2",
            b"__dn_table/invalid_node_id/2",
            b"__dn_table/1/invalid_table_id",
        ];
        for raw_key in invalid_keys {
            assert!(DatanodeTableKey::from_bytes(raw_key).is_err());
        }

        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
        assert_eq!(DatanodeTableKey::new(11, 21), key);
    }
}