common_meta/key/
datanode_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::DatanodeId;
25use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
26use crate::key::table_route::PhysicalTableRouteValue;
27use crate::key::{
28    DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX, MetadataKey, MetadataValue,
29    RegionDistribution, RegionRoleSet,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::{Txn, TxnOp};
33use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
34use crate::rpc::KeyValue;
35use crate::rpc::router::region_distribution;
36use crate::rpc::store::{BatchGetRequest, RangeRequest};
37
38#[serde_with::serde_as]
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
40/// RegionInfo
41/// For compatible reason, DON'T modify the field name.
42pub struct RegionInfo {
43    #[serde(default)]
44    /// The table engine, it SHOULD be immutable after created.
45    pub engine: String,
46    /// The region storage path, it SHOULD be immutable after created.
47    #[serde(default)]
48    pub region_storage_path: String,
49    /// The region options.
50    #[serde(default)]
51    pub region_options: HashMap<String, String>,
52    /// The per-region wal options.
53    /// Key: region number. Value: the encoded wal options of the region.
54    #[serde(default)]
55    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
56    pub region_wal_options: HashMap<RegionNumber, String>,
57}
58
59/// The key mapping {datanode_id} to {table_id}
60///
61/// The layout: `__dn_table/{datanode_id}/{table_id}`.
62#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
63pub struct DatanodeTableKey {
64    pub datanode_id: DatanodeId,
65    pub table_id: TableId,
66}
67
68impl DatanodeTableKey {
69    pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
70        Self {
71            datanode_id,
72            table_id,
73        }
74    }
75
76    pub fn prefix(datanode_id: DatanodeId) -> String {
77        format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
78    }
79}
80
81impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
82    fn to_bytes(&self) -> Vec<u8> {
83        self.to_string().into_bytes()
84    }
85
86    fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
87        let key = std::str::from_utf8(bytes).map_err(|e| {
88            InvalidMetadataSnafu {
89                err_msg: format!(
90                    "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
91                    String::from_utf8_lossy(bytes)
92                ),
93            }
94            .build()
95        })?;
96        let captures = DATANODE_TABLE_KEY_PATTERN
97            .captures(key)
98            .context(InvalidMetadataSnafu {
99                err_msg: format!("Invalid DatanodeTableKey '{key}'"),
100            })?;
101        // Safety: pass the regex check above
102        let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
103        let table_id = captures[2].parse::<TableId>().unwrap();
104        Ok(DatanodeTableKey {
105            datanode_id,
106            table_id,
107        })
108    }
109}
110
111impl Display for DatanodeTableKey {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
114    }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub struct DatanodeTableValue {
119    pub table_id: TableId,
120    pub regions: Vec<RegionNumber>,
121    #[serde(default)]
122    pub follower_regions: Vec<RegionNumber>,
123    #[serde(flatten)]
124    pub region_info: RegionInfo,
125    version: u64,
126}
127
128impl DatanodeTableValue {
129    pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
130        let RegionRoleSet {
131            leader_regions,
132            follower_regions,
133        } = region_role_set;
134
135        Self {
136            table_id,
137            regions: leader_regions,
138            follower_regions,
139            region_info,
140            version: 0,
141        }
142    }
143}
144
145/// Decodes [`KeyValue`] to [`DatanodeTableValue`].
146pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
147    DatanodeTableValue::try_from_raw_value(&kv.value)
148}
149
150pub struct DatanodeTableManager {
151    kv_backend: KvBackendRef,
152}
153
154impl DatanodeTableManager {
155    pub fn new(kv_backend: KvBackendRef) -> Self {
156        Self { kv_backend }
157    }
158
159    pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
160        self.kv_backend
161            .get(&key.to_bytes())
162            .await?
163            .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
164            .transpose()
165    }
166
167    pub async fn batch_get(
168        &self,
169        keys: &[DatanodeTableKey],
170    ) -> Result<HashMap<DatanodeTableKey, DatanodeTableValue>> {
171        let req = BatchGetRequest::default().with_keys(keys.iter().map(|k| k.to_bytes()).collect());
172        let resp = self.kv_backend.batch_get(req).await?;
173        let values = resp
174            .kvs
175            .into_iter()
176            .map(|kv| {
177                Ok((
178                    DatanodeTableKey::from_bytes(&kv.key)?,
179                    DatanodeTableValue::try_from_raw_value(&kv.value)?,
180                ))
181            })
182            .collect::<Result<HashMap<_, _>>>()?;
183        Ok(values)
184    }
185
186    pub fn tables(
187        &self,
188        datanode_id: DatanodeId,
189    ) -> BoxStream<'static, Result<DatanodeTableValue>> {
190        let start_key = DatanodeTableKey::prefix(datanode_id);
191        let req = RangeRequest::new().with_prefix(start_key.as_bytes());
192
193        let stream = PaginationStream::new(
194            self.kv_backend.clone(),
195            req,
196            DEFAULT_PAGE_SIZE,
197            datanode_table_value_decoder,
198        )
199        .into_stream();
200
201        Box::pin(stream)
202    }
203
204    /// Find the [DatanodeTableValue]s for the given [TableId] and [PhysicalTableRouteValue].
205    pub async fn regions(
206        &self,
207        table_id: TableId,
208        table_routes: &PhysicalTableRouteValue,
209    ) -> Result<Vec<DatanodeTableValue>> {
210        let keys = region_distribution(&table_routes.region_routes)
211            .into_keys()
212            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
213            .collect::<Vec<_>>();
214        let req = BatchGetRequest {
215            keys: keys.iter().map(|k| k.to_bytes()).collect(),
216        };
217        let resp = self.kv_backend.batch_get(req).await?;
218        resp.kvs
219            .into_iter()
220            .map(datanode_table_value_decoder)
221            .collect()
222    }
223
224    /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes.
225    pub fn build_create_txn(
226        &self,
227        table_id: TableId,
228        engine: &str,
229        region_storage_path: &str,
230        region_options: HashMap<String, String>,
231        region_wal_options: HashMap<RegionNumber, String>,
232        distribution: RegionDistribution,
233    ) -> Result<Txn> {
234        let txns = distribution
235            .into_iter()
236            .map(|(datanode_id, regions)| {
237                let key = DatanodeTableKey::new(datanode_id, table_id);
238                let val = DatanodeTableValue::new(
239                    table_id,
240                    regions,
241                    RegionInfo {
242                        engine: engine.to_string(),
243                        region_storage_path: region_storage_path.to_string(),
244                        region_options: region_options.clone(),
245                        // FIXME(weny): Before we store all region wal options into table metadata or somewhere,
246                        // We must store all region wal options.
247                        region_wal_options: region_wal_options.clone(),
248                    },
249                );
250
251                Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
252            })
253            .collect::<Result<Vec<_>>>()?;
254
255        let txn = Txn::new().and_then(txns);
256
257        Ok(txn)
258    }
259
260    /// Builds a transaction to updates the redundant table options (including WAL options)
261    /// for given table id, if provided.
262    ///
263    /// Note that the provided `new_region_options` must be a
264    /// complete set of all options rather than incremental changes.
265    pub(crate) async fn build_update_table_options_txn(
266        &self,
267        table_id: TableId,
268        region_distribution: RegionDistribution,
269        new_region_options: HashMap<String, String>,
270    ) -> Result<Txn> {
271        assert!(!region_distribution.is_empty());
272        // safety: region_distribution must not be empty
273        let (any_datanode, _) = region_distribution.first_key_value().unwrap();
274
275        let mut region_info = self
276            .kv_backend
277            .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
278            .await
279            .transpose()
280            .context(DatanodeTableInfoNotFoundSnafu {
281                datanode_id: *any_datanode,
282                table_id,
283            })?
284            .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
285            .region_info;
286
287        // If the region options are the same, we don't need to update it.
288        if region_info.region_options == new_region_options {
289            return Ok(Txn::new());
290        }
291        // substitute region options only.
292        region_info.region_options = new_region_options;
293
294        let mut txns = Vec::with_capacity(region_distribution.len());
295
296        for (datanode, regions) in region_distribution.into_iter() {
297            let key = DatanodeTableKey::new(datanode, table_id);
298            let key_bytes = key.to_bytes();
299            let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
300                .try_as_raw_value()?;
301            txns.push(TxnOp::Put(key_bytes, value_bytes));
302        }
303
304        let txn = Txn::new().and_then(txns);
305        Ok(txn)
306    }
307
308    /// Builds the update datanode table transactions. It only executes while the primary keys comparing successes.
309    pub(crate) fn build_update_txn(
310        &self,
311        table_id: TableId,
312        region_info: RegionInfo,
313        current_region_distribution: RegionDistribution,
314        new_region_distribution: RegionDistribution,
315        new_region_options: &HashMap<String, String>,
316        new_region_wal_options: &HashMap<RegionNumber, String>,
317    ) -> Result<Txn> {
318        let mut opts = Vec::new();
319
320        // Removes the old datanode table key value pairs
321        for current_datanode in current_region_distribution.keys() {
322            if !new_region_distribution.contains_key(current_datanode) {
323                let key = DatanodeTableKey::new(*current_datanode, table_id);
324                let raw_key = key.to_bytes();
325                opts.push(TxnOp::Delete(raw_key))
326            }
327        }
328
329        let need_update_options = region_info.region_options != *new_region_options;
330        let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
331
332        for (datanode, regions) in new_region_distribution.into_iter() {
333            let need_update =
334                if let Some(current_region) = current_region_distribution.get(&datanode) {
335                    // Updates if need.
336                    *current_region != regions || need_update_options || need_update_wal_options
337                } else {
338                    true
339                };
340            if need_update {
341                let key = DatanodeTableKey::new(datanode, table_id);
342                let raw_key = key.to_bytes();
343                // FIXME(weny): add unit tests.
344                let mut new_region_info = region_info.clone();
345                if need_update_options {
346                    new_region_info
347                        .region_options
348                        .clone_from(new_region_options);
349                }
350                if need_update_wal_options {
351                    new_region_info
352                        .region_wal_options
353                        .clone_from(new_region_wal_options);
354                }
355                let val = DatanodeTableValue::new(table_id, regions, new_region_info)
356                    .try_as_raw_value()?;
357                opts.push(TxnOp::Put(raw_key, val));
358            }
359        }
360
361        let txn = Txn::new().and_then(opts);
362        Ok(txn)
363    }
364
365    /// Builds the delete datanode table transactions. It only executes while the primary keys comparing successes.
366    pub fn build_delete_txn(
367        &self,
368        table_id: TableId,
369        distribution: RegionDistribution,
370    ) -> Result<Txn> {
371        let txns = distribution
372            .into_keys()
373            .map(|datanode_id| {
374                let key = DatanodeTableKey::new(datanode_id, table_id);
375                let raw_key = key.to_bytes();
376
377                Ok(TxnOp::Delete(raw_key))
378            })
379            .collect::<Result<Vec<_>>>()?;
380
381        let txn = Txn::new().and_then(txns);
382
383        Ok(txn)
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_serialization() {
        let key = DatanodeTableKey {
            datanode_id: 1,
            table_id: 2,
        };
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__dn_table/1/2");

        let value = DatanodeTableValue {
            table_id: 42,
            regions: vec![1, 2, 3],
            follower_regions: vec![],
            region_info: RegionInfo::default(),
            version: 1,
        };
        let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;

        let raw_value = value.try_as_raw_value().unwrap();
        assert_eq!(raw_value, literal);

        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
        assert_eq!(actual, value);

        // Test serde defaults: missing `follower_regions` and flattened `RegionInfo`
        // fields must decode to their default values, not merely decode successfully.
        let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
        let parsed = DatanodeTableValue::try_from_raw_value(raw_str).unwrap();
        assert_eq!(parsed, value);
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct StringHashMap {
        inner: HashMap<String, String>,
    }

    #[serde_with::serde_as]
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct IntegerHashMap {
        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
        inner: HashMap<u32, String>,
    }

    #[test]
    fn test_serde_with_integer_hash_map() {
        let map = StringHashMap {
            inner: HashMap::from([
                ("1".to_string(), "aaa".to_string()),
                ("2".to_string(), "bbb".to_string()),
                ("3".to_string(), "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            IntegerHashMap {
                inner: HashMap::from([
                    (1, "aaa".to_string()),
                    (2, "bbb".to_string()),
                    (3, "ccc".to_string()),
                ]),
            },
            decoded
        );

        let map = IntegerHashMap {
            inner: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            StringHashMap {
                inner: HashMap::from([
                    ("1".to_string(), "aaa".to_string()),
                    ("2".to_string(), "bbb".to_string()),
                    ("3".to_string(), "ccc".to_string()),
                ]),
            },
            decoded
        );
    }

    // This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str`
    // and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`.
    // Warning: if the key of `region_wal_options` is of type non-String, this test would fail.
    #[test]
    fn test_serde_with_region_info() {
        let region_info = RegionInfo {
            engine: "test_engine".to_string(),
            region_storage_path: "test_storage_path".to_string(),
            region_options: HashMap::from([
                ("a".to_string(), "aa".to_string()),
                ("b".to_string(), "bb".to_string()),
                ("c".to_string(), "cc".to_string()),
            ]),
            region_wal_options: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let table_value = DatanodeTableValue {
            table_id: 1,
            regions: vec![],
            follower_regions: vec![],
            region_info,
            version: 1,
        };

        let encoded = serde_json::to_string(&table_value).unwrap();
        let decoded = serde_json::from_str(&encoded).unwrap();
        assert_eq!(table_value, decoded);

        let encoded = serde_json::to_vec(&table_value).unwrap();
        let decoded = serde_json::from_slice(&encoded).unwrap();
        assert_eq!(table_value, decoded);
    }

    #[test]
    fn test_deserialization() {
        fn test_err(raw_key: &[u8]) {
            let result = DatanodeTableKey::from_bytes(raw_key);
            assert!(result.is_err());
        }

        test_err(b"");
        test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string
        test_err(b"invalid_prefix/1/2");
        test_err(b"__dn_table/");
        test_err(b"__dn_table/invalid_len_1");
        test_err(b"__dn_table/invalid_len_3/1/2");
        test_err(b"__dn_table/invalid_node_id/2");
        test_err(b"__dn_table/1/invalid_table_id");

        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
        assert_eq!(DatanodeTableKey::new(11, 21), key);
    }
}
531}