1use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::DatanodeId;
25use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
26use crate::key::table_route::PhysicalTableRouteValue;
27use crate::key::{
28 DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX, MetadataKey, MetadataValue,
29 RegionDistribution, RegionRoleSet,
30};
31use crate::kv_backend::KvBackendRef;
32use crate::kv_backend::txn::{Txn, TxnOp};
33use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
34use crate::rpc::KeyValue;
35use crate::rpc::router::region_distribution;
36use crate::rpc::store::{BatchGetRequest, RangeRequest};
37use crate::wal_provider::{RegionWalOptions, region_wal_options_serde};
38
39#[serde_with::serde_as]
40#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
41pub struct RegionInfo {
44 #[serde(default)]
45 pub engine: String,
47 #[serde(default)]
49 pub region_storage_path: String,
50 #[serde(default)]
52 pub region_options: HashMap<String, String>,
53 #[serde(default)]
56 #[serde(with = "region_wal_options_serde")]
57 pub region_wal_options: RegionWalOptions,
58}
59
60#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
64pub struct DatanodeTableKey {
65 pub datanode_id: DatanodeId,
66 pub table_id: TableId,
67}
68
69impl DatanodeTableKey {
70 pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
71 Self {
72 datanode_id,
73 table_id,
74 }
75 }
76
77 pub fn prefix(datanode_id: DatanodeId) -> String {
78 format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
79 }
80}
81
82impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
83 fn to_bytes(&self) -> Vec<u8> {
84 self.to_string().into_bytes()
85 }
86
87 fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
88 let key = std::str::from_utf8(bytes).map_err(|e| {
89 InvalidMetadataSnafu {
90 err_msg: format!(
91 "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
92 String::from_utf8_lossy(bytes)
93 ),
94 }
95 .build()
96 })?;
97 let captures = DATANODE_TABLE_KEY_PATTERN
98 .captures(key)
99 .context(InvalidMetadataSnafu {
100 err_msg: format!("Invalid DatanodeTableKey '{key}'"),
101 })?;
102 let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
104 let table_id = captures[2].parse::<TableId>().unwrap();
105 Ok(DatanodeTableKey {
106 datanode_id,
107 table_id,
108 })
109 }
110}
111
112impl Display for DatanodeTableKey {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
115 }
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
119pub struct DatanodeTableValue {
120 pub table_id: TableId,
121 pub regions: Vec<RegionNumber>,
122 #[serde(default)]
123 pub follower_regions: Vec<RegionNumber>,
124 #[serde(flatten)]
125 pub region_info: RegionInfo,
126 version: u64,
127}
128
129impl DatanodeTableValue {
130 pub fn new(table_id: TableId, region_role_set: RegionRoleSet, region_info: RegionInfo) -> Self {
131 let RegionRoleSet {
132 leader_regions,
133 follower_regions,
134 } = region_role_set;
135
136 Self {
137 table_id,
138 regions: leader_regions,
139 follower_regions,
140 region_info,
141 version: 0,
142 }
143 }
144}
145
146pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
148 DatanodeTableValue::try_from_raw_value(&kv.value)
149}
150
151pub struct DatanodeTableManager {
152 kv_backend: KvBackendRef,
153}
154
155impl DatanodeTableManager {
156 pub fn new(kv_backend: KvBackendRef) -> Self {
157 Self { kv_backend }
158 }
159
160 pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
161 self.kv_backend
162 .get(&key.to_bytes())
163 .await?
164 .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
165 .transpose()
166 }
167
168 pub async fn batch_get(
169 &self,
170 keys: &[DatanodeTableKey],
171 ) -> Result<HashMap<DatanodeTableKey, DatanodeTableValue>> {
172 let req = BatchGetRequest::default().with_keys(keys.iter().map(|k| k.to_bytes()).collect());
173 let resp = self.kv_backend.batch_get(req).await?;
174 let values = resp
175 .kvs
176 .into_iter()
177 .map(|kv| {
178 Ok((
179 DatanodeTableKey::from_bytes(&kv.key)?,
180 DatanodeTableValue::try_from_raw_value(&kv.value)?,
181 ))
182 })
183 .collect::<Result<HashMap<_, _>>>()?;
184 Ok(values)
185 }
186
187 pub fn tables(
188 &self,
189 datanode_id: DatanodeId,
190 ) -> BoxStream<'static, Result<DatanodeTableValue>> {
191 let start_key = DatanodeTableKey::prefix(datanode_id);
192 let req = RangeRequest::new().with_prefix(start_key.as_bytes());
193
194 let stream = PaginationStream::new(
195 self.kv_backend.clone(),
196 req,
197 DEFAULT_PAGE_SIZE,
198 datanode_table_value_decoder,
199 )
200 .into_stream();
201
202 Box::pin(stream)
203 }
204
205 pub async fn regions(
207 &self,
208 table_id: TableId,
209 table_routes: &PhysicalTableRouteValue,
210 ) -> Result<Vec<DatanodeTableValue>> {
211 let keys = region_distribution(&table_routes.region_routes)
212 .into_keys()
213 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
214 .collect::<Vec<_>>();
215 let req = BatchGetRequest {
216 keys: keys.iter().map(|k| k.to_bytes()).collect(),
217 };
218 let resp = self.kv_backend.batch_get(req).await?;
219 resp.kvs
220 .into_iter()
221 .map(datanode_table_value_decoder)
222 .collect()
223 }
224
225 pub fn build_create_txn(
227 &self,
228 table_id: TableId,
229 engine: &str,
230 region_storage_path: &str,
231 region_options: HashMap<String, String>,
232 region_wal_options: RegionWalOptions,
233 distribution: RegionDistribution,
234 ) -> Result<Txn> {
235 let txns = distribution
236 .into_iter()
237 .map(|(datanode_id, regions)| {
238 let key = DatanodeTableKey::new(datanode_id, table_id);
239 let val = DatanodeTableValue::new(
240 table_id,
241 regions,
242 RegionInfo {
243 engine: engine.to_string(),
244 region_storage_path: region_storage_path.to_string(),
245 region_options: region_options.clone(),
246 region_wal_options: region_wal_options.clone(),
249 },
250 );
251
252 Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
253 })
254 .collect::<Result<Vec<_>>>()?;
255
256 let txn = Txn::new().and_then(txns);
257
258 Ok(txn)
259 }
260
261 pub(crate) async fn build_update_table_options_txn(
267 &self,
268 table_id: TableId,
269 region_distribution: RegionDistribution,
270 new_region_options: HashMap<String, String>,
271 ) -> Result<Txn> {
272 assert!(!region_distribution.is_empty());
273 let (any_datanode, _) = region_distribution.first_key_value().unwrap();
275
276 let mut region_info = self
277 .kv_backend
278 .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
279 .await
280 .transpose()
281 .context(DatanodeTableInfoNotFoundSnafu {
282 datanode_id: *any_datanode,
283 table_id,
284 })?
285 .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
286 .region_info;
287
288 if region_info.region_options == new_region_options {
290 return Ok(Txn::new());
291 }
292 region_info.region_options = new_region_options;
294
295 let mut txns = Vec::with_capacity(region_distribution.len());
296
297 for (datanode, regions) in region_distribution.into_iter() {
298 let key = DatanodeTableKey::new(datanode, table_id);
299 let key_bytes = key.to_bytes();
300 let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
301 .try_as_raw_value()?;
302 txns.push(TxnOp::Put(key_bytes, value_bytes));
303 }
304
305 let txn = Txn::new().and_then(txns);
306 Ok(txn)
307 }
308
309 pub(crate) fn build_update_txn(
311 &self,
312 table_id: TableId,
313 region_info: RegionInfo,
314 current_region_distribution: RegionDistribution,
315 new_region_distribution: RegionDistribution,
316 new_region_options: &HashMap<String, String>,
317 new_region_wal_options: &RegionWalOptions,
318 ) -> Result<Txn> {
319 let mut opts = Vec::new();
320
321 for current_datanode in current_region_distribution.keys() {
323 if !new_region_distribution.contains_key(current_datanode) {
324 let key = DatanodeTableKey::new(*current_datanode, table_id);
325 let raw_key = key.to_bytes();
326 opts.push(TxnOp::Delete(raw_key))
327 }
328 }
329
330 let need_update_options = region_info.region_options != *new_region_options;
331 let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
332
333 for (datanode, regions) in new_region_distribution.into_iter() {
334 let need_update =
335 if let Some(current_region) = current_region_distribution.get(&datanode) {
336 *current_region != regions || need_update_options || need_update_wal_options
338 } else {
339 true
340 };
341 if need_update {
342 let key = DatanodeTableKey::new(datanode, table_id);
343 let raw_key = key.to_bytes();
344 let mut new_region_info = region_info.clone();
346 if need_update_options {
347 new_region_info
348 .region_options
349 .clone_from(new_region_options);
350 }
351 if need_update_wal_options {
352 new_region_info
353 .region_wal_options
354 .clone_from(new_region_wal_options);
355 }
356 let val = DatanodeTableValue::new(table_id, regions, new_region_info)
357 .try_as_raw_value()?;
358 opts.push(TxnOp::Put(raw_key, val));
359 }
360 }
361
362 let txn = Txn::new().and_then(opts);
363 Ok(txn)
364 }
365
366 pub fn build_delete_txn(
368 &self,
369 table_id: TableId,
370 distribution: RegionDistribution,
371 ) -> Result<Txn> {
372 let txns = distribution
373 .into_keys()
374 .map(|datanode_id| {
375 let key = DatanodeTableKey::new(datanode_id, table_id);
376 let raw_key = key.to_bytes();
377
378 Ok(TxnOp::Delete(raw_key))
379 })
380 .collect::<Result<Vec<_>>>()?;
381
382 let txn = Txn::new().and_then(txns);
383
384 Ok(txn)
385 }
386}
387
#[cfg(test)]
mod tests {
    use common_wal::options::WalOptions;

