1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::Engine;
25use base64::engine::general_purpose::STANDARD_NO_PAD;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use datafusion::prelude::{col, lit};
29use futures_util::TryStreamExt;
30use futures_util::stream::BoxStream;
31use mito2::engine::MitoEngine;
32use moka::future::Cache;
33use moka::policy::EvictionPolicy;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::ColumnMetadata;
36use store_api::metric_engine_consts::{
37 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
38 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
39 METADATA_SCHEMA_VALUE_COLUMN_NAME,
40};
41use store_api::region_engine::RegionEngine;
42use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
43use store_api::storage::{RegionId, ScanRequest};
44use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
45
46use crate::error::{
47 CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
48 DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
49 MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
50};
51use crate::utils;
52
/// Prefix of the key that records a logical region; followed by the region id as a decimal `u64`.
const REGION_PREFIX: &str = "__region_";
/// Prefix of a logical region's column-metadata key; followed by
/// `{region_id as u64}_{base64(column_name)}`.
const COLUMN_PREFIX: &str = "__column_";
55
/// Accessor for the metadata region that stores logical-region and column metadata as
/// string key/value rows next to a metric-engine physical region.
pub struct MetadataRegion {
    /// Engine used to scan and write the underlying metadata region.
    pub(crate) mito: MitoEngine,
    /// Snapshot of all key/value pairs of a metadata region, keyed by metadata region id.
    /// This type invalidates an entry after its own successful writes (put/delete).
    cache: Cache<RegionId, RegionMetadataCacheEntry>,
    /// Per-logical-region locks. An entry is inserted by `open_logical_region` and removed by
    /// `remove_logical_region`; lock acquisition fails for regions without an entry.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
81
/// Cached snapshot of every key/value pair stored in one metadata region.
#[derive(Clone)]
struct RegionMetadataCacheEntry {
    /// All key/value pairs, ordered by key so prefix lookups can use a range scan.
    key_values: Arc<BTreeMap<String, String>>,
    /// Total byte length of all keys and values; used as the cache weight.
    size: usize,
}
87
/// Upper bound on the total weight of the metadata cache (entries are weighed in bytes).
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// How long a cached metadata-region snapshot lives before it expires.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
92
93impl MetadataRegion {
94 pub fn new(mito: MitoEngine) -> Self {
95 let cache = Cache::builder()
96 .max_capacity(MAX_CACHE_SIZE)
97 .eviction_policy(EvictionPolicy::lru())
100 .time_to_live(CACHE_TTL)
101 .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
102 .build();
103 Self {
104 mito,
105 cache,
106 logical_region_lock: RwLock::new(HashMap::new()),
107 }
108 }
109
110 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
114 match self
115 .logical_region_lock
116 .write()
117 .await
118 .entry(logical_region_id)
119 {
120 Entry::Occupied(_) => false,
121 Entry::Vacant(vacant_entry) => {
122 vacant_entry.insert(Arc::new(RwLock::new(())));
123 true
124 }
125 }
126 }
127
128 pub async fn read_lock_logical_region(
130 &self,
131 logical_region_id: RegionId,
132 ) -> Result<OwnedRwLockReadGuard<()>> {
133 let lock = self
134 .logical_region_lock
135 .read()
136 .await
137 .get(&logical_region_id)
138 .context(LogicalRegionNotFoundSnafu {
139 region_id: logical_region_id,
140 })?
141 .clone();
142 Ok(RwLock::read_owned(lock).await)
143 }
144
145 pub async fn write_lock_logical_region(
147 &self,
148 logical_region_id: RegionId,
149 ) -> Result<OwnedRwLockWriteGuard<()>> {
150 let lock = self
151 .logical_region_lock
152 .read()
153 .await
154 .get(&logical_region_id)
155 .context(LogicalRegionNotFoundSnafu {
156 region_id: logical_region_id,
157 })?
158 .clone();
159 Ok(RwLock::write_owned(lock).await)
160 }
161
162 pub async fn remove_logical_region(
166 &self,
167 physical_region_id: RegionId,
168 logical_region_id: RegionId,
169 ) -> Result<()> {
170 let region_id = utils::to_metadata_region_id(physical_region_id);
172 let region_key = Self::concat_region_key(logical_region_id);
173
174 let logical_columns = self
176 .logical_columns(physical_region_id, logical_region_id)
177 .await?;
178 let mut column_keys = logical_columns
179 .into_iter()
180 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
181 .collect::<Vec<_>>();
182
183 column_keys.push(region_key);
185 self.delete(region_id, &column_keys).await?;
186
187 self.logical_region_lock
188 .write()
189 .await
190 .remove(&logical_region_id);
191
192 Ok(())
193 }
194
195 pub async fn logical_columns(
199 &self,
200 physical_region_id: RegionId,
201 logical_region_id: RegionId,
202 ) -> Result<Vec<(String, ColumnMetadata)>> {
203 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
204 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
205
206 let mut columns = vec![];
207 for (k, v) in self
208 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
209 .await?
210 {
211 if !k.starts_with(®ion_column_prefix) {
212 continue;
213 }
214 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
216 let column_metadata = Self::deserialize_column_metadata(&v)?;
217 columns.push((column_name, column_metadata));
218 }
219
220 Ok(columns)
221 }
222
223 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
225 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
226
227 let mut regions = vec![];
228 for k in self
229 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
230 .await?
231 {
232 if !k.starts_with(REGION_PREFIX) {
233 continue;
234 }
235 let region_id = Self::parse_region_key(&k).unwrap();
237 let region_id = region_id.parse::<u64>().unwrap().into();
238 regions.push(region_id);
239 }
240
241 Ok(regions)
242 }
243}
244
245impl MetadataRegion {
247 pub fn concat_region_key(region_id: RegionId) -> String {
248 format!("{REGION_PREFIX}{}", region_id.as_u64())
249 }
250
251 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
253 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
254 format!(
255 "{COLUMN_PREFIX}{}_{}",
256 region_id.as_u64(),
257 encoded_column_name
258 )
259 }
260
261 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
263 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
264 }
265
266 pub fn parse_region_key(key: &str) -> Option<&str> {
267 key.strip_prefix(REGION_PREFIX)
268 }
269
270 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
272 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
273 let mut iter = stripped.split('_');
274
275 let region_id_raw = iter.next().unwrap();
276 let region_id = region_id_raw
277 .parse::<u64>()
278 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
279 .into();
280
281 let encoded_column_name = iter.next().unwrap();
282 let column_name = STANDARD_NO_PAD
283 .decode(encoded_column_name)
284 .context(DecodeColumnValueSnafu)?;
285
286 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
287 } else {
288 Ok(None)
289 }
290 }
291
292 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
293 serde_json::to_string(column_metadata).unwrap()
294 }
295
296 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
297 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
298 raw: column_metadata,
299 })
300 }
301}
302
303pub fn decode_batch_stream<T: Send + 'static>(
305 mut record_batch_stream: SendableRecordBatchStream,
306 decode: fn(RecordBatch) -> Vec<T>,
307) -> BoxStream<'static, Result<T>> {
308 let stream = try_stream! {
309 while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
310 for item in decode(batch) {
311 yield item;
312 }
313 }
314 };
315 Box::pin(stream)
316}
317
318fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
320 let keys = batch.iter_column_as_string(0);
321 let values = batch.iter_column_as_string(1);
322 keys.zip(values)
323 .filter_map(|(k, v)| match (k, v) {
324 (Some(k), Some(v)) => Some((k, v)),
325 (Some(k), None) => Some((k, "".to_string())),
326 (None, _) => None,
327 })
328 .collect::<Vec<_>>()
329}
330
331fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
333 batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
334}
335
336impl MetadataRegion {
340 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
341 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
342
343 let projection = if key_only {
344 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
345 } else {
346 vec![
347 METADATA_SCHEMA_KEY_COLUMN_INDEX,
348 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
349 ]
350 };
351 ScanRequest {
352 projection: Some(projection),
353 filters: vec![filter_expr],
354 ..Default::default()
355 }
356 }
357
358 fn build_read_request() -> ScanRequest {
359 let projection = vec![
360 METADATA_SCHEMA_KEY_COLUMN_INDEX,
361 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
362 ];
363 ScanRequest {
364 projection: Some(projection),
365 ..Default::default()
366 }
367 }
368
369 async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
370 let scan_req = MetadataRegion::build_read_request();
371 let record_batch_stream = self
372 .mito
373 .scan_to_stream(metadata_region_id, scan_req)
374 .await
375 .context(MitoReadOperationSnafu)?;
376
377 let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
378 .try_collect::<BTreeMap<_, _>>()
379 .await?;
380 let mut size = 0;
381 for (k, v) in kv.iter() {
382 size += k.len();
383 size += v.len();
384 }
385 let kv = Arc::new(kv);
386 Ok(RegionMetadataCacheEntry {
387 key_values: kv,
388 size,
389 })
390 }
391
392 async fn get_all_with_prefix(
393 &self,
394 metadata_region_id: RegionId,
395 prefix: &str,
396 ) -> Result<HashMap<String, String>> {
397 let region_metadata = self
398 .cache
399 .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
400 .await
401 .context(CacheGetSnafu)?;
402
403 let range = region_metadata.key_values.range(prefix.to_string()..);
404 let mut result = HashMap::new();
405 for (k, v) in range {
406 if !k.starts_with(prefix) {
407 break;
408 }
409 result.insert(k.clone(), v.clone());
410 }
411 Ok(result)
412 }
413
414 pub async fn get_all_key_with_prefix(
415 &self,
416 region_id: RegionId,
417 prefix: &str,
418 ) -> Result<Vec<String>> {
419 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
420 let record_batch_stream = self
421 .mito
422 .scan_to_stream(region_id, scan_req)
423 .await
424 .context(MitoReadOperationSnafu)?;
425
426 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
427 .try_collect::<Vec<_>>()
428 .await
429 }
430
431 async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
434 let delete_request = Self::build_delete_request(keys);
435 self.mito
436 .handle_request(
437 metadata_region_id,
438 store_api::region_request::RegionRequest::Delete(delete_request),
439 )
440 .await
441 .context(MitoWriteOperationSnafu)?;
442 self.cache.invalidate(&metadata_region_id).await;
444
445 Ok(())
446 }
447
448 pub(crate) fn build_put_request_from_iter(
449 kv: impl Iterator<Item = (String, String)>,
450 ) -> RegionPutRequest {
451 let cols = vec![
452 ColumnSchema {
453 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
454 datatype: ColumnDataType::TimestampMillisecond as _,
455 semantic_type: SemanticType::Timestamp as _,
456 ..Default::default()
457 },
458 ColumnSchema {
459 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
460 datatype: ColumnDataType::String as _,
461 semantic_type: SemanticType::Tag as _,
462 ..Default::default()
463 },
464 ColumnSchema {
465 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
466 datatype: ColumnDataType::String as _,
467 semantic_type: SemanticType::Field as _,
468 ..Default::default()
469 },
470 ];
471 let rows = Rows {
472 schema: cols,
473 rows: kv
474 .into_iter()
475 .map(|(key, value)| {
476 row(vec![
477 ValueData::TimestampMillisecondValue(0),
478 ValueData::StringValue(key),
479 ValueData::StringValue(value),
480 ])
481 })
482 .collect(),
483 };
484
485 RegionPutRequest { rows, hint: None }
486 }
487
488 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
489 let cols = vec![
490 ColumnSchema {
491 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
492 datatype: ColumnDataType::TimestampMillisecond as _,
493 semantic_type: SemanticType::Timestamp as _,
494 ..Default::default()
495 },
496 ColumnSchema {
497 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
498 datatype: ColumnDataType::String as _,
499 semantic_type: SemanticType::Tag as _,
500 ..Default::default()
501 },
502 ];
503 let rows = keys
504 .iter()
505 .map(|key| {
506 row(vec![
507 ValueData::TimestampMillisecondValue(0),
508 ValueData::StringValue(key.clone()),
509 ])
510 })
511 .collect();
512 let rows = Rows { schema: cols, rows };
513
514 RegionDeleteRequest { rows, hint: None }
515 }
516
517 pub async fn add_logical_regions(
519 &self,
520 physical_region_id: RegionId,
521 write_region_id: bool,
522 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
523 ) -> Result<()> {
524 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
525 let iter = logical_regions
526 .into_iter()
527 .flat_map(|(logical_region_id, column_metadatas)| {
528 if write_region_id {
529 Some((
530 MetadataRegion::concat_region_key(logical_region_id),
531 String::new(),
532 ))
533 } else {
534 None
535 }
536 .into_iter()
537 .chain(column_metadatas.into_iter().map(
538 move |(name, column_metadata)| {
539 (
540 MetadataRegion::concat_column_key(logical_region_id, name),
541 MetadataRegion::serialize_column_metadata(column_metadata),
542 )
543 },
544 ))
545 })
546 .collect::<Vec<_>>();
547
548 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
549 self.mito
550 .handle_request(
551 metadata_region_id,
552 store_api::region_request::RegionRequest::Put(put_request),
553 )
554 .await
555 .context(MitoWriteOperationSnafu)?;
556 self.cache.invalidate(&metadata_region_id).await;
558
559 Ok(())
560 }
561}
562
#[cfg(test)]
impl MetadataRegion {
    /// Reads the value stored under `key` directly from metadata region `region_id`,
    /// bypassing the cache.
    ///
    /// Only the first row of the first non-empty batch is inspected. Returns `Ok(None)` when
    /// no row comes back or that row's value is null.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        use datatypes::arrow::array::{Array, AsArray};

