common_meta/key/
datanode_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::key::{
27    MetadataKey, MetadataValue, RegionDistribution, RegionRoleSet, DATANODE_TABLE_KEY_PATTERN,
28    DATANODE_TABLE_KEY_PREFIX,
29};
30use crate::kv_backend::txn::{Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
33use crate::rpc::router::region_distribution;
34use crate::rpc::store::{BatchGetRequest, RangeRequest};
35use crate::rpc::KeyValue;
36use crate::DatanodeId;
37
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
/// Region-level information carried by every [`DatanodeTableValue`], into which it is
/// flattened on serialization.
///
/// For compatibility reasons, DON'T modify the field names.
/// Every field is marked `#[serde(default)]`, so a payload that lacks a field still
/// deserializes, with that field set to its default value.
pub struct RegionInfo {
    #[serde(default)]
    /// The table engine; it SHOULD be immutable once created.
    pub engine: String,
    /// The region storage path; it SHOULD be immutable once created.
    #[serde(default)]
    pub region_storage_path: String,
    /// The region options.
    #[serde(default)]
    pub region_options: HashMap<String, String>,
    /// The per-region WAL options.
    /// Key: region number. Value: the encoded WAL options of the region.
    ///
    /// The keys are (de)serialized through their string form (`DisplayFromStr`).
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
}
58
/// The key that maps a `{datanode_id}` to one of its `{table_id}`s.
///
/// The layout: `__dn_table/{datanode_id}/{table_id}`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DatanodeTableKey {
    /// The datanode component of the key.
    pub datanode_id: DatanodeId,
    /// The table component of the key.
    pub table_id: TableId,
}
67
68impl DatanodeTableKey {
69    pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
70        Self {
71            datanode_id,
72            table_id,
73        }
74    }
75
76    pub fn prefix(datanode_id: DatanodeId) -> String {
77        format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
78    }
79}
80
81impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
82    fn to_bytes(&self) -> Vec<u8> {
83        self.to_string().into_bytes()
84    }
85
86    fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
87        let key = std::str::from_utf8(bytes).map_err(|e| {
88            InvalidMetadataSnafu {
89                err_msg: format!(
90                    "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
91                    String::from_utf8_lossy(bytes)
92                ),
93            }
94            .build()
95        })?;
96        let captures = DATANODE_TABLE_KEY_PATTERN
97            .captures(key)
98            .context(InvalidMetadataSnafu {
99                err_msg: format!("Invalid DatanodeTableKey '{key}'"),
100            })?;
101        // Safety: pass the regex check above
102        let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
103        let table_id = captures[2].parse::<TableId>().unwrap();
104        Ok(DatanodeTableKey {
105            datanode_id,
106            table_id,
107        })
108    }
109}
110
111impl Display for DatanodeTableKey {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
114    }
115}
116
/// The value stored under a [`DatanodeTableKey`]: the regions of one table that are
/// placed on one datanode, together with the shared [`RegionInfo`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DatanodeTableValue {
    /// The table the regions belong to.
    pub table_id: TableId,
    /// The leader regions (see [`DatanodeTableValue::new`]).
    pub regions: Vec<RegionNumber>,
    /// The follower regions; defaults to empty when absent from the serialized form.
    #[serde(default)]
    pub follower_regions: Vec<RegionNumber>,
    /// The region information; its fields are flattened into this value when serialized.
    #[serde(flatten)]
    pub region_info: RegionInfo,
    // Set to 0 by `new`; nothing in this file reads or increments it.
    // NOTE(review): presumably a value version kept for compatibility — confirm.
    version: u64,
}
127
128impl DatanodeTableValue {
129    pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
130        let RegionRoleSet {
131            leader_regions,
132            follower_regions,
133        } = region_role_set;
134
135        Self {
136            table_id,
137            regions: leader_regions,
138            follower_regions,
139            region_info,
140            version: 0,
141        }
142    }
143}
144
145/// Decodes [`KeyValue`] to [`DatanodeTableValue`].
146pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
147    DatanodeTableValue::try_from_raw_value(&kv.value)
148}
149
/// Reads [`DatanodeTableValue`]s from a key-value backend and builds the transactions
/// that create, update and delete them.
pub struct DatanodeTableManager {
    // The backend all reads go through.
    kv_backend: KvBackendRef,
}
153
154impl DatanodeTableManager {
155    pub fn new(kv_backend: KvBackendRef) -> Self {
156        Self { kv_backend }
157    }
158
159    pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
160        self.kv_backend
161            .get(&key.to_bytes())
162            .await?
163            .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
164            .transpose()
165    }
166
167    pub fn tables(
168        &self,
169        datanode_id: DatanodeId,
170    ) -> BoxStream<'static, Result<DatanodeTableValue>> {
171        let start_key = DatanodeTableKey::prefix(datanode_id);
172        let req = RangeRequest::new().with_prefix(start_key.as_bytes());
173
174        let stream = PaginationStream::new(
175            self.kv_backend.clone(),
176            req,
177            DEFAULT_PAGE_SIZE,
178            datanode_table_value_decoder,
179        )
180        .into_stream();
181
182        Box::pin(stream)
183    }
184
185    /// Find the [DatanodeTableValue]s for the given [TableId] and [PhysicalTableRouteValue].
186    pub async fn regions(
187        &self,
188        table_id: TableId,
189        table_routes: &PhysicalTableRouteValue,
190    ) -> Result<Vec<DatanodeTableValue>> {
191        let keys = region_distribution(&table_routes.region_routes)
192            .into_keys()
193            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
194            .collect::<Vec<_>>();
195        let req = BatchGetRequest {
196            keys: keys.iter().map(|k| k.to_bytes()).collect(),
197        };
198        let resp = self.kv_backend.batch_get(req).await?;
199        resp.kvs
200            .into_iter()
201            .map(datanode_table_value_decoder)
202            .collect()
203    }
204
205    /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes.
206    pub fn build_create_txn(
207        &self,
208        table_id: TableId,
209        engine: &str,
210        region_storage_path: &str,
211        region_options: HashMap<String, String>,
212        region_wal_options: HashMap<RegionNumber, String>,
213        distribution: RegionDistribution,
214    ) -> Result<Txn> {
215        let txns = distribution
216            .into_iter()
217            .map(|(datanode_id, regions)| {
218                let key = DatanodeTableKey::new(datanode_id, table_id);
219                let val = DatanodeTableValue::new(
220                    table_id,
221                    regions,
222                    RegionInfo {
223                        engine: engine.to_string(),
224                        region_storage_path: region_storage_path.to_string(),
225                        region_options: region_options.clone(),
226                        // FIXME(weny): Before we store all region wal options into table metadata or somewhere,
227                        // We must store all region wal options.
228                        region_wal_options: region_wal_options.clone(),
229                    },
230                );
231
232                Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
233            })
234            .collect::<Result<Vec<_>>>()?;
235
236        let txn = Txn::new().and_then(txns);
237
238        Ok(txn)
239    }
240
241    /// Builds a transaction to updates the redundant table options (including WAL options)
242    /// for given table id, if provided.
243    ///
244    /// Note that the provided `new_region_options` must be a
245    /// complete set of all options rather than incremental changes.
246    pub(crate) async fn build_update_table_options_txn(
247        &self,
248        table_id: TableId,
249        region_distribution: RegionDistribution,
250        new_region_options: HashMap<String, String>,
251    ) -> Result<Txn> {
252        assert!(!region_distribution.is_empty());
253        // safety: region_distribution must not be empty
254        let (any_datanode, _) = region_distribution.first_key_value().unwrap();
255
256        let mut region_info = self
257            .kv_backend
258            .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
259            .await
260            .transpose()
261            .context(DatanodeTableInfoNotFoundSnafu {
262                datanode_id: *any_datanode,
263                table_id,
264            })?
265            .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
266            .region_info;
267
268        // If the region options are the same, we don't need to update it.
269        if region_info.region_options == new_region_options {
270            return Ok(Txn::new());
271        }
272        // substitute region options only.
273        region_info.region_options = new_region_options;
274
275        let mut txns = Vec::with_capacity(region_distribution.len());
276
277        for (datanode, regions) in region_distribution.into_iter() {
278            let key = DatanodeTableKey::new(datanode, table_id);
279            let key_bytes = key.to_bytes();
280            let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
281                .try_as_raw_value()?;
282            txns.push(TxnOp::Put(key_bytes, value_bytes));
283        }
284
285        let txn = Txn::new().and_then(txns);
286        Ok(txn)
287    }
288
289    /// Builds the update datanode table transactions. It only executes while the primary keys comparing successes.
290    pub(crate) fn build_update_txn(
291        &self,
292        table_id: TableId,
293        region_info: RegionInfo,
294        current_region_distribution: RegionDistribution,
295        new_region_distribution: RegionDistribution,
296        new_region_options: &HashMap<String, String>,
297        new_region_wal_options: &HashMap<RegionNumber, String>,
298    ) -> Result<Txn> {
299        let mut opts = Vec::new();
300
301        // Removes the old datanode table key value pairs
302        for current_datanode in current_region_distribution.keys() {
303            if !new_region_distribution.contains_key(current_datanode) {
304                let key = DatanodeTableKey::new(*current_datanode, table_id);
305                let raw_key = key.to_bytes();
306                opts.push(TxnOp::Delete(raw_key))
307            }
308        }
309
310        let need_update_options = region_info.region_options != *new_region_options;
311        let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
312
313        for (datanode, regions) in new_region_distribution.into_iter() {
314            let need_update =
315                if let Some(current_region) = current_region_distribution.get(&datanode) {
316                    // Updates if need.
317                    *current_region != regions || need_update_options || need_update_wal_options
318                } else {
319                    true
320                };
321            if need_update {
322                let key = DatanodeTableKey::new(datanode, table_id);
323                let raw_key = key.to_bytes();
324                // FIXME(weny): add unit tests.
325                let mut new_region_info = region_info.clone();
326                if need_update_options {
327                    new_region_info
328                        .region_options
329                        .clone_from(new_region_options);
330                }
331                if need_update_wal_options {
332                    new_region_info
333                        .region_wal_options
334                        .clone_from(new_region_wal_options);
335                }
336                let val = DatanodeTableValue::new(table_id, regions, new_region_info)
337                    .try_as_raw_value()?;
338                opts.push(TxnOp::Put(raw_key, val));
339            }
340        }
341
342        let txn = Txn::new().and_then(opts);
343        Ok(txn)
344    }
345
346    /// Builds the delete datanode table transactions. It only executes while the primary keys comparing successes.
347    pub fn build_delete_txn(
348        &self,
349        table_id: TableId,
350        distribution: RegionDistribution,
351    ) -> Result<Txn> {
352        let txns = distribution
353            .into_keys()
354            .map(|datanode_id| {
355                let key = DatanodeTableKey::new(datanode_id, table_id);
356                let raw_key = key.to_bytes();
357
358                Ok(TxnOp::Delete(raw_key))
359            })
360            .collect::<Result<Vec<_>>>()?;
361
362        let txn = Txn::new().and_then(txns);
363
364        Ok(txn)
365    }
366}
367
368#[cfg(test)]
369mod tests {
370    use super::*;
371
372    #[test]
373    fn test_serialization() {
374        let key = DatanodeTableKey {
375            datanode_id: 1,
376            table_id: 2,
377        };
378        let raw_key = key.to_bytes();
379        assert_eq!(raw_key, b"__dn_table/1/2");
380
381        let value = DatanodeTableValue {
382            table_id: 42,
383            regions: vec![1, 2, 3],
384            follower_regions: vec![],
385            region_info: RegionInfo::default(),
386            version: 1,
387        };
388        let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
389
390        let raw_value = value.try_as_raw_value().unwrap();
391        assert_eq!(raw_value, literal);
392
393        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
394        assert_eq!(actual, value);
395
396        // test serde default
397        let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
398        let parsed = DatanodeTableValue::try_from_raw_value(raw_str);
399        assert!(parsed.is_ok());
400    }
401
    /// Test helper: a map with string keys, (de)serialized with serde's default behavior.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct StringHashMap {
        inner: HashMap<String, String>,
    }
