store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27    FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28    region_request, truncate_request,
29};
30use api::v1::{
31    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50    RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55    SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
/// The type of path to generate.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so a variant can be
/// recovered from its raw `u8` discriminant. No explicit discriminants are
/// given, so the values follow declaration order (`Bare` = 0, `Data` = 1,
/// `Metadata` = 2); reordering the variants changes those values.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// A bare path - the original path of an engine.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
    Bare,
    /// A path for the data region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
    Data,
    /// A path for the metadata region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
    Metadata,
}
77
/// A batch of region DDL requests that all share the same kind.
///
/// Every entry pairs the target [`RegionId`] with the request to apply to it.
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str`.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, one per region.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, one per region.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, one per region.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
84
85impl BatchRegionDdlRequest {
86    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
87    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88        match body {
89            region_request::Body::Creates(creates) => {
90                let requests = creates
91                    .requests
92                    .into_iter()
93                    .map(parse_region_create)
94                    .collect::<Result<Vec<_>>>()?;
95                Ok(Some(Self::Create(requests)))
96            }
97            region_request::Body::Drops(drops) => {
98                let requests = drops
99                    .requests
100                    .into_iter()
101                    .map(parse_region_drop)
102                    .collect::<Result<Vec<_>>>()?;
103                Ok(Some(Self::Drop(requests)))
104            }
105            region_request::Body::Alters(alters) => {
106                let requests = alters
107                    .requests
108                    .into_iter()
109                    .map(parse_region_alter)
110                    .collect::<Result<Vec<_>>>()?;
111                Ok(Some(Self::Alter(requests)))
112            }
113            _ => Ok(None),
114        }
115    }
116
117    pub fn request_type(&self) -> &'static str {
118        self.into()
119    }
120
121    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122        match self {
123            Self::Create(requests) => requests
124                .into_iter()
125                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126                .collect(),
127            Self::Drop(requests) => requests
128                .into_iter()
129                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130                .collect(),
131            Self::Alter(requests) => requests
132                .into_iter()
133                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134                .collect(),
135        }
136    }
137}
138
/// A request that targets a single region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str`.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Put rows into the region.
    Put(RegionPutRequest),
    /// Delete rows from the region.
    Delete(RegionDeleteRequest),
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop(RegionDropRequest),
    /// Open the region.
    Open(RegionOpenRequest),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the metadata of the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Build index for the region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncate the region.
    Truncate(RegionTruncateRequest),
    /// Catch up request, see [`RegionCatchupRequest`].
    Catchup(RegionCatchupRequest),
    /// Bulk insert a decoded record batch into the region.
    BulkInserts(RegionBulkInsertsRequest),
    /// Enter staging request, see [`EnterStagingRequest`].
    EnterStaging(EnterStagingRequest),
    /// Apply staging manifest request, see [`ApplyStagingManifestRequest`].
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
157
158impl RegionRequest {
159    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
160    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
161    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
162        match body {
163            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
164            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
165            region_request::Body::Create(create) => make_region_create(create),
166            region_request::Body::Drop(drop) => make_region_drop(drop),
167            region_request::Body::Open(open) => make_region_open(open),
168            region_request::Body::Close(close) => make_region_close(close),
169            region_request::Body::Alter(alter) => make_region_alter(alter),
170            region_request::Body::Flush(flush) => make_region_flush(flush),
171            region_request::Body::Compact(compact) => make_region_compact(compact),
172            region_request::Body::BuildIndex(index) => make_region_build_index(index),
173            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
174            region_request::Body::Creates(creates) => make_region_creates(creates),
175            region_request::Body::Drops(drops) => make_region_drops(drops),
176            region_request::Body::Alters(alters) => make_region_alters(alters),
177            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
178            region_request::Body::Sync(_) => UnexpectedSnafu {
179                reason: "Sync request should be handled separately by RegionServer",
180            }
181            .fail(),
182            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
183                reason: "ListMetadata request should be handled separately by RegionServer",
184            }
185            .fail(),
186            region_request::Body::ApplyStagingManifest(apply) => {
187                make_region_apply_staging_manifest(apply)
188            }
189        }
190    }
191
192    /// Returns the type name of the request.
193    pub fn request_type(&self) -> &'static str {
194        self.into()
195    }
196}
197
198fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
199    let requests = inserts
200        .requests
201        .into_iter()
202        .filter_map(|r| {
203            let region_id = r.region_id.into();
204            r.rows.map(|rows| {
205                (
206                    region_id,
207                    RegionRequest::Put(RegionPutRequest {
208                        rows,
209                        hint: None,
210                        partition_expr_version: r.partition_expr_version.map(|v| v.value),
211                    }),
212                )
213            })
214        })
215        .collect();
216    Ok(requests)
217}
218
219fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
220    let requests = deletes
221        .requests
222        .into_iter()
223        .filter_map(|r| {
224            let region_id = r.region_id.into();
225            r.rows.map(|rows| {
226                (
227                    region_id,
228                    RegionRequest::Delete(RegionDeleteRequest {
229                        rows,
230                        hint: None,
231                        partition_expr_version: r.partition_expr_version.map(|v| v.value),
232                    }),
233                )
234            })
235        })
236        .collect();
237    Ok(requests)
238}
239
240fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
241    let column_metadatas = create
242        .column_defs
243        .into_iter()
244        .map(ColumnMetadata::try_from_column_def)
245        .collect::<Result<Vec<_>>>()?;
246    let region_id = RegionId::from(create.region_id);
247    let table_dir = table_dir(&create.path, region_id.table_id());
248    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
249    Ok((
250        region_id,
251        RegionCreateRequest {
252            engine: create.engine,
253            column_metadatas,
254            primary_key: create.primary_key,
255            options: create.options,
256            table_dir,
257            path_type: PathType::Bare,
258            partition_expr_json,
259        },
260    ))
261}
262
263fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
264    let (region_id, request) = parse_region_create(create)?;
265    Ok(vec![(region_id, RegionRequest::Create(request))])
266}
267
268fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
269    let mut requests = Vec::with_capacity(creates.requests.len());
270    for create in creates.requests {
271        requests.extend(make_region_create(create)?);
272    }
273    Ok(requests)
274}
275
276fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
277    let region_id = drop.region_id.into();
278    Ok((
279        region_id,
280        RegionDropRequest {
281            fast_path: drop.fast_path,
282            force: drop.force,
283            partial_drop: drop.partial_drop,
284        },
285    ))
286}
287
288fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
289    let (region_id, request) = parse_region_drop(drop)?;
290    Ok(vec![(region_id, RegionRequest::Drop(request))])
291}
292
293fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
294    let mut requests = Vec::with_capacity(drops.requests.len());
295    for drop in drops.requests {
296        requests.extend(make_region_drop(drop)?);
297    }
298    Ok(requests)
299}
300
301fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
302    let region_id = RegionId::from(open.region_id);
303    let table_dir = table_dir(&open.path, region_id.table_id());
304    Ok(vec![(
305        region_id,
306        RegionRequest::Open(RegionOpenRequest {
307            engine: open.engine,
308            table_dir,
309            path_type: PathType::Bare,
310            options: open.options,
311            skip_wal_replay: false,
312            checkpoint: None,
313        }),
314    )])
315}
316
317fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
318    let region_id = close.region_id.into();
319    Ok(vec![(
320        region_id,
321        RegionRequest::Close(RegionCloseRequest {}),
322    )])
323}
324
325fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
326    let region_id = alter.region_id.into();
327    let request = RegionAlterRequest::try_from(alter)?;
328    Ok((region_id, request))
329}
330
331fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
332    let (region_id, request) = parse_region_alter(alter)?;
333    Ok(vec![(region_id, RegionRequest::Alter(request))])
334}
335
336fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
337    let mut requests = Vec::with_capacity(alters.requests.len());
338    for alter in alters.requests {
339        requests.extend(make_region_alter(alter)?);
340    }
341    Ok(requests)
342}
343
344fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
345    let region_id = flush.region_id.into();
346    Ok(vec![(
347        region_id,
348        RegionRequest::Flush(RegionFlushRequest {
349            row_group_size: None,
350        }),
351    )])
352}
353
354fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
355    let region_id = compact.region_id.into();
356    let options = compact
357        .options
358        .unwrap_or(compact_request::Options::Regular(Default::default()));
359    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
360    let parallelism = if compact.parallelism == 0 {
361        None
362    } else {
363        Some(compact.parallelism)
364    };
365    Ok(vec![(
366        region_id,
367        RegionRequest::Compact(RegionCompactRequest {
368            options,
369            parallelism,
370        }),
371    )])
372}
373
374fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
375    let region_id = index.region_id.into();
376    Ok(vec![(
377        region_id,
378        RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
379    )])
380}
381
382fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
383    let region_id = truncate.region_id.into();
384    match truncate.kind {
385        None => InvalidRawRegionRequestSnafu {
386            err: "missing kind in TruncateRequest".to_string(),
387        }
388        .fail(),
389        Some(truncate_request::Kind::All(_)) => Ok(vec![(
390            region_id,
391            RegionRequest::Truncate(RegionTruncateRequest::All),
392        )]),
393        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
394            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
395
396            Ok(vec![(
397                region_id,
398                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
399            )])
400        }
401    }
402}
403
404/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
405fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
406    let region_id = request.region_id.into();
407    let partition_expr_version = request.partition_expr_version.map(|v| v.value);
408    let Some(Body::ArrowIpc(request)) = request.body else {
409        return Ok(vec![]);
410    };
411
412    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
413        .with_label_values(&["decode"])
414        .start_timer();
415    let mut decoder =
416        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
417    let payload = decoder
418        .try_decode_record_batch(&request.data_header, &request.payload)
419        .context(FlightCodecSnafu)?;
420    decoder_timer.observe_duration();
421    Ok(vec![(
422        region_id,
423        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
424            region_id,
425            payload,
426            raw_data: request,
427            partition_expr_version,
428        }),
429    )])
430}
431
432fn make_region_apply_staging_manifest(
433    api::v1::region::ApplyStagingManifestRequest {
434        region_id,
435        partition_expr,
436        central_region_id,
437        manifest_path,
438    }: api::v1::region::ApplyStagingManifestRequest,
439) -> Result<Vec<(RegionId, RegionRequest)>> {
440    let region_id = region_id.into();
441    Ok(vec![(
442        region_id,
443        RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
444            partition_expr,
445            central_region_id: central_region_id.into(),
446            manifest_path,
447        }),
448    )])
449}
450
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Write hint; `None` when no hint is provided.
    pub hint: Option<WriteHint>,
    /// Partition expression version for the region; `None` when the sender
    /// did not provide one.
    pub partition_expr_version: Option<u64>,
}
461
/// Request to read from a region; a thin wrapper around a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
466
/// Request to delete data from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Keys to rows to delete.
    ///
    /// Each row only contains primary key columns and a time index column.
    pub rows: Rows,
    /// Write hint; `None` when no hint is provided.
    pub hint: Option<WriteHint>,
    /// Partition expression version for the region; `None` when the sender
    /// did not provide one.
    pub partition_expr_version: Option<u64>,
}
479
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Region engine name
    pub engine: String,
    /// Columns in this region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the columns in the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Options of the created region.
    pub options: HashMap<String, String>,
    /// Directory for table's data home. Usually is composed by catalog and table id
    pub table_dir: String,
    /// Path type for generating paths
    pub path_type: PathType,
    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
    /// `Option` to keep compatibility with old clients.
    pub partition_expr_json: Option<String>,
}
498
499impl RegionCreateRequest {
500    /// Checks whether the request is valid, returns an error if it is invalid.
501    pub fn validate(&self) -> Result<()> {
502        // time index must exist
503        ensure!(
504            self.column_metadatas
505                .iter()
506                .any(|x| x.semantic_type == SemanticType::Timestamp),
507            InvalidRegionRequestSnafu {
508                region_id: RegionId::new(0, 0),
509                err: "missing timestamp column in create region request".to_string(),
510            }
511        );
512
513        // build column id to indices
514        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
515        for (i, c) in self.column_metadatas.iter().enumerate() {
516            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
517                return InvalidRegionRequestSnafu {
518                    region_id: RegionId::new(0, 0),
519                    err: format!(
520                        "duplicate column id {} (at position {} and {}) in create region request",
521                        c.column_id, previous, i
522                    ),
523                }
524                .fail();
525            }
526        }
527
528        // primary key must exist
529        for column_id in &self.primary_key {
530            ensure!(
531                column_id_to_indices.contains_key(column_id),
532                InvalidRegionRequestSnafu {
533                    region_id: RegionId::new(0, 0),
534                    err: format!(
535                        "missing primary key column {} in create region request",
536                        column_id
537                    ),
538                }
539            );
540        }
541
542        Ok(())
543    }
544
545    /// Returns true when the region belongs to the metric engine's physical table.
546    pub fn is_physical_table(&self) -> bool {
547        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
548    }
549}
550
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Enables fast-path drop optimizations for logical regions.
    /// Only applicable to the Metric Engine; ignored by others.
    pub fast_path: bool,

    /// Forces the drop of a physical region and all its associated logical regions.
    /// Only relevant for physical regions managed by the Metric Engine.
    pub force: bool,

    /// If true, indicates that only a portion of the region is being dropped, and files may still be referenced by other regions.
    /// This is used to prevent deletion of files that are still in use by other regions.
    pub partial_drop: bool,
}
565
/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Region engine name
    pub engine: String,
    /// Directory for table's data home. Usually is composed by catalog and table id
    pub table_dir: String,
    /// Path type for generating paths
    pub path_type: PathType,
    /// Options of the opened region.
    pub options: HashMap<String, String>,
    /// To skip replaying the WAL.
    pub skip_wal_replay: bool,
    /// Replay checkpoint; `None` when no checkpoint is supplied.
    pub checkpoint: Option<ReplayCheckpoint>,
}
582
/// Checkpoint optionally attached to a [`RegionOpenRequest`] for replay.
// NOTE(review): both ids are presumably log store (WAL) entry ids — confirm
// against the code that consumes the checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Optional entry id for metadata; `None` when not provided.
    pub metadata_entry_id: Option<u64>,
}
588
589impl RegionOpenRequest {
590    /// Returns true when the region belongs to the metric engine's physical table.
591    pub fn is_physical_table(&self) -> bool {
592        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
593    }
594}
595
/// Close region request.
///
/// The request carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
599
/// Alter metadata of a region.
///
/// A request describes exactly one kind of alteration, see [`AlterKind`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// Kind of alteration to do.
    pub kind: AlterKind,
}
606
607impl RegionAlterRequest {
608    /// Checks whether the request is valid, returns an error if it is invalid.
609    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
610        self.kind.validate(metadata)?;
611
612        Ok(())
613    }
614
615    /// Returns true if we need to apply the request to the region.
616    ///
617    /// The `request` should be valid.
618    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
619        debug_assert!(self.validate(metadata).is_ok());
620        self.kind.need_alter(metadata)
621    }
622}
623
624impl TryFrom<AlterRequest> for RegionAlterRequest {
625    type Error = MetadataError;
626
627    fn try_from(value: AlterRequest) -> Result<Self> {
628        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
629            err: "missing kind in AlterRequest",
630        })?;
631
632        let kind = AlterKind::try_from(kind)?;
633        Ok(RegionAlterRequest { kind })
634    }
635}
636
/// Kind of the alteration.
///
/// `AsRefStr` is derived so the variant name is available as a `&str`.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region, only fields are allowed to drop.
    DropColumns {
        /// Name of columns to drop.
        names: Vec<String>,
    },
    /// Change columns datatype from the region, only fields are allowed to change.
    ModifyColumnTypes {
        /// Columns to change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop column default value.
    DropDefaults {
        /// Names of the columns whose default value is dropped.
        names: Vec<String>,
    },
    /// Set column default value.
    SetDefaults {
        /// Columns to change.
        columns: Vec<SetDefault>,
    },
    /// Sync column metadatas.
    SyncColumns {
        /// Column metadatas to sync with.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A column together with the default constraint to set on it.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column to change.
    pub name: String,
    /// The default constraint as raw bytes.
    // NOTE(review): presumably a serialized column default constraint — confirm
    // the encoding against the code that consumes this field.
    pub default_constraint: Vec<u8>,
}
683
/// An index to set on a column, together with its options.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index.
    Fulltext {
        /// Name of the column to index.
        column_name: String,
        /// Options of the fulltext index.
        options: FulltextOptions,
    },
    /// Set an inverted index; it has no extra options.
    Inverted {
        /// Name of the column to index.
        column_name: String,
    },
    /// Set a skipping index.
    Skipping {
        /// Name of the column to index.
        column_name: String,
        /// Options of the skipping index.
        options: SkippingIndexOptions,
    },
}
698
699impl SetIndexOption {
700    /// Returns the column name of the index option.
701    pub fn column_name(&self) -> &String {
702        match self {
703            SetIndexOption::Fulltext { column_name, .. } => column_name,
704            SetIndexOption::Inverted { column_name } => column_name,
705            SetIndexOption::Skipping { column_name, .. } => column_name,
706        }
707    }
708
709    /// Returns true if the index option is fulltext.
710    pub fn is_fulltext(&self) -> bool {
711        match self {
712            SetIndexOption::Fulltext { .. } => true,
713            SetIndexOption::Inverted { .. } => false,
714            SetIndexOption::Skipping { .. } => false,
715        }
716    }
717}
718
719impl TryFrom<v1::SetIndex> for SetIndexOption {
720    type Error = MetadataError;
721
722    fn try_from(value: v1::SetIndex) -> Result<Self> {
723        let option = value.options.context(InvalidRawRegionRequestSnafu {
724            err: "missing options in SetIndex",
725        })?;
726
727        let opt = match option {
728            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
729                column_name: x.column_name.clone(),
730                options: FulltextOptions::new(
731                    x.enable,
732                    as_fulltext_option_analyzer(
733                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
734                    ),
735                    x.case_sensitive,
736                    as_fulltext_option_backend(
737                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
738                    ),
739                    x.granularity as u32,
740                    x.false_positive_rate,
741                )
742                .context(InvalidIndexOptionSnafu)?,
743            },
744            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
745                column_name: i.column_name,
746            },
747            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
748                column_name: s.column_name,
749                options: SkippingIndexOptions::new(
750                    s.granularity as u32,
751                    s.false_positive_rate,
752                    as_skipping_index_type(
753                        PbSkippingIndexType::try_from(s.skipping_index_type)
754                            .context(DecodeProtoSnafu)?,
755                    ),
756                )
757                .context(InvalidIndexOptionSnafu)?,
758            },
759        };
760
761        Ok(opt)
762    }
763}
764
/// An index to remove from a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Remove the fulltext index of the column.
    Fulltext { column_name: String },
    /// Remove the inverted index of the column.
    Inverted { column_name: String },
    /// Remove the skipping index of the column.
    Skipping { column_name: String },
}

