1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use api::v1::value::ValueData;
20use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
21use async_stream::try_stream;
22use base64::engine::general_purpose::STANDARD_NO_PAD;
23use base64::Engine;
24use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
25use datafusion::prelude::{col, lit};
26use futures_util::stream::BoxStream;
27use futures_util::TryStreamExt;
28use mito2::engine::MitoEngine;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::{
32 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
34 METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
38use store_api::storage::{RegionId, ScanRequest};
39use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
40
41use crate::error::{
42 CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
43 LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
44 ParseRegionIdSnafu, Result,
45};
46use crate::utils;
47
/// Prefix of the keys that store the per-column metadata of a logical region.
const COLUMN_PREFIX: &str = "__column_";
/// Prefix of the keys that mark a logical region as registered.
const REGION_PREFIX: &str = "__region_";
50
/// Accessor for the metadata region of a metric-engine physical region.
///
/// Logical-region and column metadata are stored as string key/value rows in a
/// mito region; this type reads and writes those rows and also keeps an
/// in-memory lock per opened logical region.
pub struct MetadataRegion {
    /// Engine used to scan and write the metadata region.
    pub(crate) mito: MitoEngine,
    /// Per-logical-region locks, keyed by logical region id.
    ///
    /// Entries are inserted by `open_logical_region`, removed by
    /// `remove_logical_region`, and handed out (as owned guards) by
    /// `read_lock_logical_region` / `write_lock_logical_region`.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
71
72impl MetadataRegion {
73 pub fn new(mito: MitoEngine) -> Self {
74 Self {
75 mito,
76 logical_region_lock: RwLock::new(HashMap::new()),
77 }
78 }
79
80 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
84 match self
85 .logical_region_lock
86 .write()
87 .await
88 .entry(logical_region_id)
89 {
90 Entry::Occupied(_) => false,
91 Entry::Vacant(vacant_entry) => {
92 vacant_entry.insert(Arc::new(RwLock::new(())));
93 true
94 }
95 }
96 }
97
98 pub async fn read_lock_logical_region(
100 &self,
101 logical_region_id: RegionId,
102 ) -> Result<OwnedRwLockReadGuard<()>> {
103 let lock = self
104 .logical_region_lock
105 .read()
106 .await
107 .get(&logical_region_id)
108 .context(LogicalRegionNotFoundSnafu {
109 region_id: logical_region_id,
110 })?
111 .clone();
112 Ok(RwLock::read_owned(lock).await)
113 }
114
115 pub async fn write_lock_logical_region(
117 &self,
118 logical_region_id: RegionId,
119 ) -> Result<OwnedRwLockWriteGuard<()>> {
120 let lock = self
121 .logical_region_lock
122 .read()
123 .await
124 .get(&logical_region_id)
125 .context(LogicalRegionNotFoundSnafu {
126 region_id: logical_region_id,
127 })?
128 .clone();
129 Ok(RwLock::write_owned(lock).await)
130 }
131
132 pub async fn remove_logical_region(
136 &self,
137 physical_region_id: RegionId,
138 logical_region_id: RegionId,
139 ) -> Result<()> {
140 let region_id = utils::to_metadata_region_id(physical_region_id);
142 let region_key = Self::concat_region_key(logical_region_id);
143
144 let logical_columns = self
146 .logical_columns(physical_region_id, logical_region_id)
147 .await?;
148 let mut column_keys = logical_columns
149 .into_iter()
150 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
151 .collect::<Vec<_>>();
152
153 column_keys.push(region_key);
155 self.delete(region_id, &column_keys).await?;
156
157 self.logical_region_lock
158 .write()
159 .await
160 .remove(&logical_region_id);
161
162 Ok(())
163 }
164
165 pub async fn logical_columns(
169 &self,
170 physical_region_id: RegionId,
171 logical_region_id: RegionId,
172 ) -> Result<Vec<(String, ColumnMetadata)>> {
173 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
174 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
175
176 let mut columns = vec![];
177 for (k, v) in self
178 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
179 .await?
180 {
181 if !k.starts_with(®ion_column_prefix) {
182 continue;
183 }
184 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
186 let column_metadata = Self::deserialize_column_metadata(&v)?;
187 columns.push((column_name, column_metadata));
188 }
189
190 Ok(columns)
191 }
192
193 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
195 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
196
197 let mut regions = vec![];
198 for k in self
199 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
200 .await?
201 {
202 if !k.starts_with(REGION_PREFIX) {
203 continue;
204 }
205 let region_id = Self::parse_region_key(&k).unwrap();
207 let region_id = region_id.parse::<u64>().unwrap().into();
208 regions.push(region_id);
209 }
210
211 Ok(regions)
212 }
213}
214
215impl MetadataRegion {
217 pub fn concat_region_key(region_id: RegionId) -> String {
218 format!("{REGION_PREFIX}{}", region_id.as_u64())
219 }
220
221 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
223 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
224 format!(
225 "{COLUMN_PREFIX}{}_{}",
226 region_id.as_u64(),
227 encoded_column_name
228 )
229 }
230
231 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
233 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
234 }
235
236 pub fn parse_region_key(key: &str) -> Option<&str> {
237 key.strip_prefix(REGION_PREFIX)
238 }
239
240 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
242 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
243 let mut iter = stripped.split('_');
244
245 let region_id_raw = iter.next().unwrap();
246 let region_id = region_id_raw
247 .parse::<u64>()
248 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
249 .into();
250
251 let encoded_column_name = iter.next().unwrap();
252 let column_name = STANDARD_NO_PAD
253 .decode(encoded_column_name)
254 .context(DecodeColumnValueSnafu)?;
255
256 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
257 } else {
258 Ok(None)
259 }
260 }
261
262 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
263 serde_json::to_string(column_metadata).unwrap()
264 }
265
266 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
267 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
268 raw: column_metadata,
269 })
270 }
271}
272
273pub fn decode_batch_stream<T: Send + 'static>(
275 mut record_batch_stream: SendableRecordBatchStream,
276 decode: fn(RecordBatch) -> Vec<T>,
277) -> BoxStream<'static, Result<T>> {
278 let stream = try_stream! {
279 while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
280 for item in decode(batch) {
281 yield item;
282 }
283 }
284 };
285 Box::pin(stream)
286}
287
288fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
290 let key_col = batch.column(0);
291 let val_col = batch.column(1);
292
293 (0..batch.num_rows())
294 .flat_map(move |row_index| {
295 let key = key_col
296 .get_ref(row_index)
297 .as_string()
298 .unwrap()
299 .map(|s| s.to_string());
300
301 key.map(|k| {
302 (
303 k,
304 val_col
305 .get_ref(row_index)
306 .as_string()
307 .unwrap()
308 .map(|s| s.to_string())
309 .unwrap_or_default(),
310 )
311 })
312 })
313 .collect()
314}
315
316fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
318 let key_col = batch.column(0);
319
320 (0..batch.num_rows())
321 .flat_map(move |row_index| {
322 let key = key_col
323 .get_ref(row_index)
324 .as_string()
325 .unwrap()
326 .map(|s| s.to_string());
327 key
328 })
329 .collect()
330}
331
332impl MetadataRegion {
336 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
337 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
338
339 let projection = if key_only {
340 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
341 } else {
342 vec![
343 METADATA_SCHEMA_KEY_COLUMN_INDEX,
344 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
345 ]
346 };
347 ScanRequest {
348 projection: Some(projection),
349 filters: vec![filter_expr],
350 ..Default::default()
351 }
352 }
353
354 pub async fn get_all_with_prefix(
355 &self,
356 region_id: RegionId,
357 prefix: &str,
358 ) -> Result<HashMap<String, String>> {
359 let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
360 let record_batch_stream = self
361 .mito
362 .scan_to_stream(region_id, scan_req)
363 .await
364 .context(MitoReadOperationSnafu)?;
365
366 decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
367 .try_collect::<HashMap<_, _>>()
368 .await
369 }
370
371 pub async fn get_all_key_with_prefix(
372 &self,
373 region_id: RegionId,
374 prefix: &str,
375 ) -> Result<Vec<String>> {
376 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
377 let record_batch_stream = self
378 .mito
379 .scan_to_stream(region_id, scan_req)
380 .await
381 .context(MitoReadOperationSnafu)?;
382
383 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
384 .try_collect::<Vec<_>>()
385 .await
386 }
387
388 async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
391 let delete_request = Self::build_delete_request(keys);
392 self.mito
393 .handle_request(
394 region_id,
395 store_api::region_request::RegionRequest::Delete(delete_request),
396 )
397 .await
398 .context(MitoWriteOperationSnafu)?;
399 Ok(())
400 }
401
402 pub(crate) fn build_put_request_from_iter(
403 kv: impl Iterator<Item = (String, String)>,
404 ) -> RegionPutRequest {
405 let cols = vec![
406 ColumnSchema {
407 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
408 datatype: ColumnDataType::TimestampMillisecond as _,
409 semantic_type: SemanticType::Timestamp as _,
410 ..Default::default()
411 },
412 ColumnSchema {
413 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
414 datatype: ColumnDataType::String as _,
415 semantic_type: SemanticType::Tag as _,
416 ..Default::default()
417 },
418 ColumnSchema {
419 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
420 datatype: ColumnDataType::String as _,
421 semantic_type: SemanticType::Field as _,
422 ..Default::default()
423 },
424 ];
425 let rows = Rows {
426 schema: cols,
427 rows: kv
428 .into_iter()
429 .map(|(key, value)| Row {
430 values: vec![
431 Value {
432 value_data: Some(ValueData::TimestampMillisecondValue(0)),
433 },
434 Value {
435 value_data: Some(ValueData::StringValue(key)),
436 },
437 Value {
438 value_data: Some(ValueData::StringValue(value)),
439 },
440 ],
441 })
442 .collect(),
443 };
444
445 RegionPutRequest { rows, hint: None }
446 }
447
448 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
449 let cols = vec![
450 ColumnSchema {
451 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
452 datatype: ColumnDataType::TimestampMillisecond as _,
453 semantic_type: SemanticType::Timestamp as _,
454 ..Default::default()
455 },
456 ColumnSchema {
457 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
458 datatype: ColumnDataType::String as _,
459 semantic_type: SemanticType::Tag as _,
460 ..Default::default()
461 },
462 ];
463 let rows = keys
464 .iter()
465 .map(|key| Row {
466 values: vec![
467 Value {
468 value_data: Some(ValueData::TimestampMillisecondValue(0)),
469 },
470 Value {
471 value_data: Some(ValueData::StringValue(key.to_string())),
472 },
473 ],
474 })
475 .collect();
476 let rows = Rows { schema: cols, rows };
477
478 RegionDeleteRequest { rows }
479 }
480
481 pub async fn add_logical_regions(
483 &self,
484 physical_region_id: RegionId,
485 write_region_id: bool,
486 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
487 ) -> Result<()> {
488 let region_id = utils::to_metadata_region_id(physical_region_id);
489 let iter = logical_regions
490 .into_iter()
491 .flat_map(|(logical_region_id, column_metadatas)| {
492 if write_region_id {
493 Some((
494 MetadataRegion::concat_region_key(logical_region_id),
495 String::new(),
496 ))
497 } else {
498 None
499 }
500 .into_iter()
501 .chain(column_metadatas.into_iter().map(
502 move |(name, column_metadata)| {
503 (
504 MetadataRegion::concat_column_key(logical_region_id, name),
505 MetadataRegion::serialize_column_metadata(column_metadata),
506 )
507 },
508 ))
509 })
510 .collect::<Vec<_>>();
511
512 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
513 self.mito
514 .handle_request(
515 region_id,
516 store_api::region_request::RegionRequest::Put(put_request),
517 )
518 .await
519 .context(MitoWriteOperationSnafu)?;
520
521 Ok(())
522 }
523}
524
#[cfg(test)]
impl MetadataRegion {
    /// Test helper: returns the value stored under `key` in metadata region
    /// `region_id`, or `None` if no row with that key is found or its value is null.
    ///
    /// The key column is projected alongside the value and compared row by row.
    /// The result therefore does not depend on the engine applying the `key = ...`
    /// filter exactly; the prefix helpers in this file likewise re-check keys. Scanning
    /// every row also avoids the out-of-bounds panic that reading row 0 of an empty
    /// first batch would cause.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));

