1use std::collections::HashMap;
16use std::fmt::Display;
17
18use futures::stream::BoxStream;
19use serde::{Deserialize, Serialize};
20use snafu::OptionExt;
21use store_api::storage::RegionNumber;
22use table::metadata::TableId;
23
24use crate::error::{DatanodeTableInfoNotFoundSnafu, InvalidMetadataSnafu, Result};
25use crate::key::table_route::PhysicalTableRouteValue;
26use crate::key::{
27 MetadataKey, MetadataValue, RegionDistribution, DATANODE_TABLE_KEY_PATTERN,
28 DATANODE_TABLE_KEY_PREFIX,
29};
30use crate::kv_backend::txn::{Txn, TxnOp};
31use crate::kv_backend::KvBackendRef;
32use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
33use crate::rpc::router::region_distribution;
34use crate::rpc::store::{BatchGetRequest, RangeRequest};
35use crate::rpc::KeyValue;
36use crate::DatanodeId;
37
/// Region metadata shared by all datanode entries of a table.
///
/// `DatanodeTableManager::build_create_txn` writes the same `RegionInfo` into the
/// [`DatanodeTableValue`] of every datanode hosting the table.
///
/// Every field is `#[serde(default)]`, so a stored value that lacks any of these fields
/// still deserializes, with the missing fields left at their defaults.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct RegionInfo {
    /// Engine name recorded for the table's regions.
    #[serde(default)]
    pub engine: String,
    /// Storage path recorded for the table's regions.
    #[serde(default)]
    pub region_storage_path: String,
    /// Region options as string key/value pairs.
    #[serde(default)]
    pub region_options: HashMap<String, String>,
    /// WAL options keyed by region number.
    ///
    /// JSON object keys are always strings. `DisplayFromStr` therefore writes the integer
    /// keys as strings and parses them back on read.
    #[serde(default)]
    #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
    pub region_wal_options: HashMap<RegionNumber, String>,
}
58
/// Key of the metadata entry describing which regions of a table live on one datanode.
///
/// The byte encoding is `{DATANODE_TABLE_KEY_PREFIX}/{datanode_id}/{table_id}` (see the
/// `Display` impl), for example `__dn_table/1/2`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DatanodeTableKey {
    /// Datanode that hosts the regions.
    pub datanode_id: DatanodeId,
    /// Table the regions belong to.
    pub table_id: TableId,
}
67
68impl DatanodeTableKey {
69 pub fn new(datanode_id: DatanodeId, table_id: TableId) -> Self {
70 Self {
71 datanode_id,
72 table_id,
73 }
74 }
75
76 pub fn prefix(datanode_id: DatanodeId) -> String {
77 format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
78 }
79}
80
81impl MetadataKey<'_, DatanodeTableKey> for DatanodeTableKey {
82 fn to_bytes(&self) -> Vec<u8> {
83 self.to_string().into_bytes()
84 }
85
86 fn from_bytes(bytes: &[u8]) -> Result<DatanodeTableKey> {
87 let key = std::str::from_utf8(bytes).map_err(|e| {
88 InvalidMetadataSnafu {
89 err_msg: format!(
90 "DatanodeTableKey '{}' is not a valid UTF8 string: {e}",
91 String::from_utf8_lossy(bytes)
92 ),
93 }
94 .build()
95 })?;
96 let captures = DATANODE_TABLE_KEY_PATTERN
97 .captures(key)
98 .context(InvalidMetadataSnafu {
99 err_msg: format!("Invalid DatanodeTableKey '{key}'"),
100 })?;
101 let datanode_id = captures[1].parse::<DatanodeId>().unwrap();
103 let table_id = captures[2].parse::<TableId>().unwrap();
104 Ok(DatanodeTableKey {
105 datanode_id,
106 table_id,
107 })
108 }
109}
110
111impl Display for DatanodeTableKey {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
114 }
115}
116
/// Value stored under a [`DatanodeTableKey`].
///
/// It holds the regions of `table_id` hosted by that datanode, together with the table's
/// [`RegionInfo`].
///
/// `RegionInfo`'s fields are flattened into the same serialized object. Field order here
/// is the serialized field order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DatanodeTableValue {
    /// Table the regions belong to.
    pub table_id: TableId,
    /// Region numbers of `table_id` hosted on the keyed datanode.
    pub regions: Vec<RegionNumber>,
    /// Region info, flattened into the top-level serialized object.
    #[serde(flatten)]
    pub region_info: RegionInfo,
    // Private; `DatanodeTableValue::new` always initializes it to 0.
    version: u64,
}
125
126impl DatanodeTableValue {
127 pub fn new(table_id: TableId, regions: Vec<RegionNumber>, region_info: RegionInfo) -> Self {
128 Self {
129 table_id,
130 regions,
131 region_info,
132 version: 0,
133 }
134 }
135}
136
137pub fn datanode_table_value_decoder(kv: KeyValue) -> Result<DatanodeTableValue> {
139 DatanodeTableValue::try_from_raw_value(&kv.value)
140}
141
/// Reads datanode-table metadata from a KV backend and builds the transactions that write it.
pub struct DatanodeTableManager {
    // Backend holding the `DatanodeTableKey` -> `DatanodeTableValue` entries.
    kv_backend: KvBackendRef,
}
145
146impl DatanodeTableManager {
147 pub fn new(kv_backend: KvBackendRef) -> Self {
148 Self { kv_backend }
149 }
150
151 pub async fn get(&self, key: &DatanodeTableKey) -> Result<Option<DatanodeTableValue>> {
152 self.kv_backend
153 .get(&key.to_bytes())
154 .await?
155 .map(|kv| DatanodeTableValue::try_from_raw_value(&kv.value))
156 .transpose()
157 }
158
159 pub fn tables(
160 &self,
161 datanode_id: DatanodeId,
162 ) -> BoxStream<'static, Result<DatanodeTableValue>> {
163 let start_key = DatanodeTableKey::prefix(datanode_id);
164 let req = RangeRequest::new().with_prefix(start_key.as_bytes());
165
166 let stream = PaginationStream::new(
167 self.kv_backend.clone(),
168 req,
169 DEFAULT_PAGE_SIZE,
170 datanode_table_value_decoder,
171 )
172 .into_stream();
173
174 Box::pin(stream)
175 }
176
177 pub async fn regions(
179 &self,
180 table_id: TableId,
181 table_routes: &PhysicalTableRouteValue,
182 ) -> Result<Vec<DatanodeTableValue>> {
183 let keys = region_distribution(&table_routes.region_routes)
184 .into_keys()
185 .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
186 .collect::<Vec<_>>();
187 let req = BatchGetRequest {
188 keys: keys.iter().map(|k| k.to_bytes()).collect(),
189 };
190 let resp = self.kv_backend.batch_get(req).await?;
191 resp.kvs
192 .into_iter()
193 .map(datanode_table_value_decoder)
194 .collect()
195 }
196
197 pub fn build_create_txn(
199 &self,
200 table_id: TableId,
201 engine: &str,
202 region_storage_path: &str,
203 region_options: HashMap<String, String>,
204 region_wal_options: HashMap<RegionNumber, String>,
205 distribution: RegionDistribution,
206 ) -> Result<Txn> {
207 let txns = distribution
208 .into_iter()
209 .map(|(datanode_id, regions)| {
210 let key = DatanodeTableKey::new(datanode_id, table_id);
211 let val = DatanodeTableValue::new(
212 table_id,
213 regions,
214 RegionInfo {
215 engine: engine.to_string(),
216 region_storage_path: region_storage_path.to_string(),
217 region_options: region_options.clone(),
218 region_wal_options: region_wal_options.clone(),
221 },
222 );
223
224 Ok(TxnOp::Put(key.to_bytes(), val.try_as_raw_value()?))
225 })
226 .collect::<Result<Vec<_>>>()?;
227
228 let txn = Txn::new().and_then(txns);
229
230 Ok(txn)
231 }
232
233 pub(crate) async fn build_update_table_options_txn(
239 &self,
240 table_id: TableId,
241 region_distribution: RegionDistribution,
242 new_region_options: HashMap<String, String>,
243 ) -> Result<Txn> {
244 assert!(!region_distribution.is_empty());
245 let (any_datanode, _) = region_distribution.first_key_value().unwrap();
247
248 let mut region_info = self
249 .kv_backend
250 .get(&DatanodeTableKey::new(*any_datanode, table_id).to_bytes())
251 .await
252 .transpose()
253 .context(DatanodeTableInfoNotFoundSnafu {
254 datanode_id: *any_datanode,
255 table_id,
256 })?
257 .and_then(|r| DatanodeTableValue::try_from_raw_value(&r.value))?
258 .region_info;
259 region_info.region_options = new_region_options;
261
262 let mut txns = Vec::with_capacity(region_distribution.len());
263
264 for (datanode, regions) in region_distribution.into_iter() {
265 let key = DatanodeTableKey::new(datanode, table_id);
266 let key_bytes = key.to_bytes();
267 let value_bytes = DatanodeTableValue::new(table_id, regions, region_info.clone())
268 .try_as_raw_value()?;
269 txns.push(TxnOp::Put(key_bytes, value_bytes));
270 }
271
272 let txn = Txn::new().and_then(txns);
273 Ok(txn)
274 }
275
276 pub(crate) fn build_update_txn(
278 &self,
279 table_id: TableId,
280 region_info: RegionInfo,
281 current_region_distribution: RegionDistribution,
282 new_region_distribution: RegionDistribution,
283 new_region_options: &HashMap<String, String>,
284 new_region_wal_options: &HashMap<RegionNumber, String>,
285 ) -> Result<Txn> {
286 let mut opts = Vec::new();
287
288 for current_datanode in current_region_distribution.keys() {
290 if !new_region_distribution.contains_key(current_datanode) {
291 let key = DatanodeTableKey::new(*current_datanode, table_id);
292 let raw_key = key.to_bytes();
293 opts.push(TxnOp::Delete(raw_key))
294 }
295 }
296
297 let need_update_options = region_info.region_options != *new_region_options;
298 let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options;
299
300 for (datanode, regions) in new_region_distribution.into_iter() {
301 let need_update =
302 if let Some(current_region) = current_region_distribution.get(&datanode) {
303 *current_region != regions || need_update_options || need_update_wal_options
305 } else {
306 true
307 };
308 if need_update {
309 let key = DatanodeTableKey::new(datanode, table_id);
310 let raw_key = key.to_bytes();
311 let mut new_region_info = region_info.clone();
313 if need_update_options {
314 new_region_info
315 .region_options
316 .clone_from(new_region_options);
317 }
318 if need_update_wal_options {
319 new_region_info
320 .region_wal_options
321 .clone_from(new_region_wal_options);
322 }
323 let val = DatanodeTableValue::new(table_id, regions, new_region_info)
324 .try_as_raw_value()?;
325 opts.push(TxnOp::Put(raw_key, val));
326 }
327 }
328
329 let txn = Txn::new().and_then(opts);
330 Ok(txn)
331 }
332
333 pub fn build_delete_txn(
335 &self,
336 table_id: TableId,
337 distribution: RegionDistribution,
338 ) -> Result<Txn> {
339 let txns = distribution
340 .into_keys()
341 .map(|datanode_id| {
342 let key = DatanodeTableKey::new(datanode_id, table_id);
343 let raw_key = key.to_bytes();
344
345 Ok(TxnOp::Delete(raw_key))
346 })
347 .collect::<Result<Vec<_>>>()?;
348
349 let txn = Txn::new().and_then(txns);
350
351 Ok(txn)
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_serialization() {
        let key = DatanodeTableKey {
            datanode_id: 1,
            table_id: 2,
        };
        let raw_key = key.to_bytes();
        assert_eq!(raw_key, b"__dn_table/1/2");

        let value = DatanodeTableValue {
            table_id: 42,
            regions: vec![1, 2, 3],
            region_info: RegionInfo::default(),
            version: 1,
        };
        let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#;

        let raw_value = value.try_as_raw_value().unwrap();
        assert_eq!(raw_value, literal);

        let actual = DatanodeTableValue::try_from_raw_value(literal).unwrap();
        assert_eq!(actual, value);

        // A value without any of the flattened `RegionInfo` fields must still decode, with
        // every region info field falling back to its default.
        let raw_str = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#;
        let parsed = DatanodeTableValue::try_from_raw_value(raw_str).unwrap();
        assert_eq!(parsed, value);
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct StringHashMap {
        inner: HashMap<String, String>,
    }

    #[serde_with::serde_as]
    #[derive(Debug, Serialize, Deserialize, PartialEq)]
    struct IntegerHashMap {
        #[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
        inner: HashMap<u32, String>,
    }

    /// Integer-keyed maps serialized through `DisplayFromStr` are wire-compatible with
    /// string-keyed maps in both directions.
    #[test]
    fn test_serde_with_integer_hash_map() {
        let map = StringHashMap {
            inner: HashMap::from([
                ("1".to_string(), "aaa".to_string()),
                ("2".to_string(), "bbb".to_string()),
                ("3".to_string(), "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: IntegerHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            IntegerHashMap {
                inner: HashMap::from([
                    (1, "aaa".to_string()),
                    (2, "bbb".to_string()),
                    (3, "ccc".to_string()),
                ]),
            },
            decoded
        );

        let map = IntegerHashMap {
            inner: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let encoded = serde_json::to_string(&map).unwrap();
        let decoded: StringHashMap = serde_json::from_str(&encoded).unwrap();
        assert_eq!(
            StringHashMap {
                inner: HashMap::from([
                    ("1".to_string(), "aaa".to_string()),
                    ("2".to_string(), "bbb".to_string()),
                    ("3".to_string(), "ccc".to_string()),
                ]),
            },
            decoded
        );
    }

    /// A `DatanodeTableValue` with non-empty region info, including integer-keyed WAL
    /// options, round-trips through `serde_json` as both a string and a byte vector.
    #[test]
    fn test_serde_with_region_info() {
        let region_info = RegionInfo {
            engine: "test_engine".to_string(),
            region_storage_path: "test_storage_path".to_string(),
            region_options: HashMap::from([
                ("a".to_string(), "aa".to_string()),
                ("b".to_string(), "bb".to_string()),
                ("c".to_string(), "cc".to_string()),
            ]),
            region_wal_options: HashMap::from([
                (1, "aaa".to_string()),
                (2, "bbb".to_string()),
                (3, "ccc".to_string()),
            ]),
        };
        let table_value = DatanodeTableValue {
            table_id: 1,
            regions: vec![],
            region_info,
            version: 1,
        };

        let encoded = serde_json::to_string(&table_value).unwrap();
        let decoded = serde_json::from_str(&encoded).unwrap();
        assert_eq!(table_value, decoded);

        let encoded = serde_json::to_vec(&table_value).unwrap();
        let decoded = serde_json::from_slice(&encoded).unwrap();
        assert_eq!(table_value, decoded);
    }

    #[test]
    fn test_deserialization() {
        fn test_err(raw_key: &[u8]) {
            let result = DatanodeTableKey::from_bytes(raw_key);
            assert!(result.is_err());
        }

        // Empty key.
        test_err(b"");
        // Not valid UTF-8.
        test_err(&[0u8, 159, 146, 150]);
        // Wrong prefix.
        test_err(b"invalid_prefix/1/2");
        // Wrong number of segments.
        test_err(b"__dn_table/");
        test_err(b"__dn_table/invalid_len_1");
        test_err(b"__dn_table/invalid_len_3/1/2");
        // Non-numeric ids.
        test_err(b"__dn_table/invalid_node_id/2");
        test_err(b"__dn_table/1/invalid_table_id");

        let key = DatanodeTableKey::from_bytes(b"__dn_table/11/21").unwrap();
        assert_eq!(DatanodeTableKey::new(11, 21), key);
    }
}