    use super::*;

    #[test]
    fn test_serialization() {
        let key = DatanodeTableKey {
            datanode_id: 1,
            table_id: 2,
        };
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__dn_table/1/2");

        let value = DatanodeTableValue {
            table_id: 42,
            regions: vec![1, 2, 3],
            follower_regions: vec![],
            region_info: RegionInfo::default(),
            version: 1,
        };
        let literal = br#"{"table_id":42,"regions":[1,2,3],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;

        let raw_value = value.try_as_raw_value().unwrap();
        assert_eq!(raw_value, literal);

        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
        assert_eq!(actual, value);

        // A legacy value without `follower_regions` or any `RegionInfo` field must still decode,
        // with every missing field falling back to its default. That yields exactly `value`.
        let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
        let parsed = DatanodeTableValue::try_from_raw_value(raw_str).unwrap();
        assert_eq!(parsed, value);
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct StringHashMap {
        inner: HashMap<String, String>,
    }

    #[serde_with::serde_as]
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct IntegerHashMap {
        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
        inner: HashMap<u32, String>,
    }

    #[test]
    fn test_serde_with_integer_hash_map() {
        let map = StringHashMap {
            inner: HashMap::from([
                ("1".to_string(), "aaa".to_string()),
                ("2".to_string(), "bbb".to_string()),
                ("3".to_string(), "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            IntegerHashMap {
                inner: HashMap::from([
                    (1, "aaa".to_string()),
                    (2, "bbb".to_string()),
                    (3, "ccc".to_string()),
                ]),
            },
            decoded
        );

        let map = IntegerHashMap {
            inner: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            StringHashMap {
                inner: HashMap::from([
                    ("1".to_string(), "aaa".to_string()),
                    ("2".to_string(), "bbb".to_string()),
                    ("3".to_string(), "ccc".to_string()),
                ]),
            },
            decoded
        );
    }

    /// Round-trips a value with every `RegionInfo` field populated through both the string and
    /// the byte-slice `serde_json` APIs.
    #[test]
    fn test_serde_with_region_info() {
        let region_info = RegionInfo {
            engine: "test_engine".to_string(),
            region_storage_path: "test_storage_path".to_string(),
            region_options: HashMap::from([
                ("a".to_string(), "aa".to_string()),
                ("b".to_string(), "bb".to_string()),
                ("c".to_string(), "cc".to_string()),
            ]),
            region_wal_options: HashMap::from([
                (1, WalOptions::RaftEngine),
                (2, WalOptions::Noop),
                (3, WalOptions::RaftEngine),
            ]),
        };
        let table_value = DatanodeTableValue {
            table_id: 1,
            regions: vec![],
            follower_regions: vec![],
            region_info,
            version: 1,
        };

        let encoded = serde_json::to_string(&table_value).unwrap();
        let decoded = serde_json::from_str(&encoded).unwrap();
        assert_eq!(table_value, decoded);

        let encoded = serde_json::to_vec(&table_value).unwrap();
        let decoded = serde_json::from_slice(&encoded).unwrap();
        assert_eq!(table_value, decoded);
    }

    #[test]
    fn test_deserialize_legacy_region_wal_options() {
        let literal = br#"{"table_id":42,"regions":[1],"follower_regions":[],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{"1":"{\"wal.provider\":\"raft_engine\"}"},"version":1}"#;

        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();

        assert_eq!(
            actual.region_info.region_wal_options,
            HashMap::from([(1, WalOptions::RaftEngine)])
        );
    }

    #[test]
    fn test_deserialization() {
        fn test_err(raw_key: &[u8]) {
            let result = DatanodeTableKey::from_bytes(raw_key);
            assert!(result.is_err());
        }

        test_err(b"");
        // Not valid UTF-8.
        test_err(&[0u8, 159, 146, 150]);
        test_err(b"invalid_prefix/1/2");
        test_err(b"__dn_table/");
        test_err(b"__dn_table/invalid_len_1");
        test_err(b"__dn_table/invalid_len_3/1/2");
        test_err(b"__dn_table/invalid_node_id/2");
        test_err(b"__dn_table/1/invalid_table_id");

        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
        assert_eq!(DatanodeTableKey::new(11, 21), key);
    }
}