// common_meta/key/datanode_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::DatanodeId;
25use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
26use crate::key::table_route::PhysicalTableRouteValue;
27use crate::key::{
28    DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX, MetadataKey, MetadataValue,
29    RegionDistribution, RegionRoleSet,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::{Txn, TxnOp};
33use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
34use crate::rpc::KeyValue;
35use crate::rpc::router::region_distribution;
36use crate::rpc::store::{BatchGetRequest, RangeRequest};
37use crate::wal_provider::{RegionWalOptions, region_wal_options_serde};
38
39#[serde_with::serde_as]
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
41/// RegionInfo
42/// For compatible reason, DON'T modify the field name.
43pub struct RegionInfo {
44    #[serde(default)]
45    /// The table engine, it SHOULD be immutable after created.
46    pub engine: String,
47    /// The region storage path, it SHOULD be immutable after created.
48    #[serde(default)]
49    pub region_storage_path: String,
50    /// The region options.
51    #[serde(default)]
52    pub region_options: HashMap<String, String>,
53    /// The per-region wal options.
54    /// Key: region number. Value: the wal options of the region.
55    #[serde(default)]
56    #[serde(with = "region_wal_options_serde")]
57    pub region_wal_options: RegionWalOptions,
58}
59
60/// The key mapping {datanode_id} to {table_id}
61///
62/// The layout: `__dn_table/{datanode_id}/{table_id}`.
63#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
64pub struct DatanodeTableKey {
65    pub datanode_id: DatanodeId,
66    pub table_id: TableId,
67}
68
69impl DatanodeTableKey {
70    pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
71        Self {
72            datanode_id,
73            table_id,
74        }
75    }
76
77    pub fn prefix(datanode_id: DatanodeId) -> String {
78        format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
79    }
80}
81
82impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
83    fn to_bytes(&self) -> Vec<u8> {
84        self.to_string().into_bytes()
85    }
86
87    fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
88        let key = std::str::from_utf8(bytes).map_err(|e| {
89            InvalidMetadataSnafu {
90                err_msg: format!(
91                    "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
92                    String::from_utf8_lossy(bytes)
93                ),
94            }
95            .build()
96        })?;
97        let captures = DATANODE_TABLE_KEY_PATTERN
98            .captures(key)
99            .context(InvalidMetadataSnafu {
100                err_msg: format!("Invalid DatanodeTableKey '{key}'"),
101            })?;
102        // Safety: pass the regex check above
103        let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
104        let table_id = captures[2].parse::<TableId>().unwrap();
105        Ok(DatanodeTableKey {
106            datanode_id,
107            table_id,
108        })
109    }
110}
111
112impl Display for DatanodeTableKey {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
115    }
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
119pub struct DatanodeTableValue {
120    pub table_id: TableId,
121    pub regions: Vec<RegionNumber>,
122    #[serde(default)]
123    pub follower_regions: Vec<RegionNumber>,
124    #[serde(flatten)]
125    pub region_info: RegionInfo,
126    version: u64,
127}
128
129impl DatanodeTableValue {
130    pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
131        let RegionRoleSet {
132            leader_regions,
133            follower_regions,
134        } = region_role_set;
135
136        Self {
137            table_id,
138            regions: leader_regions,
139            follower_regions,
140            region_info,
141            version: 0,
142        }
143    }
144}
145
146/// Decodes [`KeyValue`] to [`DatanodeTableValue`].
147pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
148    DatanodeTableValue::try_from_raw_value(&kv.value)
149}
150
151pub struct DatanodeTableManager {
152    kv_backend: KvBackendRef,
153}
154
155impl DatanodeTableManager {
156    pub fn new(kv_backend: KvBackendRef) -> Self {
157        Self { kv_backend }
158    }
159
160    pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
161        self.kv_backend
162            .get(&key.to_bytes())
163            .await?
164            .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
165            .transpose()
166    }
167
168    pub async fn batch_get(
169        &self,
170        keys: &[DatanodeTableKey],
171    ) -> Result<HashMap<DatanodeTableKey, DatanodeTableValue>> {
172        let req = BatchGetRequest::default().with_keys(keys.iter().map(|k| k.to_bytes()).collect());
173        let resp = self.kv_backend.batch_get(req).await?;
174        let values = resp
175            .kvs
176            .into_iter()
177            .map(|kv| {
178                Ok((
179                    DatanodeTableKey::from_bytes(&kv.key)?,
180                    DatanodeTableValue::try_from_raw_value(&kv.value)?,
181                ))
182            })
183            .collect::<Result<HashMap<_, _>>>()?;
184        Ok(values)
185    }
186
187    pub fn tables(
188        &self,
189        datanode_id: DatanodeId,
190    ) -> BoxStream<'static, Result<DatanodeTableValue>> {
191        let start_key = DatanodeTableKey::prefix(datanode_id);
192        let req = RangeRequest::new().with_prefix(start_key.as_bytes());
193
194        let stream = PaginationStream::new(
195            self.kv_backend.clone(),
196            req,
197            DEFAULT_PAGE_SIZE,
198            datanode_table_value_decoder,
199        )
200        .into_stream();
201
202        Box::pin(stream)
203    }
204
205    /// Find the [DatanodeTableValue]s for the given [TableId] and [PhysicalTableRouteValue].
206    pub async fn regions(
207        &self,
208        table_id: TableId,
209        table_routes: &PhysicalTableRouteValue,
210    ) -> Result<Vec<DatanodeTableValue>> {
211        let keys = region_distribution(&table_routes.region_routes)
212            .into_keys()
213            .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
214            .collect::<Vec<_>>();
215        let req = BatchGetRequest {
216            keys: keys.iter().map(|k| k.to_bytes()).collect(),
217        };
218        let resp = self.kv_backend.batch_get(req).await?;
219        resp.kvs
220            .into_iter()
221            .map(datanode_table_value_decoder)
222            .collect()
223    }
224
225    /// Builds the create datanode table transactions. It only executes while the primary keys comparing successes.
226    pub fn build_create_txn(
227        &self,
228        table_id: TableId,
229        engine: &str,
230        region_storage_path: &str,
231        region_options: HashMap<String, String>,
232        region_wal_options: RegionWalOptions,
233        distribution: RegionDistribution,
234    ) -> Result<Txn> {
235        let txns = distribution
236            .into_iter()
237            .map(|(datanode_id, regions)| {
238                let key = DatanodeTableKey::new(datanode_id, table_id);
239                let val = DatanodeTableValue::new(
240                    table_id,
241                    regions,
242                    RegionInfo {
243                        engine: engine.to_string(),
244                        region_storage_path: region_storage_path.to_string(),
245                        region_options: region_options.clone(),
246                        // FIXME(weny): Before we store all region wal options into table metadata or somewhere,
247                        // We must store all region wal options.
248                        region_wal_options: region_wal_options.clone(),
249                    },
250                );
251
252                Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
253            })
254            .collect::<Result<Vec<_>>>()?;
255
256        let txn = Txn::new().and_then(txns);
257
258        Ok(txn)
259    }
260
261    /// Builds a transaction to updates the redundant table options (including WAL options)
262    /// for given table id, if provided.
263    ///
264    /// Note that the provided `new_region_options` must be a
265    /// complete set of all options rather than incremental changes.
266    pub(crate) async fn build_update_table_options_txn(
267        &self,
268        table_id: TableId,
269        region_distribution: RegionDistribution,
270        new_region_options: HashMap<String, String>,
271    ) -> Result<Txn> {
272        assert!(!region_distribution.is_empty());
273        // safety: region_distribution must not be empty
274        let (any_datanode, _) = region_distribution.first_key_value().unwrap();
275
276        let mut region_info = self
277            .kv_backend
278            .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
279            .await
280            .transpose()
281            .context(DatanodeTableInfoNotFoundSnafu {
282                datanode_id: *any_datanode,
283                table_id,
284            })?
285            .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
286            .region_info;
287
288        // If the region options are the same, we don't need to update it.
289        if region_info.region_options == new_region_options {
290            return Ok(Txn::new());
291        }
292        // substitute region options only.
293        region_info.region_options = new_region_options;
294
295        let mut txns = Vec::with_capacity(region_distribution.len());
296
297        for (datanode, regions) in region_distribution.into_iter() {
298            let key = DatanodeTableKey::new(datanode, table_id);
299            let key_bytes = key.to_bytes();
300            let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
301                .try_as_raw_value()?;
302            txns.push(TxnOp::Put(key_bytes, value_bytes));
303        }
304
305        let txn = Txn::new().and_then(txns);
306        Ok(txn)
307    }
308
309    /// Builds the update datanode table transactions. It only executes while the primary keys comparing successes.
310    pub(crate) fn build_update_txn(
311        &self,
312        table_id: TableId,
313        region_info: RegionInfo,
314        current_region_distribution: RegionDistribution,
315        new_region_distribution: RegionDistribution,
316        new_region_options: &HashMap<String, String>,
317        new_region_wal_options: &RegionWalOptions,
318    ) -> Result<Txn> {
319        let mut opts = Vec::new();
320
321        // Removes the old datanode table key value pairs
322        for current_datanode in current_region_distribution.keys() {
323            if !new_region_distribution.contains_key(current_datanode) {
324                let key = DatanodeTableKey::new(*current_datanode, table_id);
325                let raw_key = key.to_bytes();
326                opts.push(TxnOp::Delete(raw_key))
327            }
328        }
329
330        let need_update_options = region_info.region_options != *new_region_options;
331        let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
332
333        for (datanode, regions) in new_region_distribution.into_iter() {
334            let need_update =
335                if let Some(current_region) = current_region_distribution.get(&datanode) {
336                    // Updates if need.
337                    *current_region != regions || need_update_options || need_update_wal_options
338                } else {
339                    true
340                };
341            if need_update {
342                let key = DatanodeTableKey::new(datanode, table_id);
343                let raw_key = key.to_bytes();
344                // FIXME(weny): add unit tests.
345                let mut new_region_info = region_info.clone();
346                if need_update_options {
347                    new_region_info
348                        .region_options
349                        .clone_from(new_region_options);
350                }
351                if need_update_wal_options {
352                    new_region_info
353                        .region_wal_options
354                        .clone_from(new_region_wal_options);
355                }
356                let val = DatanodeTableValue::new(table_id, regions, new_region_info)
357                    .try_as_raw_value()?;
358                opts.push(TxnOp::Put(raw_key, val));
359            }
360        }
361
362        let txn = Txn::new().and_then(opts);
363        Ok(txn)
364    }
365
366    /// Builds the delete datanode table transactions. It only executes while the primary keys comparing successes.
367    pub fn build_delete_txn(
368        &self,
369        table_id: TableId,
370        distribution: RegionDistribution,
371    ) -> Result<Txn> {
372        let txns = distribution
373            .into_keys()
374            .map(|datanode_id| {
375                let key = DatanodeTableKey::new(datanode_id, table_id);
376                let raw_key = key.to_bytes();
377
378                Ok(TxnOp::Delete(raw_key))
379            })
380            .collect::<Result<Vec<_>>>()?;
381
382        let txn = Txn::new().and_then(txns);
383
384        Ok(txn)
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use common_wal::options::WalOptions;
391
392    use super::*;
393
394    #[test]
395    fn test_serialization() {
396        let key = DatanodeTableKey {
397            datanode_id: 1,
398            table_id: 2,
399        };
400        let raw_key = key.to_bytes();
401        assert_eq!(raw_key, b"__dn_table/1/2");
402
403        let value = DatanodeTableValue {
404            table_id: 42,
405            regions: vec![1, 2, 3],
406            follower_regions: vec![],
407            region_info: RegionInfo::default(),
408            version: 1,
409        };
410        let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;
411
412        let raw_value = value.try_as_raw_value().unwrap();
413        assert_eq!(raw_value, literal);
414
415        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
416        assert_eq!(actual, value);
417
418        // test serde default
419        let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
420        let parsed = DatanodeTableValue::try_from_raw_value(raw_str);
421        assert!(parsed.is_ok());
422    }
423
424    #[derive(Debug, Serialize, Deserialize, PartialEq)]
425    struct StringHashMap {
426        inner: HashMap<String, String>,
427    }
428
429    #[serde_with::serde_as]
430    #[derive(Debug, Serialize, Deserialize, PartialEq)]
431    struct IntegerHashMap {
432        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
433        inner: HashMap<u32, String>,
434    }
435
436    #[test]
437    fn test_serde_with_integer_hash_map() {
438        let map = StringHashMap {
439            inner: HashMap::from([
440                ("1".to_string(), "aaa".to_string()),
441                ("2".to_string(), "bbb".to_string()),
442                ("3".to_string(), "ccc".to_string()),
443            ]),
444        };
445        let encoded = serde_json::to_string(&map).unwrap();
446        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
447        assert_eq!(
448            IntegerHashMap {
449                inner: HashMap::from([
450                    (1, "aaa".to_string()),
451                    (2, "bbb".to_string()),
452                    (3, "ccc".to_string()),
453                ]),
454            },
455            decoded
456        );
457
458        let map = IntegerHashMap {
459            inner: HashMap::from([
460                (1, "aaa".to_string()),
461                (2, "bbb".to_string()),
462                (3, "ccc".to_string()),
463            ]),
464        };
465        let encoded = serde_json::to_string(&map).unwrap();
466        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
467        assert_eq!(
468            StringHashMap {
469                inner: HashMap::from([
470                    ("1".to_string(), "aaa".to_string()),
471                    ("2".to_string(), "bbb".to_string()),
472                    ("3".to_string(), "ccc".to_string()),
473                ]),
474            },
475            decoded
476        );
477    }
478
479    // This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str`
480    // and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`.
481    // Warning: if the key of `region_wal_options` is of type non-String, this test would fail.
482    #[test]
483    fn test_serde_with_region_info() {
484        let region_info = RegionInfo {
485            engine: "test_engine".to_string(),
486            region_storage_path: "test_storage_path".to_string(),
487            region_options: HashMap::from([
488                ("a".to_string(), "aa".to_string()),
489                ("b".to_string(), "bb".to_string()),
490                ("c".to_string(), "cc".to_string()),
491            ]),
492            region_wal_options: HashMap::from([
493                (1, WalOptions::RaftEngine),
494                (2, WalOptions::Noop),
495                (3, WalOptions::RaftEngine),
496            ]),
497        };
498        let table_value = DatanodeTableValue {
499            table_id: 1,
500            regions: vec![],
501            follower_regions: vec![],
502            region_info,
503            version: 1,
504        };
505
506        let encoded = serde_json::to_string(&table_value).unwrap();
507        let decoded = serde_json::from_str(&encoded).unwrap();
508        assert_eq!(table_value, decoded);
509
510        let encoded = serde_json::to_vec(&table_value).unwrap();
511        let decoded = serde_json::from_slice(&encoded).unwrap();
512        assert_eq!(table_value, decoded);
513    }
514
515    #[test]
516    fn test_deserialize_legacy_region_wal_options() {
517        let literal = br#"{"table_id":42,"regions":[1],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{"1":"{\"wal.provider\":\"raft_engine\"}"},"version":1}"#;
518
519        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
520
521        assert_eq!(
522            actual.region_info.region_wal_options,
523            HashMap::from([(1, WalOptions::RaftEngine)])
524        );
525    }
526
527    #[test]
528    fn test_deserialization() {
529        fn test_err(raw_key: &[u8]) {
530            let result = DatanodeTableKey::from_bytes(raw_key);
531            assert!(result.is_err());
532        }
533
534        test_err(b"");
535        test_err(vec![0u8, 159, 146, 150].as_slice()); // invalid UTF8 string
536        test_err(b"invalid_prefix/1/2");
537        test_err(b"__dn_table/");
538        test_err(b"__dn_table/invalid_len_1");
539        test_err(b"__dn_table/invalid_len_3/1/2");
540        test_err(b"__dn_table/invalid_node_id/2");
541        test_err(b"__dn_table/1/invalid_table_id");
542
543        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
544        assert_eq!(DatanodeTableKey::new(11, 21), key);
545    }
546}