        let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
            .eq(datafusion::prelude::lit(key));

        let scan_req = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![filter_expr],
            ..Default::default()
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        for batch in scan_result.iter() {
            let column = batch.column(0);
            let column = column.as_string::<i32>();
            // Skip empty batches: reading row 0 of an empty array would panic.
            if column.is_empty() {
                continue;
            }
            return Ok(column.is_valid(0).then(|| column.value(0).to_string()));
        }

        Ok(None)
    }

    /// Returns the semantic type recorded for `column_name` of `logical_region_id`, or
    /// `None` if no metadata is stored for that column.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        let semantic_type = self.get(region_id, &column_key).await?;
        semantic_type
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}
613
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    // Region keys embed the `u64` form of the region id: (1234 << 32) | 7844.
    #[test]
    fn test_concat_table_key() {
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }

    // Column keys are `__column_{region_id}_{base64(column_name)}`.
    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }

    // Encoding then parsing a column key round-trips region id and column name.
    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    // `asdfasd` is not a `u64`, so parsing fails on the region id before decoding the name.
    #[test]
    fn test_parse_invalid_column_key() {
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }

    // Serialization emits the current string-type encoding; both the legacy
    // (`{"String":null}`) and the current encoding must deserialize to the same metadata.
    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            new_fmt
        );
        // The legacy format must still deserialize.
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
            column_metadata
        );
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
            column_metadata
        );

        // A bare JSON string is not column metadata.
        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }

    /// Two string tag columns used as fixture metadata.
    /// NOTE(review): both columns share `column_id: 5`.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
        ])
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        // NOTE(review): this holds a metadata-region id although the APIs below expect a
        // physical region id; they map it through `to_metadata_region_id` again, which is
        // presumably idempotent — confirm.
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();
        // Adding the same logical region a second time must also succeed.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();

        // Expect the region added above plus — presumably — one created by
        // `init_metric_region`; TODO confirm.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // The stored columns must match the fixture exactly (re-adding did not duplicate).
        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}