1use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::key::{
27 MetadataKey, MetadataValue, RegionDistribution, RegionRoleSet, DATANODE_TABLE_KEY_PATTERN,
28 DATANODE_TABLE_KEY_PREFIX,
29};
30use crate::kv_backend::txn::{Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
33use crate::rpc::router::region_distribution;
34use crate::rpc::store::{BatchGetRequest, RangeRequest};
35use crate::rpc::KeyValue;
36use crate::DatanodeId;
37
/// Region settings carried by a [`DatanodeTableValue`].
///
/// Serialized inline into [`DatanodeTableValue`] (the field there is `#[serde(flatten)]`).
/// Every field is `#[serde(default)]`, so a stored value that lacks any of these keys still
/// decodes, with the missing field taking its `Default` value.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct RegionInfo {
    /// Engine name, as passed to `DatanodeTableManager::build_create_txn`.
    /// Empty string when absent from the stored value.
    #[serde(default)]
    pub engine: String,
    /// Region storage path, as passed to `DatanodeTableManager::build_create_txn`.
    /// Empty string when absent from the stored value.
    #[serde(default)]
    pub region_storage_path: String,
    /// Region options as string key/value pairs. Empty when absent.
    #[serde(default)]
    pub region_options: HashMap<String, String>,
    /// WAL options keyed by region number. Empty when absent.
    ///
    /// The integer keys are (de)serialized through `DisplayFromStr`, i.e. as their string
    /// form, because JSON object keys must be strings.
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
}
58
/// Key of the metadata entry describing one table's regions on one datanode.
///
/// Its byte form (see the `Display` impl) is `{DATANODE_TABLE_KEY_PREFIX}/{datanode_id}/{table_id}`,
/// e.g. `__dn_table/1/2`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DatanodeTableKey {
    /// Datanode that hosts the regions.
    pub datanode_id: DatanodeId,
    /// Table the regions belong to.
    pub table_id: TableId,
}
67
68impl DatanodeTableKey {
69 pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
70 Self {
71 datanode_id,
72 table_id,
73 }
74 }
75
76 pub fn prefix(datanode_id: DatanodeId) -> String {
77 format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
78 }
79}
80
81impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
82 fn to_bytes(&self) -> Vec<u8> {
83 self.to_string().into_bytes()
84 }
85
86 fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
87 let key = std::str::from_utf8(bytes).map_err(|e| {
88 InvalidMetadataSnafu {
89 err_msg: format!(
90 "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
91 String::from_utf8_lossy(bytes)
92 ),
93 }
94 .build()
95 })?;
96 let captures = DATANODE_TABLE_KEY_PATTERN
97 .captures(key)
98 .context(InvalidMetadataSnafu {
99 err_msg: format!("Invalid DatanodeTableKey '{key}'"),
100 })?;
101 let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
103 let table_id = captures[2].parse::<TableId>().unwrap();
104 Ok(DatanodeTableKey {
105 datanode_id,
106 table_id,
107 })
108 }
109}
110
111impl Display for DatanodeTableKey {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
114 }
115}
116
/// Value stored under a [`DatanodeTableKey`]: the regions of one table hosted on one datanode,
/// plus the region settings they share.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DatanodeTableValue {
    /// Table the regions belong to.
    pub table_id: TableId,
    /// Leader regions on this datanode (`DatanodeTableValue::new` fills it from
    /// `RegionRoleSet::leader_regions`).
    pub regions: Vec<RegionNumber>,
    /// Follower regions on this datanode; empty when absent from the stored value.
    #[serde(default)]
    pub follower_regions: Vec<RegionNumber>,
    /// Shared region settings, serialized inline next to the fields above.
    #[serde(flatten)]
    pub region_info: RegionInfo,
    /// Value version; `DatanodeTableValue::new` always sets it to 0.
    // NOTE(review): nothing in this file increments it — confirm its intended use.
    version: u64,
}
127
128impl DatanodeTableValue {
129 pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
130 let RegionRoleSet {
131 leader_regions,
132 follower_regions,
133 } = region_role_set;
134
135 Self {
136 table_id,
137 regions: leader_regions,
138 follower_regions,
139 region_info,
140 version: 0,
141 }
142 }
143}
144
145pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
147 DatanodeTableValue::try_from_raw_value(&kv.value)
148}
149
/// Reads datanode table metadata ([`DatanodeTableKey`] → [`DatanodeTableValue`]) from a
/// key-value backend and builds the transactions that create, update or delete it.
pub struct DatanodeTableManager {
    /// Backend all reads go through.
    kv_backend: KvBackendRef,
}
153
154impl DatanodeTableManager {
155 pub fn new(kv_backend: KvBackendRef) -> Self {
156 Self { kv_backend }
157 }
158
159 pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
160 self.kv_backend
161 .get(&key.to_bytes())
162 .await?
163 .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
164 .transpose()
165 }
166
167 pub fn tables(
168 &self,
169 datanode_id: DatanodeId,
170 ) -> BoxStream<'static, Result<DatanodeTableValue>> {
171 let start_key = DatanodeTableKey::prefix(datanode_id);
172 let req = RangeRequest::new().with_prefix(start_key.as_bytes());
173
174 let stream = PaginationStream::new(
175 self.kv_backend.clone(),
176 req,
177 DEFAULT_PAGE_SIZE,
178 datanode_table_value_decoder,
179 )
180 .into_stream();
181
182 Box::pin(stream)
183 }
184
185 pub async fn regions(
187 &self,
188 table_id: TableId,
189 table_routes: &PhysicalTableRouteValue,
190 ) -> Result<Vec<DatanodeTableValue>> {
191 let keys = region_distribution(&table_routes.region_routes)
192 .into_keys()
193 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
194 .collect::<Vec<_>>();
195 let req = BatchGetRequest {
196 keys: keys.iter().map(|k| k.to_bytes()).collect(),
197 };
198 let resp = self.kv_backend.batch_get(req).await?;
199 resp.kvs
200 .into_iter()
201 .map(datanode_table_value_decoder)
202 .collect()
203 }
204
205 pub fn build_create_txn(
207 &self,
208 table_id: TableId,
209 engine: &str,
210 region_storage_path: &str,
211 region_options: HashMap<String, String>,
212 region_wal_options: HashMap<RegionNumber, String>,
213 distribution: RegionDistribution,
214 ) -> Result<Txn> {
215 let txns = distribution
216 .into_iter()
217 .map(|(datanode_id, regions)| {
218 let key = DatanodeTableKey::new(datanode_id, table_id);
219 let val = DatanodeTableValue::new(
220 table_id,
221 regions,
222 RegionInfo {
223 engine: engine.to_string(),
224 region_storage_path: region_storage_path.to_string(),
225 region_options: region_options.clone(),
226 region_wal_options: region_wal_options.clone(),
229 },
230 );
231
232 Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
233 })
234 .collect::<Result<Vec<_>>>()?;
235
236 let txn = Txn::new().and_then(txns);
237
238 Ok(txn)
239 }
240
241 pub(crate) async fn build_update_table_options_txn(
247 &self,
248 table_id: TableId,
249 region_distribution: RegionDistribution,
250 new_region_options: HashMap<String, String>,
251 ) -> Result<Txn> {
252 assert!(!region_distribution.is_empty());
253 let (any_datanode, _) = region_distribution.first_key_value().unwrap();
255
256 let mut region_info = self
257 .kv_backend
258 .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
259 .await
260 .transpose()
261 .context(DatanodeTableInfoNotFoundSnafu {
262 datanode_id: *any_datanode,
263 table_id,
264 })?
265 .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
266 .region_info;
267
268 if region_info.region_options == new_region_options {
270 return Ok(Txn::new());
271 }
272 region_info.region_options = new_region_options;
274
275 let mut txns = Vec::with_capacity(region_distribution.len());
276
277 for (datanode, regions) in region_distribution.into_iter() {
278 let key = DatanodeTableKey::new(datanode, table_id);
279 let key_bytes = key.to_bytes();
280 let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
281 .try_as_raw_value()?;
282 txns.push(TxnOp::Put(key_bytes, value_bytes));
283 }
284
285 let txn = Txn::new().and_then(txns);
286 Ok(txn)
287 }
288
289 pub(crate) fn build_update_txn(
291 &self,
292 table_id: TableId,
293 region_info: RegionInfo,
294 current_region_distribution: RegionDistribution,
295 new_region_distribution: RegionDistribution,
296 new_region_options: &HashMap<String, String>,
297 new_region_wal_options: &HashMap<RegionNumber, String>,
298 ) -> Result<Txn> {
299 let mut opts = Vec::new();
300
301 for current_datanode in current_region_distribution.keys() {
303 if !new_region_distribution.contains_key(current_datanode) {
304 let key = DatanodeTableKey::new(*current_datanode, table_id);
305 let raw_key = key.to_bytes();
306 opts.push(TxnOp::Delete(raw_key))
307 }
308 }
309
310 let need_update_options = region_info.region_options != *new_region_options;
311 let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
312
313 for (datanode, regions) in new_region_distribution.into_iter() {
314 let need_update =
315 if let Some(current_region) = current_region_distribution.get(&datanode) {
316 *current_region != regions || need_update_options || need_update_wal_options
318 } else {
319 true
320 };
321 if need_update {
322 let key = DatanodeTableKey::new(datanode, table_id);
323 let raw_key = key.to_bytes();
324 let mut new_region_info = region_info.clone();
326 if need_update_options {
327 new_region_info
328 .region_options
329 .clone_from(new_region_options);
330 }
331 if need_update_wal_options {
332 new_region_info
333 .region_wal_options
334 .clone_from(new_region_wal_options);
335 }
336 let val = DatanodeTableValue::new(table_id, regions, new_region_info)
337 .try_as_raw_value()?;
338 opts.push(TxnOp::Put(raw_key, val));
339 }
340 }
341
342 let txn = Txn::new().and_then(opts);
343 Ok(txn)
344 }
345
346 pub fn build_delete_txn(
348 &self,
349 table_id: TableId,
350 distribution: RegionDistribution,
351 ) -> Result<Txn> {
352 let txns = distribution
353 .into_keys()
354 .map(|datanode_id| {
355 let key = DatanodeTableKey::new(datanode_id, table_id);
356 let raw_key = key.to_bytes();
357
358 Ok(TxnOp::Delete(raw_key))
359 })
360 .collect::<Result<Vec<_>>>()?;
361
362 let txn = Txn::new().and_then(txns);
363
364 Ok(txn)
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_serialization() {
        let key = DatanodeTableKey {
            datanode_id: 1,
            table_id: 2,
        };
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__dn_table/1/2");

        let value = DatanodeTableValue {
            table_id: 42,
            regions: vec![1, 2, 3],
            follower_regions: vec![],
            region_info: RegionInfo::default(),
            version: 1,
        };
        let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;

        let raw_value = value.try_as_raw_value().unwrap();
        assert_eq!(raw_value, literal);

        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
        assert_eq!(actual, value);

        // A value stored without `follower_regions` or any `RegionInfo` field must still
        // decode, with every missing field taking its default.
        let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
        let parsed = DatanodeTableValue::try_from_raw_value(raw_str).unwrap();
        assert_eq!(parsed, value);
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct StringHashMap {
        inner: HashMap<String, String>,
    }

    #[serde_with::serde_as]
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct IntegerHashMap {
        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
        inner: HashMap<u32, String>,
    }

    #[test]
    fn test_serde_with_integer_hash_map() {
        let map = StringHashMap {
            inner: HashMap::from([
                ("1".to_string(), "aaa".to_string()),
                ("2".to_string(), "bbb".to_string()),
                ("3".to_string(), "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            IntegerHashMap {
                inner: HashMap::from([
                    (1, "aaa".to_string()),
                    (2, "bbb".to_string()),
                    (3, "ccc".to_string()),
                ]),
            },
            decoded
        );

        let map = IntegerHashMap {
            inner: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            StringHashMap {
                inner: HashMap::from([
                    ("1".to_string(), "aaa".to_string()),
                    ("2".to_string(), "bbb".to_string()),
                    ("3".to_string(), "ccc".to_string()),
                ]),
            },
            decoded
        );
    }

    #[test]
    fn test_serde_with_region_info() {
        let region_info = RegionInfo {
            engine: "test_engine".to_string(),
            region_storage_path: "test_storage_path".to_string(),
            region_options: HashMap::from([
                ("a".to_string(), "aa".to_string()),
                ("b".to_string(), "bb".to_string()),
                ("c".to_string(), "cc".to_string()),
            ]),
            region_wal_options: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let table_value = DatanodeTableValue {
            table_id: 1,
            regions: vec![],
            follower_regions: vec![],
            region_info,
            version: 1,
        };

        let encoded = serde_json::to_string(&table_value).unwrap();
        let decoded = serde_json::from_str(&encoded).unwrap();
        assert_eq!(table_value, decoded);

        let encoded = serde_json::to_vec(&table_value).unwrap();
        let decoded = serde_json::from_slice(&encoded).unwrap();
        assert_eq!(table_value, decoded);
    }

    #[test]
    fn test_deserialization() {
        fn test_err(raw_key: &[u8]) {
            let result = DatanodeTableKey::from_bytes(raw_key);
            assert!(result.is_err());
        }

        test_err(b"");
        // Not valid UTF-8.
        test_err(&[0u8, 159, 146, 150]);
        test_err(b"invalid_prefix/1/2");
        test_err(b"__dn_table/");
        test_err(b"__dn_table/invalid_len_1");
        test_err(b"__dn_table/invalid_len_3/1/2");
        test_err(b"__dn_table/invalid_node_id/2");
        test_err(b"__dn_table/1/invalid_table_id");

        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
        assert_eq!(DatanodeTableKey::new(11, 21), key);
    }
}