impl UnsetIndexOption {
    /// Returns the name of the column the index option applies to.
    pub fn column_name(&self) -> &String {
        match self {
            Self::Fulltext { column_name }
            | Self::Inverted { column_name }
            | Self::Skipping { column_name } => column_name,
        }
    }

    /// Returns true only for the `Fulltext` variant.
    pub fn is_fulltext(&self) -> bool {
        matches!(self, Self::Fulltext { .. })
    }
}
789
790impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
791    type Error = MetadataError;
792
793    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
794        let option = value.options.context(InvalidRawRegionRequestSnafu {
795            err: "missing options in UnsetIndex",
796        })?;
797
798        let opt = match option {
799            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
800                column_name: f.column_name,
801            },
802            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
803                column_name: i.column_name,
804            },
805            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
806                column_name: s.column_name,
807            },
808        };
809
810        Ok(opt)
811    }
812}
813
814impl AlterKind {
815    /// Returns an error if the alter kind is invalid.
816    ///
817    /// It allows adding column if not exists and dropping column if exists.
818    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
819        match self {
820            AlterKind::AddColumns { columns } => {
821                for col_to_add in columns {
822                    col_to_add.validate(metadata)?;
823                }
824            }
825            AlterKind::DropColumns { names } => {
826                for name in names {
827                    Self::validate_column_to_drop(name, metadata)?;
828                }
829            }
830            AlterKind::ModifyColumnTypes { columns } => {
831                for col_to_change in columns {
832                    col_to_change.validate(metadata)?;
833                }
834            }
835            AlterKind::SetRegionOptions { .. } => {}
836            AlterKind::UnsetRegionOptions { .. } => {}
837            AlterKind::SetIndexes { options } => {
838                for option in options {
839                    Self::validate_column_alter_index_option(
840                        option.column_name(),
841                        metadata,
842                        option.is_fulltext(),
843                    )?;
844                }
845            }
846            AlterKind::UnsetIndexes { options } => {
847                for option in options {
848                    Self::validate_column_alter_index_option(
849                        option.column_name(),
850                        metadata,
851                        option.is_fulltext(),
852                    )?;
853                }
854            }
855            AlterKind::DropDefaults { names } => {
856                names
857                    .iter()
858                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
859            }
860            AlterKind::SetDefaults { columns } => {
861                columns
862                    .iter()
863                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
864            }
865            AlterKind::SyncColumns { column_metadatas } => {
866                let new_primary_keys = column_metadatas
867                    .iter()
868                    .filter(|c| c.semantic_type == SemanticType::Tag)
869                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
870                    .collect::<HashMap<_, _>>();
871
872                let old_primary_keys = metadata
873                    .column_metadatas
874                    .iter()
875                    .filter(|c| c.semantic_type == SemanticType::Tag)
876                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
877
878                for (name, id) in old_primary_keys {
879                    let primary_key =
880                        new_primary_keys
881                            .get(name)
882                            .with_context(|| InvalidRegionRequestSnafu {
883                                region_id: metadata.region_id,
884                                err: format!("column {} is not a primary key", name),
885                            })?;
886
887                    ensure!(
888                        *primary_key == id,
889                        InvalidRegionRequestSnafu {
890                            region_id: metadata.region_id,
891                            err: format!(
892                                "column with same name {} has different id, existing: {}, got: {}",
893                                name, id, primary_key
894                            ),
895                        }
896                    );
897                }
898
899                let new_ts_column = column_metadatas
900                    .iter()
901                    .find(|c| c.semantic_type == SemanticType::Timestamp)
902                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
903                    .context(InvalidRegionRequestSnafu {
904                        region_id: metadata.region_id,
905                        err: "timestamp column not found",
906                    })?;
907
908                // Safety: timestamp column must exist.
909                let old_ts_column = metadata
910                    .column_metadatas
911                    .iter()
912                    .find(|c| c.semantic_type == SemanticType::Timestamp)
913                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
914                    .unwrap();
915
916                ensure!(
917                    new_ts_column == old_ts_column,
918                    InvalidRegionRequestSnafu {
919                        region_id: metadata.region_id,
920                        err: format!(
921                            "timestamp column {} has different id, existing: {}, got: {}",
922                            old_ts_column.0, old_ts_column.1, new_ts_column.1
923                        ),
924                    }
925                );
926            }
927        }
928        Ok(())
929    }
930
931    /// Returns true if we need to apply the alteration to the region.
932    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
933        debug_assert!(self.validate(metadata).is_ok());
934        match self {
935            AlterKind::AddColumns { columns } => columns
936                .iter()
937                .any(|col_to_add| col_to_add.need_alter(metadata)),
938            AlterKind::DropColumns { names } => names
939                .iter()
940                .any(|name| metadata.column_by_name(name).is_some()),
941            AlterKind::ModifyColumnTypes { columns } => columns
942                .iter()
943                .any(|col_to_change| col_to_change.need_alter(metadata)),
944            AlterKind::SetRegionOptions { .. } => true,
945            AlterKind::UnsetRegionOptions { .. } => true,
946            AlterKind::SetIndexes { options, .. } => options
947                .iter()
948                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
949            AlterKind::UnsetIndexes { options } => options
950                .iter()
951                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
952            AlterKind::DropDefaults { names } => names
953                .iter()
954                .any(|name| metadata.column_by_name(name).is_some()),
955
956            AlterKind::SetDefaults { columns } => columns
957                .iter()
958                .any(|x| metadata.column_by_name(&x.name).is_some()),
959            AlterKind::SyncColumns { column_metadatas } => {
960                metadata.column_metadatas != *column_metadatas
961            }
962        }
963    }
964
965    /// Returns an error if the column to drop is invalid.
966    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
967        let Some(column) = metadata.column_by_name(name) else {
968            return Ok(());
969        };
970        ensure!(
971            column.semantic_type == SemanticType::Field,
972            InvalidRegionRequestSnafu {
973                region_id: metadata.region_id,
974                err: format!("column {} is not a field and could not be dropped", name),
975            }
976        );
977        Ok(())
978    }
979
980    /// Returns an error if the column's alter index option is invalid.
981    fn validate_column_alter_index_option(
982        column_name: &String,
983        metadata: &RegionMetadata,
984        is_fulltext: bool,
985    ) -> Result<()> {
986        let column = metadata
987            .column_by_name(column_name)
988            .context(InvalidRegionRequestSnafu {
989                region_id: metadata.region_id,
990                err: format!("column {} not found", column_name),
991            })?;
992
993        if is_fulltext {
994            ensure!(
995                column.column_schema.data_type.is_string(),
996                InvalidRegionRequestSnafu {
997                    region_id: metadata.region_id,
998                    err: format!(
999                        "cannot change alter index options for non-string column {}",
1000                        column_name
1001                    ),
1002                }
1003            );
1004        }
1005
1006        Ok(())
1007    }
1008
1009    /// Returns an error if the column isn't exist.
1010    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
1011        metadata
1012            .column_by_name(column_name)
1013            .context(InvalidRegionRequestSnafu {
1014                region_id: metadata.region_id,
1015                err: format!("column {} not found", column_name),
1016            })?;
1017
1018        Ok(())
1019    }
1020}
1021
1022impl TryFrom<alter_request::Kind> for AlterKind {
1023    type Error = MetadataError;
1024
1025    fn try_from(kind: alter_request::Kind) -> Result<Self> {
1026        let alter_kind = match kind {
1027            alter_request::Kind::AddColumns(x) => {
1028                let columns = x
1029                    .add_columns
1030                    .into_iter()
1031                    .map(|x| x.try_into())
1032                    .collect::<Result<Vec<_>>>()?;
1033                AlterKind::AddColumns { columns }
1034            }
1035            alter_request::Kind::ModifyColumnTypes(x) => {
1036                let columns = x
1037                    .modify_column_types
1038                    .into_iter()
1039                    .map(|x| x.into())
1040                    .collect::<Vec<_>>();
1041                AlterKind::ModifyColumnTypes { columns }
1042            }
1043            alter_request::Kind::DropColumns(x) => {
1044                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1045                AlterKind::DropColumns { names }
1046            }
1047            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1048                options: options
1049                    .table_options
1050                    .iter()
1051                    .map(TryFrom::try_from)
1052                    .collect::<Result<Vec<_>>>()?,
1053            },
1054            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1055                keys: options
1056                    .keys
1057                    .iter()
1058                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
1059                    .collect::<Result<Vec<_>>>()?,
1060            },
1061            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1062                options: vec![SetIndexOption::try_from(o)?],
1063            },
1064            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1065                options: vec![UnsetIndexOption::try_from(o)?],
1066            },
1067            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1068                options: o
1069                    .set_indexes
1070                    .into_iter()
1071                    .map(SetIndexOption::try_from)
1072                    .collect::<Result<Vec<_>>>()?,
1073            },
1074            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1075                options: o
1076                    .unset_indexes
1077                    .into_iter()
1078                    .map(UnsetIndexOption::try_from)
1079                    .collect::<Result<Vec<_>>>()?,
1080            },
1081            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1082                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1083            },
1084            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1085                columns: x
1086                    .set_defaults
1087                    .into_iter()
1088                    .map(|x| {
1089                        Ok(SetDefault {
1090                            name: x.column_name,
1091                            default_constraint: x.default_constraint.clone(),
1092                        })
1093                    })
1094                    .collect::<Result<Vec<_>>>()?,
1095            },
1096            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1097                column_metadatas: x
1098                    .column_defs
1099                    .into_iter()
1100                    .map(ColumnMetadata::try_from_column_def)
1101                    .collect::<Result<Vec<_>>>()?,
1102            },
1103        };
1104
1105        Ok(alter_kind)
1106    }
1107}
1108
1109/// Adds a column.
1110#[derive(Debug, PartialEq, Eq, Clone)]
1111pub struct AddColumn {
1112    /// Metadata of the column to add.
1113    pub column_metadata: ColumnMetadata,
1114    /// Location to add the column. If location is None, the region adds
1115    /// the column to the last.
1116    pub location: Option<AddColumnLocation>,
1117}
1118
1119impl AddColumn {
1120    /// Returns an error if the column to add is invalid.
1121    ///
1122    /// It allows adding existing columns. However, the existing column must have the same metadata
1123    /// and the location must be None.
1124    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1125        ensure!(
1126            self.column_metadata.column_schema.is_nullable()
1127                || self
1128                    .column_metadata
1129                    .column_schema
1130                    .default_constraint()
1131                    .is_some(),
1132            InvalidRegionRequestSnafu {
1133                region_id: metadata.region_id,
1134                err: format!(
1135                    "no default value for column {}",
1136                    self.column_metadata.column_schema.name
1137                ),
1138            }
1139        );
1140
1141        if let Some(existing_column) =
1142            metadata.column_by_name(&self.column_metadata.column_schema.name)
1143        {
1144            // If the column already exists.
1145            ensure!(
1146                *existing_column == self.column_metadata,
1147                InvalidRegionRequestSnafu {
1148                    region_id: metadata.region_id,
1149                    err: format!(
1150                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1151                        self.column_metadata.column_schema.name,
1152                        existing_column,
1153                        self.column_metadata,
1154                    ),
1155                }
1156            );
1157            ensure!(
1158                self.location.is_none(),
1159                InvalidRegionRequestSnafu {
1160                    region_id: metadata.region_id,
1161                    err: format!(
1162                        "column {} already exists, but location is specified",
1163                        self.column_metadata.column_schema.name
1164                    ),
1165                }
1166            );
1167        }
1168
1169        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1170            // Ensures the existing column has the same name.
1171            ensure!(
1172                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1173                InvalidRegionRequestSnafu {
1174                    region_id: metadata.region_id,
1175                    err: format!(
1176                        "column id {} already exists with different name {}",
1177                        self.column_metadata.column_id, existing_column.column_schema.name
1178                    ),
1179                }
1180            );
1181        }
1182
1183        Ok(())
1184    }
1185
1186    /// Returns true if no column to add to the region.
1187    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1188        debug_assert!(self.validate(metadata).is_ok());
1189        metadata
1190            .column_by_name(&self.column_metadata.column_schema.name)
1191            .is_none()
1192    }
1193}
1194
1195impl TryFrom<v1::region::AddColumn> for AddColumn {
1196    type Error = MetadataError;
1197
1198    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1199        let column_def = add_column
1200            .column_def
1201            .context(InvalidRawRegionRequestSnafu {
1202                err: "missing column_def in AddColumn",
1203            })?;
1204
1205        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1206        let location = add_column
1207            .location
1208            .map(AddColumnLocation::try_from)
1209            .transpose()?;
1210
1211        Ok(AddColumn {
1212            column_metadata,
1213            location,
1214        })
1215    }
1216}
1217
1218/// Location to add a column.
1219#[derive(Debug, PartialEq, Eq, Clone)]
1220pub enum AddColumnLocation {
1221    /// Add the column to the first position of columns.
1222    First,
1223    /// Add the column after specific column.
1224    After {
1225        /// Add the column after this column.
1226        column_name: String,
1227    },
1228}
1229
1230impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1231    type Error = MetadataError;
1232
1233    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1234        let location_type = LocationType::try_from(location.location_type)
1235            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1236        let add_column_location = match location_type {
1237            LocationType::First => AddColumnLocation::First,
1238            LocationType::After => AddColumnLocation::After {
1239                column_name: location.after_column_name,
1240            },
1241        };
1242
1243        Ok(add_column_location)
1244    }
1245}
1246
1247/// Change a column's datatype.
1248#[derive(Debug, PartialEq, Eq, Clone)]
1249pub struct ModifyColumnType {
1250    /// Schema of the column to modify.
1251    pub column_name: String,
1252    /// Column will be changed to this type.
1253    pub target_type: ConcreteDataType,
1254}
1255
1256impl ModifyColumnType {
1257    /// Returns an error if the column's datatype to change is invalid.
1258    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1259        let column_meta = metadata
1260            .column_by_name(&self.column_name)
1261            .with_context(|| InvalidRegionRequestSnafu {
1262                region_id: metadata.region_id,
1263                err: format!("column {} not found", self.column_name),
1264            })?;
1265
1266        ensure!(
1267            matches!(column_meta.semantic_type, SemanticType::Field),
1268            InvalidRegionRequestSnafu {
1269                region_id: metadata.region_id,
1270                err: "'timestamp' or 'tag' column cannot change type".to_string()
1271            }
1272        );
1273        ensure!(
1274            column_meta
1275                .column_schema
1276                .data_type
1277                .can_arrow_type_cast_to(&self.target_type),
1278            InvalidRegionRequestSnafu {
1279                region_id: metadata.region_id,
1280                err: format!(
1281                    "column '{}' cannot be cast automatically to type '{}'",
1282                    self.column_name, self.target_type
1283                ),
1284            }
1285        );
1286
1287        Ok(())
1288    }
1289
1290    /// Returns true if no column's datatype to change to the region.
1291    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1292        debug_assert!(self.validate(metadata).is_ok());
1293        metadata.column_by_name(&self.column_name).is_some()
1294    }
1295}
1296
1297impl From<v1::ModifyColumnType> for ModifyColumnType {
1298    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1299        let target_type = ColumnDataTypeWrapper::new(
1300            modify_column_type.target_type(),
1301            modify_column_type.target_type_extension,
1302        )
1303        .into();
1304
1305        ModifyColumnType {
1306            column_name: modify_column_type.column_name,
1307            target_type,
1308        }
1309    }
1310}
1311
1312#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1313pub enum SetRegionOption {
1314    Ttl(Option<TimeToLive>),
1315    // Modifying TwscOptions with values as (option name, new value).
1316    Twsc(String, String),
1317    // Modifying the SST format.
1318    Format(String),
1319}
1320
1321impl TryFrom<&PbOption> for SetRegionOption {
1322    type Error = MetadataError;
1323
1324    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1325        let PbOption { key, value } = value;
1326        match key.as_str() {
1327            TTL_KEY => {
1328                let ttl = TimeToLive::from_humantime_or_str(value)
1329                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1330
1331                Ok(Self::Ttl(Some(ttl)))
1332            }
1333            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1334                Ok(Self::Twsc(key.clone(), value.clone()))
1335            }
1336            SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1337            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1338        }
1339    }
1340}
1341
1342impl From<&UnsetRegionOption> for SetRegionOption {
1343    fn from(unset_option: &UnsetRegionOption) -> Self {
1344        match unset_option {
1345            UnsetRegionOption::TwcsTriggerFileNum => {
1346                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1347            }
1348            UnsetRegionOption::TwcsMaxOutputFileSize => {
1349                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1350            }
1351            UnsetRegionOption::TwcsTimeWindow => {
1352                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1353            }
1354            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1355        }
1356    }
1357}
1358
1359impl TryFrom<&str> for UnsetRegionOption {
1360    type Error = MetadataError;
1361
1362    fn try_from(key: &str) -> Result<Self> {
1363        match key.to_ascii_lowercase().as_str() {
1364            TTL_KEY => Ok(Self::Ttl),
1365            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1366            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1367            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1368            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1369        }
1370    }
1371}
1372
1373#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1374pub enum UnsetRegionOption {
1375    TwcsTriggerFileNum,
1376    TwcsMaxOutputFileSize,
1377    TwcsTimeWindow,
1378    Ttl,
1379}
1380
1381impl UnsetRegionOption {
1382    pub fn as_str(&self) -> &str {
1383        match self {
1384            Self::Ttl => TTL_KEY,
1385            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1386            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1387            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1388        }
1389    }
1390}
1391
1392impl Display for UnsetRegionOption {
1393    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1394        write!(f, "{}", self.as_str())
1395    }
1396}
1397
/// Flush region request.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size for the flush.
    // NOTE(review): `None` presumably means the engine's default is used — confirm
    // against the flush implementation.
    pub row_group_size: Option<usize>,
}
1402
1403#[derive(Debug)]
1404pub struct RegionCompactRequest {
1405    pub options: compact_request::Options,
1406    pub parallelism: Option<u32>,
1407}
1408
1409impl Default for RegionCompactRequest {
1410    fn default() -> Self {
1411        Self {
1412            // Default to regular compaction.
1413            options: compact_request::Options::Regular(Default::default()),
1414            parallelism: None,
1415        }
1416    }
1417}
1418
/// Build index region request. It carries no parameters.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1421
/// Truncate region request.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate all data in the region.
    All,
    /// Truncate data by time ranges.
    ByTimeRanges {
        /// Time ranges to truncate. Both bounds are inclusive.
        /// Only files that are fully contained in a time range will be truncated,
        /// so there is no guarantee that all data in the time range will be truncated.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1434
/// Catchup region request.
///
/// Makes a readonly region catch up with the leader region's changes.
/// It has no effect when operating on a leader region.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Sets it to writable if it's available after it has caught up with all changes.
    pub set_writable: bool,
    /// The `entry_id` that is expected to be replayed to.
    /// `None` stands for replaying to the latest.
    pub entry_id: Option<entry::Id>,
    /// Used for the metrics metadata region.
    /// The `entry_id` that is expected to be replayed to.
    /// `None` stands for replaying to the latest.
    pub metadata_entry_id: Option<entry::Id>,
    /// The hint for replaying memtable.
    pub location_id: Option<u64>,
    /// Replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1455
1456#[derive(Debug, Clone)]
1457pub struct RegionBulkInsertsRequest {
1458    pub region_id: RegionId,
1459    pub payload: DfRecordBatch,
1460    pub raw_data: ArrowIpc,
1461    pub partition_expr_version: Option<u64>,
1462}
1463
1464impl RegionBulkInsertsRequest {
1465    pub fn estimated_size(&self) -> usize {
1466        self.payload.get_array_memory_size()
1467    }
1468}
1469
/// Request to stage a region with a new region rule (partition expression).
///
/// This request transitions a region into the staging mode.
/// It first flushes the memtable for the old region rule if it is not empty,
/// then enters the staging mode with the new region rule.
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
    /// The partition expression of the staging region.
    pub partition_expr: String,
}
1480
/// Request to apply a generated staging manifest to a region, used as part of
/// the region repartition.
///
/// After a region has entered staging mode with a new region rule (partition
/// expression) and a separate process (for example, `remap_manifests`) has
/// generated the new file assignments for the staging region, this request
/// applies that generated manifest to the region.
///
/// In practice, this means:
/// - The `partition_expr` identifies the staging region rule that the manifest
///   was generated for.
/// - `central_region_id` specifies which region holds the staging blob storage
///   where the manifest was written during the `remap_manifests` operation.
/// - `manifest_path` is the relative path within the central region's staging
///   blob storage to fetch the generated manifest.
///
/// It should typically be called **after** the staging region has been
/// initialized by [`EnterStagingRequest`] and the new file layout has been
/// computed, to finalize the repartition operation.
#[derive(Debug, Clone)]
pub struct ApplyStagingManifestRequest {
    /// The partition expression of the staging region.
    pub partition_expr: String,
    /// The region that stores the staging manifests in its staging blob storage.
    pub central_region_id: RegionId,
    /// The relative path to the staging manifest within the central region's
    /// staging blob storage.
    pub manifest_path: String,
}
1509
1510impl fmt::Display for RegionRequest {
1511    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1512        match self {
1513            RegionRequest::Put(_) => write!(f, "Put"),
1514            RegionRequest::Delete(_) => write!(f, "Delete"),
1515            RegionRequest::Create(_) => write!(f, "Create"),
1516            RegionRequest::Drop(_) => write!(f, "Drop"),
1517            RegionRequest::Open(_) => write!(f, "Open"),
1518            RegionRequest::Close(_) => write!(f, "Close"),
1519            RegionRequest::Alter(_) => write!(f, "Alter"),
1520            RegionRequest::Flush(_) => write!(f, "Flush"),
1521            RegionRequest::Compact(_) => write!(f, "Compact"),
1522            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1523            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1524            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1525            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1526            RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1527            RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1528        }
1529    }
1530}
1531
1532#[cfg(test)]
1533mod tests {
1534
1535    use api::v1::region::RegionColumnDef;
1536    use api::v1::{ColumnDataType, ColumnDef};
1537    use datatypes::prelude::ConcreteDataType;
1538    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1539
1540    use super::*;
1541    use crate::metadata::RegionMetadataBuilder;
1542
1543    #[test]
1544    fn test_from_proto_location() {
1545        let proto_location = v1::AddColumnLocation {
1546            location_type: LocationType::First as i32,
1547            after_column_name: String::default(),
1548        };
1549        let location = AddColumnLocation::try_from(proto_location).unwrap();
1550        assert_eq!(location, AddColumnLocation::First);
1551
1552        let proto_location = v1::AddColumnLocation {
1553            location_type: 10,
1554            after_column_name: String::default(),
1555        };
1556        AddColumnLocation::try_from(proto_location).unwrap_err();
1557
1558        let proto_location = v1::AddColumnLocation {
1559            location_type: LocationType::After as i32,
1560            after_column_name: "a".to_string(),
1561        };
1562        let location = AddColumnLocation::try_from(proto_location).unwrap();
1563        assert_eq!(
1564            location,
1565            AddColumnLocation::After {
1566                column_name: "a".to_string()
1567            }
1568        );
1569    }
1570
1571    #[test]
1572    fn test_from_none_proto_add_column() {
1573        AddColumn::try_from(v1::region::AddColumn {
1574            column_def: None,
1575            location: None,
1576        })
1577        .unwrap_err();
1578    }
1579
1580    #[test]
1581    fn test_from_proto_alter_request() {
1582        RegionAlterRequest::try_from(AlterRequest {
1583            region_id: 0,
1584            schema_version: 1,
1585            kind: None,
1586        })
1587        .unwrap_err();
1588
1589        let request = RegionAlterRequest::try_from(AlterRequest {
1590            region_id: 0,
1591            schema_version: 1,
1592            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1593                add_columns: vec![v1::region::AddColumn {
1594                    column_def: Some(RegionColumnDef {
1595                        column_def: Some(ColumnDef {
1596                            name: "a".to_string(),
1597                            data_type: ColumnDataType::String as i32,
1598                            is_nullable: true,
1599                            default_constraint: vec![],
1600                            semantic_type: SemanticType::Field as i32,
1601                            comment: String::new(),
1602                            ..Default::default()
1603                        }),
1604                        column_id: 1,
1605                    }),
1606                    location: Some(v1::AddColumnLocation {
1607                        location_type: LocationType::First as i32,
1608                        after_column_name: String::default(),
1609                    }),
1610                }],
1611            })),
1612        })
1613        .unwrap();
1614
1615        assert_eq!(
1616            request,
1617            RegionAlterRequest {
1618                kind: AlterKind::AddColumns {
1619                    columns: vec![AddColumn {
1620                        column_metadata: ColumnMetadata {
1621                            column_schema: ColumnSchema::new(
1622                                "a",
1623                                ConcreteDataType::string_datatype(),
1624                                true,
1625                            ),
1626                            semantic_type: SemanticType::Field,
1627                            column_id: 1,
1628                        },
1629                        location: Some(AddColumnLocation::First),
1630                    }]
1631                },
1632            }
1633        );
1634    }
1635
1636    /// Returns a new region metadata for testing. Metadata:
1637    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1638    fn new_metadata() -> RegionMetadata {
1639        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1640        builder
1641            .push_column_metadata(ColumnMetadata {
1642                column_schema: ColumnSchema::new(
1643                    "ts",
1644                    ConcreteDataType::timestamp_millisecond_datatype(),
1645                    false,
1646                ),
1647                semantic_type: SemanticType::Timestamp,
1648                column_id: 1,
1649            })
1650            .push_column_metadata(ColumnMetadata {
1651                column_schema: ColumnSchema::new(
1652                    "tag_0",
1653                    ConcreteDataType::string_datatype(),
1654                    true,
1655                ),
1656                semantic_type: SemanticType::Tag,
1657                column_id: 2,
1658            })
1659            .push_column_metadata(ColumnMetadata {
1660                column_schema: ColumnSchema::new(
1661                    "field_0",
1662                    ConcreteDataType::string_datatype(),
1663                    true,
1664                ),
1665                semantic_type: SemanticType::Field,
1666                column_id: 3,
1667            })
1668            .push_column_metadata(ColumnMetadata {
1669                column_schema: ColumnSchema::new(
1670                    "field_1",
1671                    ConcreteDataType::boolean_datatype(),
1672                    true,
1673                ),
1674                semantic_type: SemanticType::Field,
1675                column_id: 4,
1676            })
1677            .primary_key(vec![2]);
1678        builder.build().unwrap()
1679    }
1680
1681    #[test]
1682    fn test_add_column_validate() {
1683        let metadata = new_metadata();
1684        let add_column = AddColumn {
1685            column_metadata: ColumnMetadata {
1686                column_schema: ColumnSchema::new(
1687                    "tag_1",
1688                    ConcreteDataType::string_datatype(),
1689                    true,
1690                ),
1691                semantic_type: SemanticType::Tag,
1692                column_id: 5,
1693            },
1694            location: None,
1695        };
1696        add_column.validate(&metadata).unwrap();
1697        assert!(add_column.need_alter(&metadata));
1698
1699        // Add not null column.
1700        AddColumn {
1701            column_metadata: ColumnMetadata {
1702                column_schema: ColumnSchema::new(
1703                    "tag_1",
1704                    ConcreteDataType::string_datatype(),
1705                    false,
1706                ),
1707                semantic_type: SemanticType::Tag,
1708                column_id: 5,
1709            },
1710            location: None,
1711        }
1712        .validate(&metadata)
1713        .unwrap_err();
1714
1715        // Add existing column.
1716        let add_column = AddColumn {
1717            column_metadata: ColumnMetadata {
1718                column_schema: ColumnSchema::new(
1719                    "tag_0",
1720                    ConcreteDataType::string_datatype(),
1721                    true,
1722                ),
1723                semantic_type: SemanticType::Tag,
1724                column_id: 2,
1725            },
1726            location: None,
1727        };
1728        add_column.validate(&metadata).unwrap();
1729        assert!(!add_column.need_alter(&metadata));
1730    }
1731
1732    #[test]
1733    fn test_add_duplicate_columns() {
1734        let kind = AlterKind::AddColumns {
1735            columns: vec![
1736                AddColumn {
1737                    column_metadata: ColumnMetadata {
1738                        column_schema: ColumnSchema::new(
1739                            "tag_1",
1740                            ConcreteDataType::string_datatype(),
1741                            true,
1742                        ),
1743                        semantic_type: SemanticType::Tag,
1744                        column_id: 5,
1745                    },
1746                    location: None,
1747                },
1748                AddColumn {
1749                    column_metadata: ColumnMetadata {
1750                        column_schema: ColumnSchema::new(
1751                            "tag_1",
1752                            ConcreteDataType::string_datatype(),
1753                            true,
1754                        ),
1755                        semantic_type: SemanticType::Field,
1756                        column_id: 6,
1757                    },
1758                    location: None,
1759                },
1760            ],
1761        };
1762        let metadata = new_metadata();
1763        kind.validate(&metadata).unwrap();
1764        assert!(kind.need_alter(&metadata));
1765    }
1766
1767    #[test]
1768    fn test_add_existing_column_different_metadata() {
1769        let metadata = new_metadata();
1770
1771        // Add existing column with different id.
1772        let kind = AlterKind::AddColumns {
1773            columns: vec![AddColumn {
1774                column_metadata: ColumnMetadata {
1775                    column_schema: ColumnSchema::new(
1776                        "tag_0",
1777                        ConcreteDataType::string_datatype(),
1778                        true,
1779                    ),
1780                    semantic_type: SemanticType::Tag,
1781                    column_id: 4,
1782                },
1783                location: None,
1784            }],
1785        };
1786        kind.validate(&metadata).unwrap_err();
1787
1788        // Add existing column with different type.
1789        let kind = AlterKind::AddColumns {
1790            columns: vec![AddColumn {
1791                column_metadata: ColumnMetadata {
1792                    column_schema: ColumnSchema::new(
1793                        "tag_0",
1794                        ConcreteDataType::int64_datatype(),
1795                        true,
1796                    ),
1797                    semantic_type: SemanticType::Tag,
1798                    column_id: 2,
1799                },
1800                location: None,
1801            }],
1802        };
1803        kind.validate(&metadata).unwrap_err();
1804
1805        // Add existing column with different name.
1806        let kind = AlterKind::AddColumns {
1807            columns: vec![AddColumn {
1808                column_metadata: ColumnMetadata {
1809                    column_schema: ColumnSchema::new(
1810                        "tag_1",
1811                        ConcreteDataType::string_datatype(),
1812                        true,
1813                    ),
1814                    semantic_type: SemanticType::Tag,
1815                    column_id: 2,
1816                },
1817                location: None,
1818            }],
1819        };
1820        kind.validate(&metadata).unwrap_err();
1821    }
1822
1823    #[test]
1824    fn test_add_existing_column_with_location() {
1825        let metadata = new_metadata();
1826        let kind = AlterKind::AddColumns {
1827            columns: vec![AddColumn {
1828                column_metadata: ColumnMetadata {
1829                    column_schema: ColumnSchema::new(
1830                        "tag_0",
1831                        ConcreteDataType::string_datatype(),
1832                        true,
1833                    ),
1834                    semantic_type: SemanticType::Tag,
1835                    column_id: 2,
1836                },
1837                location: Some(AddColumnLocation::First),
1838            }],
1839        };
1840        kind.validate(&metadata).unwrap_err();
1841    }
1842
1843    #[test]
1844    fn test_validate_drop_column() {
1845        let metadata = new_metadata();
1846        let kind = AlterKind::DropColumns {
1847            names: vec!["xxxx".to_string()],
1848        };
1849        kind.validate(&metadata).unwrap();
1850        assert!(!kind.need_alter(&metadata));
1851
1852        AlterKind::DropColumns {
1853            names: vec!["tag_0".to_string()],
1854        }
1855        .validate(&metadata)
1856        .unwrap_err();
1857
1858        let kind = AlterKind::DropColumns {
1859            names: vec!["field_0".to_string()],
1860        };
1861        kind.validate(&metadata).unwrap();
1862        assert!(kind.need_alter(&metadata));
1863    }
1864
1865    #[test]
1866    fn test_validate_modify_column_type() {
1867        let metadata = new_metadata();
1868        AlterKind::ModifyColumnTypes {
1869            columns: vec![ModifyColumnType {
1870                column_name: "xxxx".to_string(),
1871                target_type: ConcreteDataType::string_datatype(),
1872            }],
1873        }
1874        .validate(&metadata)
1875        .unwrap_err();
1876
1877        AlterKind::ModifyColumnTypes {
1878            columns: vec![ModifyColumnType {
1879                column_name: "field_1".to_string(),
1880                target_type: ConcreteDataType::date_datatype(),
1881            }],
1882        }
1883        .validate(&metadata)
1884        .unwrap_err();
1885
1886        AlterKind::ModifyColumnTypes {
1887            columns: vec![ModifyColumnType {
1888                column_name: "ts".to_string(),
1889                target_type: ConcreteDataType::date_datatype(),
1890            }],
1891        }
1892        .validate(&metadata)
1893        .unwrap_err();
1894
1895        AlterKind::ModifyColumnTypes {
1896            columns: vec![ModifyColumnType {
1897                column_name: "tag_0".to_string(),
1898                target_type: ConcreteDataType::date_datatype(),
1899            }],
1900        }
1901        .validate(&metadata)
1902        .unwrap_err();
1903
1904        let kind = AlterKind::ModifyColumnTypes {
1905            columns: vec![ModifyColumnType {
1906                column_name: "field_0".to_string(),
1907                target_type: ConcreteDataType::int32_datatype(),
1908            }],
1909        };
1910        kind.validate(&metadata).unwrap();
1911        assert!(kind.need_alter(&metadata));
1912    }
1913
1914    #[test]
1915    fn test_validate_add_columns() {
1916        let kind = AlterKind::AddColumns {
1917            columns: vec![
1918                AddColumn {
1919                    column_metadata: ColumnMetadata {
1920                        column_schema: ColumnSchema::new(
1921                            "tag_1",
1922                            ConcreteDataType::string_datatype(),
1923                            true,
1924                        ),
1925                        semantic_type: SemanticType::Tag,
1926                        column_id: 5,
1927                    },
1928                    location: None,
1929                },
1930                AddColumn {
1931                    column_metadata: ColumnMetadata {
1932                        column_schema: ColumnSchema::new(
1933                            "field_2",
1934                            ConcreteDataType::string_datatype(),
1935                            true,
1936                        ),
1937                        semantic_type: SemanticType::Field,
1938                        column_id: 6,
1939                    },
1940                    location: None,
1941                },
1942            ],
1943        };
1944        let request = RegionAlterRequest { kind };
1945        let mut metadata = new_metadata();
1946        metadata.schema_version = 1;
1947        request.validate(&metadata).unwrap();
1948    }
1949
1950    #[test]
1951    fn test_validate_create_region() {
1952        let column_metadatas = vec![
1953            ColumnMetadata {
1954                column_schema: ColumnSchema::new(
1955                    "ts",
1956                    ConcreteDataType::timestamp_millisecond_datatype(),
1957                    false,
1958                ),
1959                semantic_type: SemanticType::Timestamp,
1960                column_id: 1,
1961            },
1962            ColumnMetadata {
1963                column_schema: ColumnSchema::new(
1964                    "tag_0",
1965                    ConcreteDataType::string_datatype(),
1966                    true,
1967                ),
1968                semantic_type: SemanticType::Tag,
1969                column_id: 2,
1970            },
1971            ColumnMetadata {
1972                column_schema: ColumnSchema::new(
1973                    "field_0",
1974                    ConcreteDataType::string_datatype(),
1975                    true,
1976                ),
1977                semantic_type: SemanticType::Field,
1978                column_id: 3,
1979            },
1980        ];
1981        let create = RegionCreateRequest {
1982            engine: "mito".to_string(),
1983            column_metadatas,
1984            primary_key: vec![3, 4],
1985            options: HashMap::new(),
1986            table_dir: "path".to_string(),
1987            path_type: PathType::Bare,
1988            partition_expr_json: Some("".to_string()),
1989        };
1990
1991        assert!(create.validate().is_err());
1992    }
1993
1994    #[test]
1995    fn test_validate_modify_column_fulltext_options() {
1996        let kind = AlterKind::SetIndexes {
1997            options: vec![SetIndexOption::Fulltext {
1998                column_name: "tag_0".to_string(),
1999                options: FulltextOptions::new_unchecked(
2000                    true,
2001                    FulltextAnalyzer::Chinese,
2002                    false,
2003                    FulltextBackend::Bloom,
2004                    1000,
2005                    0.01,
2006                ),
2007            }],
2008        };
2009        let request = RegionAlterRequest { kind };
2010        let mut metadata = new_metadata();
2011        metadata.schema_version = 1;
2012        request.validate(&metadata).unwrap();
2013
2014        let kind = AlterKind::UnsetIndexes {
2015            options: vec![UnsetIndexOption::Fulltext {
2016                column_name: "tag_0".to_string(),
2017            }],
2018        };
2019        let request = RegionAlterRequest { kind };
2020        let mut metadata = new_metadata();
2021        metadata.schema_version = 1;
2022        request.validate(&metadata).unwrap();
2023    }
2024
2025    #[test]
2026    fn test_validate_sync_columns() {
2027        let metadata = new_metadata();
2028        let kind = AlterKind::SyncColumns {
2029            column_metadatas: vec![
2030                ColumnMetadata {
2031                    column_schema: ColumnSchema::new(
2032                        "tag_1",
2033                        ConcreteDataType::string_datatype(),
2034                        true,
2035                    ),
2036                    semantic_type: SemanticType::Tag,
2037                    column_id: 5,
2038                },
2039                ColumnMetadata {
2040                    column_schema: ColumnSchema::new(
2041                        "field_2",
2042                        ConcreteDataType::string_datatype(),
2043                        true,
2044                    ),
2045                    semantic_type: SemanticType::Field,
2046                    column_id: 6,
2047                },
2048            ],
2049        };
2050        let err = kind.validate(&metadata).unwrap_err();
2051        assert!(err.to_string().contains("not a primary key"));
2052
2053        // Change the timestamp column name.
2054        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2055        let ts_column = column_metadatas_with_different_ts_column
2056            .iter_mut()
2057            .find(|c| c.semantic_type == SemanticType::Timestamp)
2058            .unwrap();
2059        ts_column.column_schema.name = "ts1".to_string();
2060
2061        let kind = AlterKind::SyncColumns {
2062            column_metadatas: column_metadatas_with_different_ts_column,
2063        };
2064        let err = kind.validate(&metadata).unwrap_err();
2065        assert!(
2066            err.to_string()
2067                .contains("timestamp column ts has different id")
2068        );
2069
2070        // Change the primary key column name.
2071        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2072        let pk_column = column_metadatas_with_different_pk_column
2073            .iter_mut()
2074            .find(|c| c.column_schema.name == "tag_0")
2075            .unwrap();
2076        pk_column.column_id = 100;
2077        let kind = AlterKind::SyncColumns {
2078            column_metadatas: column_metadatas_with_different_pk_column,
2079        };
2080        let err = kind.validate(&metadata).unwrap_err();
2081        assert!(
2082            err.to_string()
2083                .contains("column with same name tag_0 has different id")
2084        );
2085
2086        // Add a new field column.
2087        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2088        column_metadatas_with_new_field_column.push(ColumnMetadata {
2089            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2090            semantic_type: SemanticType::Field,
2091            column_id: 4,
2092        });
2093        let kind = AlterKind::SyncColumns {
2094            column_metadatas: column_metadatas_with_new_field_column,
2095        };
2096        kind.validate(&metadata).unwrap();
2097    }
2098
2099    #[test]
2100    fn test_cast_path_type_to_primitive() {
2101        assert_eq!(PathType::Bare as u8, 0);
2102        assert_eq!(PathType::Data as u8, 1);
2103        assert_eq!(PathType::Metadata as u8, 2);
2104        assert_eq!(
2105            PathType::try_from(PathType::Bare as u8).unwrap(),
2106            PathType::Bare
2107        );
2108        assert_eq!(
2109            PathType::try_from(PathType::Data as u8).unwrap(),
2110            PathType::Data
2111        );
2112        assert_eq!(
2113            PathType::try_from(PathType::Metadata as u8).unwrap(),
2114            PathType::Metadata
2115        );
2116    }
2117}