1use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::DatanodeId;
25use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
26use crate::key::table_route::PhysicalTableRouteValue;
27use crate::key::{
28 DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX, MetadataKey, MetadataValue,
29 RegionDistribution, RegionRoleSet,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::{Txn, TxnOp};
33use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
34use crate::rpc::KeyValue;
35use crate::rpc::router::region_distribution;
36use crate::rpc::store::{BatchGetRequest, RangeRequest};
37
/// Region-level metadata attached to a table's regions on a datanode.
///
/// It is flattened into [`DatanodeTableValue`], and a single instance is
/// reused for every datanode when building create/update transactions.
/// Every field is `#[serde(default)]`, so encoded values that lack any of
/// these fields still deserialize (the field takes its `Default` value).
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct RegionInfo {
    /// Engine name of the regions.
    #[serde(default)]
    pub engine: String,
    /// Storage path of the regions.
    #[serde(default)]
    pub region_storage_path: String,
    /// Options applied to the regions.
    #[serde(default)]
    pub region_options: HashMap<String, String>,
    /// Per-region WAL options, keyed by region number.
    ///
    /// Keys go through `DisplayFromStr`, i.e. they are encoded with their
    /// `Display` impl and decoded with `FromStr`, so they appear as strings in
    /// the serialized form.
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
}
58
/// Key identifying the metadata entry of one table on one datanode.
///
/// Its byte encoding is `{DATANODE_TABLE_KEY_PREFIX}/{datanode_id}/{table_id}`
/// (see the `Display` impl), e.g. `__dn_table/1/2`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DatanodeTableKey {
    pub datanode_id: DatanodeId,
    pub table_id: TableId,
}
67
68impl DatanodeTableKey {
69 pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
70 Self {
71 datanode_id,
72 table_id,
73 }
74 }
75
76 pub fn prefix(datanode_id: DatanodeId) -> String {
77 format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
78 }
79}
80
81impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
82 fn to_bytes(&self) -> Vec<u8> {
83 self.to_string().into_bytes()
84 }
85
86 fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
87 let key = std::str::from_utf8(bytes).map_err(|e| {
88 InvalidMetadataSnafu {
89 err_msg: format!(
90 "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
91 String::from_utf8_lossy(bytes)
92 ),
93 }
94 .build()
95 })?;
96 let captures = DATANODE_TABLE_KEY_PATTERN
97 .captures(key)
98 .context(InvalidMetadataSnafu {
99 err_msg: format!("Invalid DatanodeTableKey '{key}'"),
100 })?;
101 let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
103 let table_id = captures[2].parse::<TableId>().unwrap();
104 Ok(DatanodeTableKey {
105 datanode_id,
106 table_id,
107 })
108 }
109}
110
111impl Display for DatanodeTableKey {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
114 }
115}
116
/// Value stored under a [`DatanodeTableKey`]: the regions of one table placed
/// on one datanode, together with their [`RegionInfo`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DatanodeTableValue {
    pub table_id: TableId,
    /// Leader regions of the table on this datanode.
    pub regions: Vec<RegionNumber>,
    /// Follower regions of the table on this datanode. Defaults to empty so
    /// values encoded without this field still deserialize.
    #[serde(default)]
    pub follower_regions: Vec<RegionNumber>,
    /// Flattened: its fields appear at the top level of the encoded object.
    #[serde(flatten)]
    pub region_info: RegionInfo,
    /// Set to `0` by [`DatanodeTableValue::new`].
    version: u64,
}
127
128impl DatanodeTableValue {
129 pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
130 let RegionRoleSet {
131 leader_regions,
132 follower_regions,
133 } = region_role_set;
134
135 Self {
136 table_id,
137 regions: leader_regions,
138 follower_regions,
139 region_info,
140 version: 0,
141 }
142 }
143}
144
145pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
147 DatanodeTableValue::try_from_raw_value(&kv.value)
148}
149
/// Reads [`DatanodeTableKey`] → [`DatanodeTableValue`] metadata from a
/// key-value backend and builds transactions that create, update, or delete it.
pub struct DatanodeTableManager {
    kv_backend: KvBackendRef,
}
153
154impl DatanodeTableManager {
155 pub fn new(kv_backend: KvBackendRef) -> Self {
156 Self { kv_backend }
157 }
158
159 pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
160 self.kv_backend
161 .get(&key.to_bytes())
162 .await?
163 .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
164 .transpose()
165 }
166
167 pub async fn batch_get(
168 &self,
169 keys: &[DatanodeTableKey],
170 ) -> Result<HashMap<DatanodeTableKey, DatanodeTableValue>> {
171 let req = BatchGetRequest::default().with_keys(keys.iter().map(|k| k.to_bytes()).collect());
172 let resp = self.kv_backend.batch_get(req).await?;
173 let values = resp
174 .kvs
175 .into_iter()
176 .map(|kv| {
177 Ok((
178 DatanodeTableKey::from_bytes(&kv.key)?,
179 DatanodeTableValue::try_from_raw_value(&kv.value)?,
180 ))
181 })
182 .collect::<Result<HashMap<_, _>>>()?;
183 Ok(values)
184 }
185
186 pub fn tables(
187 &self,
188 datanode_id: DatanodeId,
189 ) -> BoxStream<'static, Result<DatanodeTableValue>> {
190 let start_key = DatanodeTableKey::prefix(datanode_id);
191 let req = RangeRequest::new().with_prefix(start_key.as_bytes());
192
193 let stream = PaginationStream::new(
194 self.kv_backend.clone(),
195 req,
196 DEFAULT_PAGE_SIZE,
197 datanode_table_value_decoder,
198 )
199 .into_stream();
200
201 Box::pin(stream)
202 }
203
204 pub async fn regions(
206 &self,
207 table_id: TableId,
208 table_routes: &PhysicalTableRouteValue,
209 ) -> Result<Vec<DatanodeTableValue>> {
210 let keys = region_distribution(&table_routes.region_routes)
211 .into_keys()
212 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
213 .collect::<Vec<_>>();
214 let req = BatchGetRequest {
215 keys: keys.iter().map(|k| k.to_bytes()).collect(),
216 };
217 let resp = self.kv_backend.batch_get(req).await?;
218 resp.kvs
219 .into_iter()
220 .map(datanode_table_value_decoder)
221 .collect()
222 }
223
224 pub fn build_create_txn(
226 &self,
227 table_id: TableId,
228 engine: &str,
229 region_storage_path: &str,
230 region_options: HashMap<String, String>,
231 region_wal_options: HashMap<RegionNumber, String>,
232 distribution: RegionDistribution,
233 ) -> Result<Txn> {
234 let txns = distribution
235 .into_iter()
236 .map(|(datanode_id, regions)| {
237 let key = DatanodeTableKey::new(datanode_id, table_id);
238 let val = DatanodeTableValue::new(
239 table_id,
240 regions,
241 RegionInfo {
242 engine: engine.to_string(),
243 region_storage_path: region_storage_path.to_string(),
244 region_options: region_options.clone(),
245 region_wal_options: region_wal_options.clone(),
248 },
249 );
250
251 Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
252 })
253 .collect::<Result<Vec<_>>>()?;
254
255 let txn = Txn::new().and_then(txns);
256
257 Ok(txn)
258 }
259
260 pub(crate) async fn build_update_table_options_txn(
266 &self,
267 table_id: TableId,
268 region_distribution: RegionDistribution,
269 new_region_options: HashMap<String, String>,
270 ) -> Result<Txn> {
271 assert!(!region_distribution.is_empty());
272 let (any_datanode, _) = region_distribution.first_key_value().unwrap();
274
275 let mut region_info = self
276 .kv_backend
277 .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
278 .await
279 .transpose()
280 .context(DatanodeTableInfoNotFoundSnafu {
281 datanode_id: *any_datanode,
282 table_id,
283 })?
284 .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
285 .region_info;
286
287 if region_info.region_options == new_region_options {
289 return Ok(Txn::new());
290 }
291 region_info.region_options = new_region_options;
293
294 let mut txns = Vec::with_capacity(region_distribution.len());
295
296 for (datanode, regions) in region_distribution.into_iter() {
297 let key = DatanodeTableKey::new(datanode, table_id);
298 let key_bytes = key.to_bytes();
299 let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
300 .try_as_raw_value()?;
301 txns.push(TxnOp::Put(key_bytes, value_bytes));
302 }
303
304 let txn = Txn::new().and_then(txns);
305 Ok(txn)
306 }
307
308 pub(crate) fn build_update_txn(
310 &self,
311 table_id: TableId,
312 region_info: RegionInfo,
313 current_region_distribution: RegionDistribution,
314 new_region_distribution: RegionDistribution,
315 new_region_options: &HashMap<String, String>,
316 new_region_wal_options: &HashMap<RegionNumber, String>,
317 ) -> Result<Txn> {
318 let mut opts = Vec::new();
319
320 for current_datanode in current_region_distribution.keys() {
322 if !new_region_distribution.contains_key(current_datanode) {
323 let key = DatanodeTableKey::new(*current_datanode, table_id);
324 let raw_key = key.to_bytes();
325 opts.push(TxnOp::Delete(raw_key))
326 }
327 }
328
329 let need_update_options = region_info.region_options != *new_region_options;
330 let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
331
332 for (datanode, regions) in new_region_distribution.into_iter() {
333 let need_update =
334 if let Some(current_region) = current_region_distribution.get(&datanode) {
335 *current_region != regions || need_update_options || need_update_wal_options
337 } else {
338 true
339 };
340 if need_update {
341 let key = DatanodeTableKey::new(datanode, table_id);
342 let raw_key = key.to_bytes();
343 let mut new_region_info = region_info.clone();
345 if need_update_options {
346 new_region_info
347 .region_options
348 .clone_from(new_region_options);
349 }
350 if need_update_wal_options {
351 new_region_info
352 .region_wal_options
353 .clone_from(new_region_wal_options);
354 }
355 let val = DatanodeTableValue::new(table_id, regions, new_region_info)
356 .try_as_raw_value()?;
357 opts.push(TxnOp::Put(raw_key, val));
358 }
359 }
360
361 let txn = Txn::new().and_then(opts);
362 Ok(txn)
363 }
364
365 pub fn build_delete_txn(
367 &self,
368 table_id: TableId,
369 distribution: RegionDistribution,
370 ) -> Result<Txn> {
371 let txns = distribution
372 .into_keys()
373 .map(|datanode_id| {
374 let key = DatanodeTableKey::new(datanode_id, table_id);
375 let raw_key = key.to_bytes();
376
377 Ok(TxnOp::Delete(raw_key))
378 })
379 .collect::<Result<Vec<_>>>()?;
380
381 let txn = Txn::new().and_then(txns);
382
383 Ok(txn)
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the exact raw encodings of a key and a value, the value
    /// round-trip, and decoding of a value that omits defaulted fields.
    #[test]
    fn test_serialization() {
        let key = DatanodeTableKey {
            datanode_id: 1,
            table_id: 2,
        };
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__dn_table/1/2");

        let value = DatanodeTableValue {
            table_id: 42,
            regions: vec![1, 2, 3],
            follower_regions: vec![],
            region_info: RegionInfo::default(),
            version: 1,
        };
        let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;

        let raw_value = value.try_as_raw_value().unwrap();
        assert_eq!(raw_value, literal);

        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
        assert_eq!(actual, value);

        // `follower_regions` and the flattened `RegionInfo` fields are all
        // `#[serde(default)]`, so a value without them must still parse.
        let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
        let parsed = DatanodeTableValue::try_from_raw_value(raw_str);
        assert!(parsed.is_ok());
    }

    /// Plain string-keyed map, used to inspect the wire form of `IntegerHashMap`.
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct StringHashMap {
        inner: HashMap<String, String>,
    }

    /// Integer-keyed map using the same `DisplayFromStr` key adapter as
    /// `RegionInfo::region_wal_options`.
    #[serde_with::serde_as]
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct IntegerHashMap {
        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
        inner: HashMap<u32, String>,
    }

    /// A string-keyed encoding decodes into integer keys and vice versa, so
    /// both shapes share one wire format.
    #[test]
    fn test_serde_with_integer_hash_map() {
        let map = StringHashMap {
            inner: HashMap::from([
                ("1".to_string(), "aaa".to_string()),
                ("2".to_string(), "bbb".to_string()),
                ("3".to_string(), "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            IntegerHashMap {
                inner: HashMap::from([
                    (1, "aaa".to_string()),
                    (2, "bbb".to_string()),
                    (3, "ccc".to_string()),
                ]),
            },
            decoded
        );

        let map = IntegerHashMap {
            inner: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            StringHashMap {
                inner: HashMap::from([
                    ("1".to_string(), "aaa".to_string()),
                    ("2".to_string(), "bbb".to_string()),
                    ("3".to_string(), "ccc".to_string()),
                ]),
            },
            decoded
        );
    }

    /// A value with a fully populated, flattened `RegionInfo` round-trips
    /// through both the string and the byte-slice JSON APIs.
    #[test]
    fn test_serde_with_region_info() {
        let region_info = RegionInfo {
            engine: "test_engine".to_string(),
            region_storage_path: "test_storage_path".to_string(),
            region_options: HashMap::from([
                ("a".to_string(), "aa".to_string()),
                ("b".to_string(), "bb".to_string()),
                ("c".to_string(), "cc".to_string()),
            ]),
            region_wal_options: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let table_value = DatanodeTableValue {
            table_id: 1,
            regions: vec![],
            follower_regions: vec![],
            region_info,
            version: 1,
        };

        let encoded = serde_json::to_string(&table_value).unwrap();
        let decoded = serde_json::from_str(&encoded).unwrap();
        assert_eq!(table_value, decoded);

        let encoded = serde_json::to_vec(&table_value).unwrap();
        let decoded = serde_json::from_slice(&encoded).unwrap();
        assert_eq!(table_value, decoded);
    }

    /// Malformed raw keys are rejected with an error; a well-formed key decodes.
    #[test]
    fn test_deserialization() {
        fn test_err(raw_key: &[u8]) {
            let result = DatanodeTableKey::from_bytes(raw_key);
            assert!(result.is_err());
        }

        test_err(b"");
        // Invalid UTF-8: 0x9F is a continuation byte with no leading byte.
        test_err(vec![0u8, 159, 146, 150].as_slice());
        test_err(b"invalid_prefix/1/2");
        test_err(b"__dn_table/");
        test_err(b"__dn_table/invalid_len_1");
        test_err(b"__dn_table/invalid_len_3/1/2");
        test_err(b"__dn_table/invalid_node_id/2");
        test_err(b"__dn_table/1/invalid_table_id");

        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
        assert_eq!(DatanodeTableKey::new(11, 21), key);
    }
}