        // Column 0 = key, column 1 = value.
        let scan_req = ScanRequest {
            projection: Some(vec![
                METADATA_SCHEMA_KEY_COLUMN_INDEX,
                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
            ]),
            filters: vec![filter_expr],
            ..Default::default()
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        for batch in &scan_result {
            let keys = batch.column(0);
            let values = batch.column(1);
            for row in 0..batch.num_rows() {
                if keys.get_ref(row).as_string().unwrap() == Some(key) {
                    return Ok(values
                        .get_ref(row)
                        .as_string()
                        .unwrap()
                        .map(|s| s.to_string()));
                }
            }
        }

        Ok(None)
    }

    /// Test helper: returns the semantic type recorded for `column_name` of
    /// `logical_region_id`, or `None` if the column key is absent.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        let semantic_type = self.get(region_id, &column_key).await?;
        semantic_type
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}
576
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;

    #[test]
    fn test_concat_table_key() {
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }

    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }

    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_invalid_column_key() {
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }

    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            expected
        );

        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }

    /// Two tag columns with distinct column ids, keyed by column name.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 6,
                },
            ),
        ])
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        // The APIs under test take the *physical* region id and derive the
        // metadata region id themselves.
        let physical_region_id = env.default_physical_region_id();
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();
        // Adding the same logical region again must succeed and must not create
        // duplicate entries.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();

        // The region added above plus, presumably, the one created by
        // `init_metric_region` — confirm against `TestEnv`.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // The columns round-trip unchanged.
        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}