406
    /// Test helper: a map with integer keys that are (de)serialized through their string
    /// form (`DisplayFromStr`), mirroring `RegionInfo::region_wal_options`.
    #[serde_with::serde_as]
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct IntegerHashMap {
        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
        inner: HashMap<u32, String>,
    }
413
414    #[test]
415    fn test_serde_with_integer_hash_map() {
416        let map = StringHashMap {
417            inner: HashMap::from([
418                ("1".to_string(), "aaa".to_string()),
419                ("2".to_string(), "bbb".to_string()),
420                ("3".to_string(), "ccc".to_string()),
421            ]),
422        };
423        let encoded = serde_json::to_string(&map).unwrap();
424        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
425        assert_eq!(
426            IntegerHashMap {
427                inner: HashMap::from([
428                    (1, "aaa".to_string()),
429                    (2, "bbb".to_string()),
430                    (3, "ccc".to_string()),
431                ]),
432            },
433            decoded
434        );
435
436        let map = IntegerHashMap {
437            inner: HashMap::from([
438                (1, "aaa".to_string()),
439                (2, "bbb".to_string()),
440                (3, "ccc".to_string()),
441            ]),
442        };
443        let encoded = serde_json::to_string(&map).unwrap();
444        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
445        assert_eq!(
446            StringHashMap {
447                inner: HashMap::from([
448                    ("1".to_string(), "aaa".to_string()),
449                    ("2".to_string(), "bbb".to_string()),
450                    ("3".to_string(), "ccc".to_string()),
451                ]),
452            },
453            decoded
454        );
455    }
456
457    // This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str`
458    // and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`.
459    // Warning: if the key of `region_wal_options` is of type non-String, this test would fail.
460    #[test]
461    fn test_serde_with_region_info() {
462        let region_info = RegionInfo {
463            engine: "test_engine".to_string(),
464            region_storage_path: "test_storage_path".to_string(),
465            region_options: HashMap::from([
466                ("a".to_string(), "aa".to_string()),
467                ("b".to_string(), "bb".to_string()),
468                ("c".to_string(), "cc".to_string()),
469            ]),
470            region_wal_options: HashMap::from([
471                (1, "aaa".to_string()),
472                (2, "bbb".to_string()),
473                (3, "ccc".to_string()),
474            ]),
475        };
476        let table_value = DatanodeTableValue {
477            table_id: 1,
478            regions: vec![],
479            follower_regions: vec![],
480            region_info,
481            version: 1,
482        };
483
484        let encoded = serde_json::to_string(&table_value).unwrap();
485        let decoded = serde_json::from_str(&encoded).unwrap();
486        assert_eq!(table_value, decoded);
487
488        let encoded = serde_json::to_vec(&table_value).unwrap();
489        let decoded = serde_json::from_slice(&encoded).unwrap();
490        assert_eq!(table_value, decoded);
491    }
492
493    #[test]
494    fn test_deserialization() {
495        fn test_err(raw_key: &[u8]) {
496            let result = DatanodeTableKey::from_bytes(raw_key);
497            assert!(result.is_err());
498        }
499
500        test_err(b"");
501        test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string
502        test_err(b"invalid_prefix/1/2");
503        test_err(b"__dn_table/");
504        test_err(b"__dn_table/invalid_len_1");
505        test_err(b"__dn_table/invalid_len_3/1/2");
506        test_err(b"__dn_table/invalid_node_id/2");
507        test_err(b"__dn_table/1/invalid_table_id");
508
509        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
510        assert_eq!(DatanodeTableKey::new(11, 21), key);
511    }
512}