store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest,
26    CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests,
27    OpenRequest, TruncateRequest, alter_request, compact_request, region_request, truncate_request,
28};
29use api::v1::{
30    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::{TimeToLive, Timestamp};
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use num_enum::TryFromPrimitive;
40use serde::{Deserialize, Serialize};
41use snafu::{OptionExt, ResultExt, ensure};
42use strum::{AsRefStr, IntoStaticStr};
43
44use crate::logstore::entry;
45use crate::metadata::{
46    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
47    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
48    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
49    RegionMetadata, Result, UnexpectedSnafu,
50};
51use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
52use crate::metrics;
53use crate::mito_engine_options::{
54    TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
55};
56use crate::path_utils::table_dir;
57use crate::storage::{ColumnId, RegionId, ScanRequest};
58
/// The type of path to generate.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so each variant is backed by a
/// `u8` discriminant assigned in declaration order.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// A bare path - the original path of an engine.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
    Bare,
    /// A path for the data region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
    Data,
    /// A path for the metadata region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
    Metadata,
}
76
/// A batch of DDL requests of one kind, each paired with the id of the region it targets.
///
/// `IntoStaticStr` is derived so that the variant name can be obtained as a `&'static str`.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// A batch of create region requests.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// A batch of drop region requests.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// A batch of alter region requests.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
83
84impl BatchRegionDdlRequest {
85    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
86    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
87        match body {
88            region_request::Body::Creates(creates) => {
89                let requests = creates
90                    .requests
91                    .into_iter()
92                    .map(parse_region_create)
93                    .collect::<Result<Vec<_>>>()?;
94                Ok(Some(Self::Create(requests)))
95            }
96            region_request::Body::Drops(drops) => {
97                let requests = drops
98                    .requests
99                    .into_iter()
100                    .map(parse_region_drop)
101                    .collect::<Result<Vec<_>>>()?;
102                Ok(Some(Self::Drop(requests)))
103            }
104            region_request::Body::Alters(alters) => {
105                let requests = alters
106                    .requests
107                    .into_iter()
108                    .map(parse_region_alter)
109                    .collect::<Result<Vec<_>>>()?;
110                Ok(Some(Self::Alter(requests)))
111            }
112            _ => Ok(None),
113        }
114    }
115
116    pub fn request_type(&self) -> &'static str {
117        self.into()
118    }
119
120    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
121        match self {
122            Self::Create(requests) => requests
123                .into_iter()
124                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
125                .collect(),
126            Self::Drop(requests) => requests
127                .into_iter()
128                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
129                .collect(),
130            Self::Alter(requests) => requests
131                .into_iter()
132                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
133                .collect(),
134        }
135    }
136}
137
/// A request that targets a single region.
///
/// `IntoStaticStr` is derived so that the variant name can be obtained as a `&'static str`
/// (see [`RegionRequest::request_type`]).
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Puts rows into a region.
    Put(RegionPutRequest),
    /// Deletes rows from a region.
    Delete(RegionDeleteRequest),
    /// Creates a region.
    Create(RegionCreateRequest),
    /// Drops a region.
    Drop(RegionDropRequest),
    /// Opens a region.
    Open(RegionOpenRequest),
    /// Closes a region.
    Close(RegionCloseRequest),
    /// Alters the metadata of a region.
    Alter(RegionAlterRequest),
    /// Flushes a region.
    Flush(RegionFlushRequest),
    /// Compacts a region.
    Compact(RegionCompactRequest),
    /// Truncates a region.
    Truncate(RegionTruncateRequest),
    /// Catchup request of a region.
    Catchup(RegionCatchupRequest),
    /// Bulk insert into a region.
    BulkInserts(RegionBulkInsertsRequest),
}
153
154impl RegionRequest {
155    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
156    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
157    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
158        match body {
159            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
160            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
161            region_request::Body::Create(create) => make_region_create(create),
162            region_request::Body::Drop(drop) => make_region_drop(drop),
163            region_request::Body::Open(open) => make_region_open(open),
164            region_request::Body::Close(close) => make_region_close(close),
165            region_request::Body::Alter(alter) => make_region_alter(alter),
166            region_request::Body::Flush(flush) => make_region_flush(flush),
167            region_request::Body::Compact(compact) => make_region_compact(compact),
168            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
169            region_request::Body::Creates(creates) => make_region_creates(creates),
170            region_request::Body::Drops(drops) => make_region_drops(drops),
171            region_request::Body::Alters(alters) => make_region_alters(alters),
172            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
173            region_request::Body::Sync(_) => UnexpectedSnafu {
174                reason: "Sync request should be handled separately by RegionServer",
175            }
176            .fail(),
177            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
178                reason: "ListMetadata request should be handled separately by RegionServer",
179            }
180            .fail(),
181        }
182    }
183
184    /// Returns the type name of the request.
185    pub fn request_type(&self) -> &'static str {
186        self.into()
187    }
188}
189
190fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
191    let requests = inserts
192        .requests
193        .into_iter()
194        .filter_map(|r| {
195            let region_id = r.region_id.into();
196            r.rows.map(|rows| {
197                (
198                    region_id,
199                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
200                )
201            })
202        })
203        .collect();
204    Ok(requests)
205}
206
207fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
208    let requests = deletes
209        .requests
210        .into_iter()
211        .filter_map(|r| {
212            let region_id = r.region_id.into();
213            r.rows.map(|rows| {
214                (
215                    region_id,
216                    RegionRequest::Delete(RegionDeleteRequest { rows }),
217                )
218            })
219        })
220        .collect();
221    Ok(requests)
222}
223
224fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
225    let column_metadatas = create
226        .column_defs
227        .into_iter()
228        .map(ColumnMetadata::try_from_column_def)
229        .collect::<Result<Vec<_>>>()?;
230    let region_id = RegionId::from(create.region_id);
231    let table_dir = table_dir(&create.path, region_id.table_id());
232    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
233    Ok((
234        region_id,
235        RegionCreateRequest {
236            engine: create.engine,
237            column_metadatas,
238            primary_key: create.primary_key,
239            options: create.options,
240            table_dir,
241            path_type: PathType::Bare,
242            partition_expr_json,
243        },
244    ))
245}
246
247fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
248    let (region_id, request) = parse_region_create(create)?;
249    Ok(vec![(region_id, RegionRequest::Create(request))])
250}
251
252fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
253    let mut requests = Vec::with_capacity(creates.requests.len());
254    for create in creates.requests {
255        requests.extend(make_region_create(create)?);
256    }
257    Ok(requests)
258}
259
260fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
261    let region_id = drop.region_id.into();
262    Ok((
263        region_id,
264        RegionDropRequest {
265            fast_path: drop.fast_path,
266        },
267    ))
268}
269
270fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271    let (region_id, request) = parse_region_drop(drop)?;
272    Ok(vec![(region_id, RegionRequest::Drop(request))])
273}
274
275fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
276    let mut requests = Vec::with_capacity(drops.requests.len());
277    for drop in drops.requests {
278        requests.extend(make_region_drop(drop)?);
279    }
280    Ok(requests)
281}
282
283fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
284    let region_id = RegionId::from(open.region_id);
285    let table_dir = table_dir(&open.path, region_id.table_id());
286    Ok(vec![(
287        region_id,
288        RegionRequest::Open(RegionOpenRequest {
289            engine: open.engine,
290            table_dir,
291            path_type: PathType::Bare,
292            options: open.options,
293            skip_wal_replay: false,
294            checkpoint: None,
295        }),
296    )])
297}
298
299fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
300    let region_id = close.region_id.into();
301    Ok(vec![(
302        region_id,
303        RegionRequest::Close(RegionCloseRequest {}),
304    )])
305}
306
307fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
308    let region_id = alter.region_id.into();
309    let request = RegionAlterRequest::try_from(alter)?;
310    Ok((region_id, request))
311}
312
313fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
314    let (region_id, request) = parse_region_alter(alter)?;
315    Ok(vec![(region_id, RegionRequest::Alter(request))])
316}
317
318fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
319    let mut requests = Vec::with_capacity(alters.requests.len());
320    for alter in alters.requests {
321        requests.extend(make_region_alter(alter)?);
322    }
323    Ok(requests)
324}
325
326fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
327    let region_id = flush.region_id.into();
328    Ok(vec![(
329        region_id,
330        RegionRequest::Flush(RegionFlushRequest {
331            row_group_size: None,
332        }),
333    )])
334}
335
336fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
337    let region_id = compact.region_id.into();
338    let options = compact
339        .options
340        .unwrap_or(compact_request::Options::Regular(Default::default()));
341    Ok(vec![(
342        region_id,
343        RegionRequest::Compact(RegionCompactRequest { options }),
344    )])
345}
346
347fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
348    let region_id = truncate.region_id.into();
349    match truncate.kind {
350        None => InvalidRawRegionRequestSnafu {
351            err: "missing kind in TruncateRequest".to_string(),
352        }
353        .fail(),
354        Some(truncate_request::Kind::All(_)) => Ok(vec![(
355            region_id,
356            RegionRequest::Truncate(RegionTruncateRequest::All),
357        )]),
358        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
359            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
360
361            Ok(vec![(
362                region_id,
363                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
364            )])
365        }
366    }
367}
368
369/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
370fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
371    let region_id = request.region_id.into();
372    let Some(Body::ArrowIpc(request)) = request.body else {
373        return Ok(vec![]);
374    };
375
376    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
377        .with_label_values(&["decode"])
378        .start_timer();
379    let mut decoder =
380        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
381    let payload = decoder
382        .try_decode_record_batch(&request.data_header, &request.payload)
383        .context(FlightCodecSnafu)?;
384    decoder_timer.observe_duration();
385    Ok(vec![(
386        region_id,
387        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
388            region_id,
389            payload,
390            raw_data: request,
391        }),
392    )])
393}
394
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint; `None` when the request carries no hint.
    pub hint: Option<WriteHint>,
}
403
/// Request to read data from a region; wraps a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan request to execute.
    pub request: ScanRequest,
}
408
/// Request to delete data from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Keys of the rows to delete.
    ///
    /// Each row only contains primary key columns and a time index column.
    pub rows: Rows,
}
417
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Region engine name.
    pub engine: String,
    /// Columns in this region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the columns in the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Options of the created region.
    pub options: HashMap<String, String>,
    /// Directory of the table's data home. Usually composed of the catalog and the table id.
    pub table_dir: String,
    /// Path type for generating paths.
    pub path_type: PathType,
    /// Partition expression JSON from table metadata. Set to an empty string for a region
    /// without partition.
    /// It is an `Option` to keep compatibility with old clients.
    pub partition_expr_json: Option<String>,
}
436
437impl RegionCreateRequest {
438    /// Checks whether the request is valid, returns an error if it is invalid.
439    pub fn validate(&self) -> Result<()> {
440        // time index must exist
441        ensure!(
442            self.column_metadatas
443                .iter()
444                .any(|x| x.semantic_type == SemanticType::Timestamp),
445            InvalidRegionRequestSnafu {
446                region_id: RegionId::new(0, 0),
447                err: "missing timestamp column in create region request".to_string(),
448            }
449        );
450
451        // build column id to indices
452        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
453        for (i, c) in self.column_metadatas.iter().enumerate() {
454            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
455                return InvalidRegionRequestSnafu {
456                    region_id: RegionId::new(0, 0),
457                    err: format!(
458                        "duplicate column id {} (at position {} and {}) in create region request",
459                        c.column_id, previous, i
460                    ),
461                }
462                .fail();
463            }
464        }
465
466        // primary key must exist
467        for column_id in &self.primary_key {
468            ensure!(
469                column_id_to_indices.contains_key(column_id),
470                InvalidRegionRequestSnafu {
471                    region_id: RegionId::new(0, 0),
472                    err: format!(
473                        "missing primary key column {} in create region request",
474                        column_id
475                    ),
476                }
477            );
478        }
479
480        Ok(())
481    }
482
483    /// Returns true when the region belongs to the metric engine's physical table.
484    pub fn is_physical_table(&self) -> bool {
485        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
486    }
487}
488
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Fast path flag, copied as is from the protobuf drop request.
    // NOTE(review): what the fast path skips is not visible in this file — confirm in the engines.
    pub fast_path: bool,
}
493
/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Region engine name.
    pub engine: String,
    /// Directory of the table's data home. Usually composed of the catalog and the table id.
    pub table_dir: String,
    /// Path type for generating paths.
    pub path_type: PathType,
    /// Options of the opened region.
    pub options: HashMap<String, String>,
    /// To skip replaying the WAL.
    pub skip_wal_replay: bool,
    /// Replay checkpoint; `None` when the request carries no checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
510
/// Replay checkpoint that can be attached to a [`RegionOpenRequest`].
// NOTE(review): presumably both ids are WAL entry ids used when replaying — confirm with the
// engine that consumes the checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Optional entry id for the metadata; `None` when there is none.
    pub metadata_entry_id: Option<u64>,
}
516
impl RegionOpenRequest {
    /// Returns true when the region belongs to the metric engine's physical table.
    ///
    /// This is decided by the presence of the physical table metadata key in the region
    /// options; the value stored under the key is not inspected.
    pub fn is_physical_table(&self) -> bool {
        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
    }
}
523
/// Close region request.
///
/// The request carries no fields.
#[derive(Debug)]
pub struct RegionCloseRequest {}
527
/// Request to alter the metadata of a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// Kind of alteration to do.
    pub kind: AlterKind,
}
534
535impl RegionAlterRequest {
536    /// Checks whether the request is valid, returns an error if it is invalid.
537    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
538        self.kind.validate(metadata)?;
539
540        Ok(())
541    }
542
543    /// Returns true if we need to apply the request to the region.
544    ///
545    /// The `request` should be valid.
546    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
547        debug_assert!(self.validate(metadata).is_ok());
548        self.kind.need_alter(metadata)
549    }
550}
551
552impl TryFrom<AlterRequest> for RegionAlterRequest {
553    type Error = MetadataError;
554
555    fn try_from(value: AlterRequest) -> Result<Self> {
556        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
557            err: "missing kind in AlterRequest",
558        })?;
559
560        let kind = AlterKind::try_from(kind)?;
561        Ok(RegionAlterRequest { kind })
562    }
563}
564
/// Kind of the alteration.
///
/// `AsRefStr` is derived so that the variant name can be obtained as a `&str`.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region, only fields are allowed to drop.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data types of columns in the region, only fields are allowed to change.
    ModifyColumnTypes {
        /// Columns to change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop column default value.
    DropDefaults {
        /// Names of the columns whose default values are dropped.
        names: Vec<String>,
    },
    /// Set column default value.
    SetDefaults {
        /// Columns to change.
        columns: Vec<SetDefault>,
    },
    /// Sync column metadatas.
    SyncColumns {
        /// Column metadatas to sync to.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A column together with the default constraint to set on it
/// (see [`AlterKind::SetDefaults`]).
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column to change.
    pub name: String,
    /// The default constraint as raw bytes.
    // NOTE(review): the encoding of these bytes is not visible in this block — confirm
    // against the code that decodes the constraint.
    pub default_constraint: Vec<u8>,
}
611
/// An index to set on a column (see [`AlterKind::SetIndexes`]).
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index on the column with the given options.
    Fulltext {
        column_name: String,
        options: FulltextOptions,
    },
    /// Set an inverted index on the column.
    Inverted {
        column_name: String,
    },
    /// Set a skipping index on the column with the given options.
    Skipping {
        column_name: String,
        options: SkippingIndexOptions,
    },
}
626
627impl SetIndexOption {
628    /// Returns the column name of the index option.
629    pub fn column_name(&self) -> &String {
630        match self {
631            SetIndexOption::Fulltext { column_name, .. } => column_name,
632            SetIndexOption::Inverted { column_name } => column_name,
633            SetIndexOption::Skipping { column_name, .. } => column_name,
634        }
635    }
636
637    /// Returns true if the index option is fulltext.
638    pub fn is_fulltext(&self) -> bool {
639        match self {
640            SetIndexOption::Fulltext { .. } => true,
641            SetIndexOption::Inverted { .. } => false,
642            SetIndexOption::Skipping { .. } => false,
643        }
644    }
645}
646
647impl TryFrom<v1::SetIndex> for SetIndexOption {
648    type Error = MetadataError;
649
650    fn try_from(value: v1::SetIndex) -> Result<Self> {
651        let option = value.options.context(InvalidRawRegionRequestSnafu {
652            err: "missing options in SetIndex",
653        })?;
654
655        let opt = match option {
656            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
657                column_name: x.column_name.clone(),
658                options: FulltextOptions::new(
659                    x.enable,
660                    as_fulltext_option_analyzer(
661                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
662                    ),
663                    x.case_sensitive,
664                    as_fulltext_option_backend(
665                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
666                    ),
667                    x.granularity as u32,
668                    x.false_positive_rate,
669                )
670                .context(InvalidIndexOptionSnafu)?,
671            },
672            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
673                column_name: i.column_name,
674            },
675            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
676                column_name: s.column_name,
677                options: SkippingIndexOptions::new(
678                    s.granularity as u32,
679                    s.false_positive_rate,
680                    as_skipping_index_type(
681                        PbSkippingIndexType::try_from(s.skipping_index_type)
682                            .context(DecodeProtoSnafu)?,
683                    ),
684                )
685                .context(InvalidIndexOptionSnafu)?,
686            },
687        };
688
689        Ok(opt)
690    }
691}
692
/// An index to unset on a column (see [`AlterKind::UnsetIndexes`]).
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Unset the fulltext index of the column.
    Fulltext { column_name: String },
    /// Unset the inverted index of the column.
    Inverted { column_name: String },
    /// Unset the skipping index of the column.
    Skipping { column_name: String },
}
699
700impl UnsetIndexOption {
701    pub fn column_name(&self) -> &String {
702        match self {
703            UnsetIndexOption::Fulltext { column_name } => column_name,
704            UnsetIndexOption::Inverted { column_name } => column_name,
705            UnsetIndexOption::Skipping { column_name } => column_name,
706        }
707    }
708
709    pub fn is_fulltext(&self) -> bool {
710        match self {
711            UnsetIndexOption::Fulltext { .. } => true,
712            UnsetIndexOption::Inverted { .. } => false,
713            UnsetIndexOption::Skipping { .. } => false,
714        }
715    }
716}
717
718impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
719    type Error = MetadataError;
720
721    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
722        let option = value.options.context(InvalidRawRegionRequestSnafu {
723            err: "missing options in UnsetIndex",
724        })?;
725
726        let opt = match option {
727            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
728                column_name: f.column_name,
729            },
730            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
731                column_name: i.column_name,
732            },
733            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
734                column_name: s.column_name,
735            },
736        };
737
738        Ok(opt)
739    }
740}
741
742impl AlterKind {
743    /// Returns an error if the alter kind is invalid.
744    ///
745    /// It allows adding column if not exists and dropping column if exists.
746    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
747        match self {
748            AlterKind::AddColumns { columns } => {
749                for col_to_add in columns {
750                    col_to_add.validate(metadata)?;
751                }
752            }
753            AlterKind::DropColumns { names } => {
754                for name in names {
755                    Self::validate_column_to_drop(name, metadata)?;
756                }
757            }
758            AlterKind::ModifyColumnTypes { columns } => {
759                for col_to_change in columns {
760                    col_to_change.validate(metadata)?;
761                }
762            }
763            AlterKind::SetRegionOptions { .. } => {}
764            AlterKind::UnsetRegionOptions { .. } => {}
765            AlterKind::SetIndexes { options } => {
766                for option in options {
767                    Self::validate_column_alter_index_option(
768                        option.column_name(),
769                        metadata,
770                        option.is_fulltext(),
771                    )?;
772                }
773            }
774            AlterKind::UnsetIndexes { options } => {
775                for option in options {
776                    Self::validate_column_alter_index_option(
777                        option.column_name(),
778                        metadata,
779                        option.is_fulltext(),
780                    )?;
781                }
782            }
783            AlterKind::DropDefaults { names } => {
784                names
785                    .iter()
786                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
787            }
788            AlterKind::SetDefaults { columns } => {
789                columns
790                    .iter()
791                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
792            }
793            AlterKind::SyncColumns { column_metadatas } => {
794                let new_primary_keys = column_metadatas
795                    .iter()
796                    .filter(|c| c.semantic_type == SemanticType::Tag)
797                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
798                    .collect::<HashMap<_, _>>();
799
800                let old_primary_keys = metadata
801                    .column_metadatas
802                    .iter()
803                    .filter(|c| c.semantic_type == SemanticType::Tag)
804                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
805
806                for (name, id) in old_primary_keys {
807                    let primary_key =
808                        new_primary_keys
809                            .get(name)
810                            .with_context(|| InvalidRegionRequestSnafu {
811                                region_id: metadata.region_id,
812                                err: format!("column {} is not a primary key", name),
813                            })?;
814
815                    ensure!(
816                        *primary_key == id,
817                        InvalidRegionRequestSnafu {
818                            region_id: metadata.region_id,
819                            err: format!(
820                                "column with same name {} has different id, existing: {}, got: {}",
821                                name, id, primary_key
822                            ),
823                        }
824                    );
825                }
826
827                let new_ts_column = column_metadatas
828                    .iter()
829                    .find(|c| c.semantic_type == SemanticType::Timestamp)
830                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
831                    .context(InvalidRegionRequestSnafu {
832                        region_id: metadata.region_id,
833                        err: "timestamp column not found",
834                    })?;
835
836                // Safety: timestamp column must exist.
837                let old_ts_column = metadata
838                    .column_metadatas
839                    .iter()
840                    .find(|c| c.semantic_type == SemanticType::Timestamp)
841                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
842                    .unwrap();
843
844                ensure!(
845                    new_ts_column == old_ts_column,
846                    InvalidRegionRequestSnafu {
847                        region_id: metadata.region_id,
848                        err: format!(
849                            "timestamp column {} has different id, existing: {}, got: {}",
850                            old_ts_column.0, old_ts_column.1, new_ts_column.1
851                        ),
852                    }
853                );
854            }
855        }
856        Ok(())
857    }
858
859    /// Returns true if we need to apply the alteration to the region.
860    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
861        debug_assert!(self.validate(metadata).is_ok());
862        match self {
863            AlterKind::AddColumns { columns } => columns
864                .iter()
865                .any(|col_to_add| col_to_add.need_alter(metadata)),
866            AlterKind::DropColumns { names } => names
867                .iter()
868                .any(|name| metadata.column_by_name(name).is_some()),
869            AlterKind::ModifyColumnTypes { columns } => columns
870                .iter()
871                .any(|col_to_change| col_to_change.need_alter(metadata)),
872            AlterKind::SetRegionOptions { .. } => {
873                // we need to update region options for `ChangeTableOptions`.
874                // todo: we need to check if ttl has ever changed.
875                true
876            }
877            AlterKind::UnsetRegionOptions { .. } => true,
878            AlterKind::SetIndexes { options, .. } => options
879                .iter()
880                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
881            AlterKind::UnsetIndexes { options } => options
882                .iter()
883                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
884            AlterKind::DropDefaults { names } => names
885                .iter()
886                .any(|name| metadata.column_by_name(name).is_some()),
887
888            AlterKind::SetDefaults { columns } => columns
889                .iter()
890                .any(|x| metadata.column_by_name(&x.name).is_some()),
891            AlterKind::SyncColumns { column_metadatas } => {
892                metadata.column_metadatas != *column_metadatas
893            }
894        }
895    }
896
897    /// Returns an error if the column to drop is invalid.
898    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
899        let Some(column) = metadata.column_by_name(name) else {
900            return Ok(());
901        };
902        ensure!(
903            column.semantic_type == SemanticType::Field,
904            InvalidRegionRequestSnafu {
905                region_id: metadata.region_id,
906                err: format!("column {} is not a field and could not be dropped", name),
907            }
908        );
909        Ok(())
910    }
911
912    /// Returns an error if the column's alter index option is invalid.
913    fn validate_column_alter_index_option(
914        column_name: &String,
915        metadata: &RegionMetadata,
916        is_fulltext: bool,
917    ) -> Result<()> {
918        let column = metadata
919            .column_by_name(column_name)
920            .context(InvalidRegionRequestSnafu {
921                region_id: metadata.region_id,
922                err: format!("column {} not found", column_name),
923            })?;
924
925        if is_fulltext {
926            ensure!(
927                column.column_schema.data_type.is_string(),
928                InvalidRegionRequestSnafu {
929                    region_id: metadata.region_id,
930                    err: format!(
931                        "cannot change alter index options for non-string column {}",
932                        column_name
933                    ),
934                }
935            );
936        }
937
938        Ok(())
939    }
940
941    /// Returns an error if the column isn't exist.
942    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
943        metadata
944            .column_by_name(column_name)
945            .context(InvalidRegionRequestSnafu {
946                region_id: metadata.region_id,
947                err: format!("column {} not found", column_name),
948            })?;
949
950        Ok(())
951    }
952}
953
954impl TryFrom<alter_request::Kind> for AlterKind {
955    type Error = MetadataError;
956
957    fn try_from(kind: alter_request::Kind) -> Result<Self> {
958        let alter_kind = match kind {
959            alter_request::Kind::AddColumns(x) => {
960                let columns = x
961                    .add_columns
962                    .into_iter()
963                    .map(|x| x.try_into())
964                    .collect::<Result<Vec<_>>>()?;
965                AlterKind::AddColumns { columns }
966            }
967            alter_request::Kind::ModifyColumnTypes(x) => {
968                let columns = x
969                    .modify_column_types
970                    .into_iter()
971                    .map(|x| x.into())
972                    .collect::<Vec<_>>();
973                AlterKind::ModifyColumnTypes { columns }
974            }
975            alter_request::Kind::DropColumns(x) => {
976                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
977                AlterKind::DropColumns { names }
978            }
979            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
980                options: options
981                    .table_options
982                    .iter()
983                    .map(TryFrom::try_from)
984                    .collect::<Result<Vec<_>>>()?,
985            },
986            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
987                keys: options
988                    .keys
989                    .iter()
990                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
991                    .collect::<Result<Vec<_>>>()?,
992            },
993            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
994                options: vec![SetIndexOption::try_from(o)?],
995            },
996            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
997                options: vec![UnsetIndexOption::try_from(o)?],
998            },
999            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1000                options: o
1001                    .set_indexes
1002                    .into_iter()
1003                    .map(SetIndexOption::try_from)
1004                    .collect::<Result<Vec<_>>>()?,
1005            },
1006            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1007                options: o
1008                    .unset_indexes
1009                    .into_iter()
1010                    .map(UnsetIndexOption::try_from)
1011                    .collect::<Result<Vec<_>>>()?,
1012            },
1013            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1014                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1015            },
1016            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1017                columns: x
1018                    .set_defaults
1019                    .into_iter()
1020                    .map(|x| {
1021                        Ok(SetDefault {
1022                            name: x.column_name,
1023                            default_constraint: x.default_constraint.clone(),
1024                        })
1025                    })
1026                    .collect::<Result<Vec<_>>>()?,
1027            },
1028            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1029                column_metadatas: x
1030                    .column_defs
1031                    .into_iter()
1032                    .map(ColumnMetadata::try_from_column_def)
1033                    .collect::<Result<Vec<_>>>()?,
1034            },
1035        };
1036
1037        Ok(alter_kind)
1038    }
1039}
1040
/// Adds a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Location to add the column. If location is `None`, the region adds
    /// the column as the last column.
    pub location: Option<AddColumnLocation>,
}
1050
1051impl AddColumn {
1052    /// Returns an error if the column to add is invalid.
1053    ///
1054    /// It allows adding existing columns. However, the existing column must have the same metadata
1055    /// and the location must be None.
1056    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1057        ensure!(
1058            self.column_metadata.column_schema.is_nullable()
1059                || self
1060                    .column_metadata
1061                    .column_schema
1062                    .default_constraint()
1063                    .is_some(),
1064            InvalidRegionRequestSnafu {
1065                region_id: metadata.region_id,
1066                err: format!(
1067                    "no default value for column {}",
1068                    self.column_metadata.column_schema.name
1069                ),
1070            }
1071        );
1072
1073        if let Some(existing_column) =
1074            metadata.column_by_name(&self.column_metadata.column_schema.name)
1075        {
1076            // If the column already exists.
1077            ensure!(
1078                *existing_column == self.column_metadata,
1079                InvalidRegionRequestSnafu {
1080                    region_id: metadata.region_id,
1081                    err: format!(
1082                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1083                        self.column_metadata.column_schema.name,
1084                        existing_column,
1085                        self.column_metadata,
1086                    ),
1087                }
1088            );
1089            ensure!(
1090                self.location.is_none(),
1091                InvalidRegionRequestSnafu {
1092                    region_id: metadata.region_id,
1093                    err: format!(
1094                        "column {} already exists, but location is specified",
1095                        self.column_metadata.column_schema.name
1096                    ),
1097                }
1098            );
1099        }
1100
1101        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1102            // Ensures the existing column has the same name.
1103            ensure!(
1104                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1105                InvalidRegionRequestSnafu {
1106                    region_id: metadata.region_id,
1107                    err: format!(
1108                        "column id {} already exists with different name {}",
1109                        self.column_metadata.column_id, existing_column.column_schema.name
1110                    ),
1111                }
1112            );
1113        }
1114
1115        Ok(())
1116    }
1117
1118    /// Returns true if no column to add to the region.
1119    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1120        debug_assert!(self.validate(metadata).is_ok());
1121        metadata
1122            .column_by_name(&self.column_metadata.column_schema.name)
1123            .is_none()
1124    }
1125}
1126
1127impl TryFrom<v1::region::AddColumn> for AddColumn {
1128    type Error = MetadataError;
1129
1130    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1131        let column_def = add_column
1132            .column_def
1133            .context(InvalidRawRegionRequestSnafu {
1134                err: "missing column_def in AddColumn",
1135            })?;
1136
1137        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1138        let location = add_column
1139            .location
1140            .map(AddColumnLocation::try_from)
1141            .transpose()?;
1142
1143        Ok(AddColumn {
1144            column_metadata,
1145            location,
1146        })
1147    }
1148}
1149
/// Location to add a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Add the column to the first position of columns.
    First,
    /// Add the column after a specific column.
    After {
        /// Name of the column after which the new column is added.
        column_name: String,
    },
}
1161
1162impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1163    type Error = MetadataError;
1164
1165    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1166        let location_type = LocationType::try_from(location.location_type)
1167            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1168        let add_column_location = match location_type {
1169            LocationType::First => AddColumnLocation::First,
1170            LocationType::After => AddColumnLocation::After {
1171                column_name: location.after_column_name,
1172            },
1173        };
1174
1175        Ok(add_column_location)
1176    }
1177}
1178
/// Change a column's datatype.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify.
    pub column_name: String,
    /// Column will be changed to this type.
    pub target_type: ConcreteDataType,
}
1187
1188impl ModifyColumnType {
1189    /// Returns an error if the column's datatype to change is invalid.
1190    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1191        let column_meta = metadata
1192            .column_by_name(&self.column_name)
1193            .with_context(|| InvalidRegionRequestSnafu {
1194                region_id: metadata.region_id,
1195                err: format!("column {} not found", self.column_name),
1196            })?;
1197
1198        ensure!(
1199            matches!(column_meta.semantic_type, SemanticType::Field),
1200            InvalidRegionRequestSnafu {
1201                region_id: metadata.region_id,
1202                err: "'timestamp' or 'tag' column cannot change type".to_string()
1203            }
1204        );
1205        ensure!(
1206            column_meta
1207                .column_schema
1208                .data_type
1209                .can_arrow_type_cast_to(&self.target_type),
1210            InvalidRegionRequestSnafu {
1211                region_id: metadata.region_id,
1212                err: format!(
1213                    "column '{}' cannot be cast automatically to type '{}'",
1214                    self.column_name, self.target_type
1215                ),
1216            }
1217        );
1218
1219        Ok(())
1220    }
1221
1222    /// Returns true if no column's datatype to change to the region.
1223    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1224        debug_assert!(self.validate(metadata).is_ok());
1225        metadata.column_by_name(&self.column_name).is_some()
1226    }
1227}
1228
1229impl From<v1::ModifyColumnType> for ModifyColumnType {
1230    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1231        let target_type = ColumnDataTypeWrapper::new(
1232            modify_column_type.target_type(),
1233            modify_column_type.target_type_extension,
1234        )
1235        .into();
1236
1237        ModifyColumnType {
1238            column_name: modify_column_type.column_name,
1239            target_type,
1240        }
1241    }
1242}
1243
/// A region option to set.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Sets the TTL option. Converting from [`UnsetRegionOption::Ttl`]
    /// produces the default value of the inner `Option`.
    Ttl(Option<TimeToLive>),
    /// Modifying TwscOptions with values as (option name, new value).
    /// Converting from an [`UnsetRegionOption`] produces an empty new value.
    Twsc(String, String),
}
1250
1251impl TryFrom<&PbOption> for SetRegionOption {
1252    type Error = MetadataError;
1253
1254    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1255        let PbOption { key, value } = value;
1256        match key.as_str() {
1257            TTL_KEY => {
1258                let ttl = TimeToLive::from_humantime_or_str(value)
1259                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1260
1261                Ok(Self::Ttl(Some(ttl)))
1262            }
1263            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1264                Ok(Self::Twsc(key.to_string(), value.to_string()))
1265            }
1266            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1267        }
1268    }
1269}
1270
1271impl From<&UnsetRegionOption> for SetRegionOption {
1272    fn from(unset_option: &UnsetRegionOption) -> Self {
1273        match unset_option {
1274            UnsetRegionOption::TwcsTriggerFileNum => {
1275                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1276            }
1277            UnsetRegionOption::TwcsMaxOutputFileSize => {
1278                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1279            }
1280            UnsetRegionOption::TwcsTimeWindow => {
1281                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1282            }
1283            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1284        }
1285    }
1286}
1287
1288impl TryFrom<&str> for UnsetRegionOption {
1289    type Error = MetadataError;
1290
1291    fn try_from(key: &str) -> Result<Self> {
1292        match key.to_ascii_lowercase().as_str() {
1293            TTL_KEY => Ok(Self::Ttl),
1294            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1295            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1296            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1297            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1298        }
1299    }
1300}
1301
/// A region option to unset.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// Unsets the option keyed by `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// Unsets the option keyed by `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// Unsets the option keyed by `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// Unsets the option keyed by `TTL_KEY`.
    Ttl,
}
1309
1310impl UnsetRegionOption {
1311    pub fn as_str(&self) -> &str {
1312        match self {
1313            Self::Ttl => TTL_KEY,
1314            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1315            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1316            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1317        }
1318    }
1319}
1320
1321impl Display for UnsetRegionOption {
1322    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1323        write!(f, "{}", self.as_str())
1324    }
1325}
1326
/// Flush region request.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size for the flush.
    // NOTE(review): presumably the row group size of the files written by the
    // flush; confirm against the engine that consumes this request.
    pub row_group_size: Option<usize>,
}
1331
/// Compact region request.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options carried over from the protobuf compact request.
    pub options: compact_request::Options,
}
1336
1337impl Default for RegionCompactRequest {
1338    fn default() -> Self {
1339        Self {
1340            // Default to regular compaction.
1341            options: compact_request::Options::Regular(Default::default()),
1342        }
1343    }
1344}
1345
/// Truncate region request.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate all data in the region.
    All,
    /// Truncate data by time ranges.
    ByTimeRanges {
        /// Time ranges to truncate. Both bounds are inclusive.
        /// Only files that are fully contained in a time range will be truncated,
        /// so there is no guarantee that all data in the time range will be truncated.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1358
/// Catchup region request.
///
/// Makes a readonly region catch up to leader region changes.
/// There is no effect if it operates on a leader region.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Sets it to writable if it's available after it has caught up with all changes.
    pub set_writable: bool,
    /// The `entry_id` that is expected to replay to.
    /// `None` stands for replaying to the latest.
    pub entry_id: Option<entry::Id>,
    /// Used for metrics metadata region.
    /// The `entry_id` that is expected to replay to.
    /// `None` stands for replaying to the latest.
    pub metadata_entry_id: Option<entry::Id>,
    /// The hint for replaying memtable.
    pub location_id: Option<u64>,
    /// Replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1379
/// Bulk inserts request of a region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the region to insert into.
    pub region_id: RegionId,
    /// Record batch to insert; its memory size is used as the estimated size
    /// of the request.
    pub payload: DfRecordBatch,
    /// Arrow IPC message of the request.
    // NOTE(review): presumably the raw encoding of the same data as `payload`;
    // confirm against the code that builds this request.
    pub raw_data: ArrowIpc,
}
1386
1387impl RegionBulkInsertsRequest {
1388    pub fn estimated_size(&self) -> usize {
1389        self.payload.get_array_memory_size()
1390    }
1391}
1392
1393impl fmt::Display for RegionRequest {
1394    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1395        match self {
1396            RegionRequest::Put(_) => write!(f, "Put"),
1397            RegionRequest::Delete(_) => write!(f, "Delete"),
1398            RegionRequest::Create(_) => write!(f, "Create"),
1399            RegionRequest::Drop(_) => write!(f, "Drop"),
1400            RegionRequest::Open(_) => write!(f, "Open"),
1401            RegionRequest::Close(_) => write!(f, "Close"),
1402            RegionRequest::Alter(_) => write!(f, "Alter"),
1403            RegionRequest::Flush(_) => write!(f, "Flush"),
1404            RegionRequest::Compact(_) => write!(f, "Compact"),
1405            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1406            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1407            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1408        }
1409    }
1410}
1411
1412#[cfg(test)]
1413mod tests {
1414
1415    use api::v1::region::RegionColumnDef;
1416    use api::v1::{ColumnDataType, ColumnDef};
1417    use datatypes::prelude::ConcreteDataType;
1418    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1419
1420    use super::*;
1421    use crate::metadata::RegionMetadataBuilder;
1422
1423    #[test]
1424    fn test_from_proto_location() {
1425        let proto_location = v1::AddColumnLocation {
1426            location_type: LocationType::First as i32,
1427            after_column_name: String::default(),
1428        };
1429        let location = AddColumnLocation::try_from(proto_location).unwrap();
1430        assert_eq!(location, AddColumnLocation::First);
1431
1432        let proto_location = v1::AddColumnLocation {
1433            location_type: 10,
1434            after_column_name: String::default(),
1435        };
1436        AddColumnLocation::try_from(proto_location).unwrap_err();
1437
1438        let proto_location = v1::AddColumnLocation {
1439            location_type: LocationType::After as i32,
1440            after_column_name: "a".to_string(),
1441        };
1442        let location = AddColumnLocation::try_from(proto_location).unwrap();
1443        assert_eq!(
1444            location,
1445            AddColumnLocation::After {
1446                column_name: "a".to_string()
1447            }
1448        );
1449    }
1450
1451    #[test]
1452    fn test_from_none_proto_add_column() {
1453        AddColumn::try_from(v1::region::AddColumn {
1454            column_def: None,
1455            location: None,
1456        })
1457        .unwrap_err();
1458    }
1459
1460    #[test]
1461    fn test_from_proto_alter_request() {
1462        RegionAlterRequest::try_from(AlterRequest {
1463            region_id: 0,
1464            schema_version: 1,
1465            kind: None,
1466        })
1467        .unwrap_err();
1468
1469        let request = RegionAlterRequest::try_from(AlterRequest {
1470            region_id: 0,
1471            schema_version: 1,
1472            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1473                add_columns: vec![v1::region::AddColumn {
1474                    column_def: Some(RegionColumnDef {
1475                        column_def: Some(ColumnDef {
1476                            name: "a".to_string(),
1477                            data_type: ColumnDataType::String as i32,
1478                            is_nullable: true,
1479                            default_constraint: vec![],
1480                            semantic_type: SemanticType::Field as i32,
1481                            comment: String::new(),
1482                            ..Default::default()
1483                        }),
1484                        column_id: 1,
1485                    }),
1486                    location: Some(v1::AddColumnLocation {
1487                        location_type: LocationType::First as i32,
1488                        after_column_name: String::default(),
1489                    }),
1490                }],
1491            })),
1492        })
1493        .unwrap();
1494
1495        assert_eq!(
1496            request,
1497            RegionAlterRequest {
1498                kind: AlterKind::AddColumns {
1499                    columns: vec![AddColumn {
1500                        column_metadata: ColumnMetadata {
1501                            column_schema: ColumnSchema::new(
1502                                "a",
1503                                ConcreteDataType::string_datatype(),
1504                                true,
1505                            ),
1506                            semantic_type: SemanticType::Field,
1507                            column_id: 1,
1508                        },
1509                        location: Some(AddColumnLocation::First),
1510                    }]
1511                },
1512            }
1513        );
1514    }
1515
1516    /// Returns a new region metadata for testing. Metadata:
1517    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1518    fn new_metadata() -> RegionMetadata {
1519        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1520        builder
1521            .push_column_metadata(ColumnMetadata {
1522                column_schema: ColumnSchema::new(
1523                    "ts",
1524                    ConcreteDataType::timestamp_millisecond_datatype(),
1525                    false,
1526                ),
1527                semantic_type: SemanticType::Timestamp,
1528                column_id: 1,
1529            })
1530            .push_column_metadata(ColumnMetadata {
1531                column_schema: ColumnSchema::new(
1532                    "tag_0",
1533                    ConcreteDataType::string_datatype(),
1534                    true,
1535                ),
1536                semantic_type: SemanticType::Tag,
1537                column_id: 2,
1538            })
1539            .push_column_metadata(ColumnMetadata {
1540                column_schema: ColumnSchema::new(
1541                    "field_0",
1542                    ConcreteDataType::string_datatype(),
1543                    true,
1544                ),
1545                semantic_type: SemanticType::Field,
1546                column_id: 3,
1547            })
1548            .push_column_metadata(ColumnMetadata {
1549                column_schema: ColumnSchema::new(
1550                    "field_1",
1551                    ConcreteDataType::boolean_datatype(),
1552                    true,
1553                ),
1554                semantic_type: SemanticType::Field,
1555                column_id: 4,
1556            })
1557            .primary_key(vec![2]);
1558        builder.build().unwrap()
1559    }
1560
1561    #[test]
1562    fn test_add_column_validate() {
1563        let metadata = new_metadata();
1564        let add_column = AddColumn {
1565            column_metadata: ColumnMetadata {
1566                column_schema: ColumnSchema::new(
1567                    "tag_1",
1568                    ConcreteDataType::string_datatype(),
1569                    true,
1570                ),
1571                semantic_type: SemanticType::Tag,
1572                column_id: 5,
1573            },
1574            location: None,
1575        };
1576        add_column.validate(&metadata).unwrap();
1577        assert!(add_column.need_alter(&metadata));
1578
1579        // Add not null column.
1580        AddColumn {
1581            column_metadata: ColumnMetadata {
1582                column_schema: ColumnSchema::new(
1583                    "tag_1",
1584                    ConcreteDataType::string_datatype(),
1585                    false,
1586                ),
1587                semantic_type: SemanticType::Tag,
1588                column_id: 5,
1589            },
1590            location: None,
1591        }
1592        .validate(&metadata)
1593        .unwrap_err();
1594
1595        // Add existing column.
1596        let add_column = AddColumn {
1597            column_metadata: ColumnMetadata {
1598                column_schema: ColumnSchema::new(
1599                    "tag_0",
1600                    ConcreteDataType::string_datatype(),
1601                    true,
1602                ),
1603                semantic_type: SemanticType::Tag,
1604                column_id: 2,
1605            },
1606            location: None,
1607        };
1608        add_column.validate(&metadata).unwrap();
1609        assert!(!add_column.need_alter(&metadata));
1610    }
1611
1612    #[test]
1613    fn test_add_duplicate_columns() {
1614        let kind = AlterKind::AddColumns {
1615            columns: vec![
1616                AddColumn {
1617                    column_metadata: ColumnMetadata {
1618                        column_schema: ColumnSchema::new(
1619                            "tag_1",
1620                            ConcreteDataType::string_datatype(),
1621                            true,
1622                        ),
1623                        semantic_type: SemanticType::Tag,
1624                        column_id: 5,
1625                    },
1626                    location: None,
1627                },
1628                AddColumn {
1629                    column_metadata: ColumnMetadata {
1630                        column_schema: ColumnSchema::new(
1631                            "tag_1",
1632                            ConcreteDataType::string_datatype(),
1633                            true,
1634                        ),
1635                        semantic_type: SemanticType::Field,
1636                        column_id: 6,
1637                    },
1638                    location: None,
1639                },
1640            ],
1641        };
1642        let metadata = new_metadata();
1643        kind.validate(&metadata).unwrap();
1644        assert!(kind.need_alter(&metadata));
1645    }
1646
/// Checks that an `AddColumns` request is rejected when it reuses an existing column's name
/// or id together with metadata that differs from what the region already has.
///
/// NOTE(review): assumes `new_metadata()` defines `tag_0` as a string tag with id 2 — confirm.
#[test]
fn test_add_existing_column_different_metadata() {
    let metadata = new_metadata();
    // Wraps one nullable tag column, with no explicit location, in an `AddColumns` request.
    let add_tag = |name: &str, data_type: ConcreteDataType, column_id| AlterKind::AddColumns {
        columns: vec![AddColumn {
            column_metadata: ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, true),
                semantic_type: SemanticType::Tag,
                column_id,
            },
            location: None,
        }],
    };

    // Existing column name, but a different id.
    let different_id = add_tag("tag_0", ConcreteDataType::string_datatype(), 4);
    different_id.validate(&metadata).unwrap_err();

    // Existing column name, but a different type.
    let different_type = add_tag("tag_0", ConcreteDataType::int64_datatype(), 2);
    different_type.validate(&metadata).unwrap_err();

    // Existing column id, but a different name.
    let different_name = add_tag("tag_1", ConcreteDataType::string_datatype(), 2);
    different_name.validate(&metadata).unwrap_err();
}
1702
/// Validation must reject an `AddColumns` request that places `tag_0` (nullable string tag,
/// id 2) at `AddColumnLocation::First`.
///
/// NOTE(review): per the test name, this column presumably already exists in
/// `new_metadata()` — confirm.
#[test]
fn test_add_existing_column_with_location() {
    let metadata = new_metadata();

    let tag_column = ColumnMetadata {
        column_schema: ColumnSchema::new("tag_0", ConcreteDataType::string_datatype(), true),
        semantic_type: SemanticType::Tag,
        column_id: 2,
    };
    let add_first = AddColumn {
        column_metadata: tag_column,
        location: Some(AddColumnLocation::First),
    };
    let kind = AlterKind::AddColumns {
        columns: vec![add_first],
    };

    kind.validate(&metadata).unwrap_err();
}
1722
/// Exercises `DropColumns` validation for three column names against `new_metadata()`:
/// `xxxx` validates but needs no alter, `tag_0` is rejected, and `field_0` validates and
/// needs an alter.
#[test]
fn test_validate_drop_column() {
    let metadata = new_metadata();
    // Builds a `DropColumns` request for a single column name.
    let drop_column = |name: &str| AlterKind::DropColumns {
        names: vec![name.to_string()],
    };

    // `xxxx`: accepted by validation, yet nothing has to be altered.
    let drop_xxxx = drop_column("xxxx");
    drop_xxxx.validate(&metadata).unwrap();
    assert!(!drop_xxxx.need_alter(&metadata));

    // `tag_0`: validation fails.
    drop_column("tag_0").validate(&metadata).unwrap_err();

    // `field_0`: accepted and requires an alter.
    let drop_field = drop_column("field_0");
    drop_field.validate(&metadata).unwrap();
    assert!(drop_field.need_alter(&metadata));
}
1744
/// Exercises `ModifyColumnTypes` validation: four requests that must be rejected, followed
/// by one (`field_0` to int32) that must validate and report that an alter is needed.
#[test]
fn test_validate_modify_column_type() {
    let metadata = new_metadata();
    // Builds a `ModifyColumnTypes` request that retypes a single column.
    let retype = |column: &str, target_type: ConcreteDataType| AlterKind::ModifyColumnTypes {
        columns: vec![ModifyColumnType {
            column_name: column.to_string(),
            target_type,
        }],
    };

    // Each of these requests must fail validation. They are kept as separate statements so
    // a failing case is identifiable from the panic location.
    let retype_xxxx = retype("xxxx", ConcreteDataType::string_datatype());
    retype_xxxx.validate(&metadata).unwrap_err();

    let retype_field_1 = retype("field_1", ConcreteDataType::date_datatype());
    retype_field_1.validate(&metadata).unwrap_err();

    let retype_ts = retype("ts", ConcreteDataType::date_datatype());
    retype_ts.validate(&metadata).unwrap_err();

    let retype_tag_0 = retype("tag_0", ConcreteDataType::date_datatype());
    retype_tag_0.validate(&metadata).unwrap_err();

    // Retyping `field_0` to int32 is accepted and requires an alter.
    let retype_field_0 = retype("field_0", ConcreteDataType::int32_datatype());
    retype_field_0.validate(&metadata).unwrap();
    assert!(retype_field_0.need_alter(&metadata));
}
1793
/// A `RegionAlterRequest` adding the tag `tag_1` (id 5) and the field `field_2` (id 6) must
/// validate against `new_metadata()` with `schema_version` set to 1.
#[test]
fn test_validate_add_columns() {
    // Builds a nullable string column to add, without an explicit location.
    let string_column = |name: &str, semantic_type, column_id| AddColumn {
        column_metadata: ColumnMetadata {
            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
            semantic_type,
            column_id,
        },
        location: None,
    };

    let request = RegionAlterRequest {
        kind: AlterKind::AddColumns {
            columns: vec![
                string_column("tag_1", SemanticType::Tag, 5),
                string_column("field_2", SemanticType::Field, 6),
            ],
        },
    };

    let mut metadata = new_metadata();
    metadata.schema_version = 1;
    request.validate(&metadata).unwrap();
}
1829
/// `RegionCreateRequest::validate` must return an error for a request whose primary key is
/// `[3, 4]` while only columns 1 (`ts`), 2 (`tag_0`) and 3 (`field_0`) are defined.
///
/// NOTE(review): the rejection is presumably caused by id 3 being a field column and/or
/// id 4 not being defined; the test only asserts that validation fails — confirm.
#[test]
fn test_validate_create_region() {
    // (name, data type, nullable, semantic type, column id)
    let column_specs = vec![
        (
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
            SemanticType::Timestamp,
            1,
        ),
        (
            "tag_0",
            ConcreteDataType::string_datatype(),
            true,
            SemanticType::Tag,
            2,
        ),
        (
            "field_0",
            ConcreteDataType::string_datatype(),
            true,
            SemanticType::Field,
            3,
        ),
    ];
    let column_metadatas: Vec<_> = column_specs
        .into_iter()
        .map(
            |(name, data_type, nullable, semantic_type, column_id)| ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, nullable),
                semantic_type,
                column_id,
            },
        )
        .collect();

    let create = RegionCreateRequest {
        engine: "mito".to_string(),
        column_metadatas,
        primary_key: vec![3, 4],
        options: HashMap::new(),
        table_dir: "path".to_string(),
        path_type: PathType::Bare,
        partition_expr_json: Some("".to_string()),
    };

    assert!(create.validate().is_err());
}
1873
/// Setting and then unsetting a fulltext index on `tag_0` must both pass
/// `RegionAlterRequest::validate`, each against fresh metadata with `schema_version` 1.
#[test]
fn test_validate_modify_column_fulltext_options() {
    // Every request is validated against its own copy of the metadata at schema version 1.
    let metadata_v1 = || {
        let mut metadata = new_metadata();
        metadata.schema_version = 1;
        metadata
    };

    // Set a fulltext index on `tag_0`.
    let fulltext_options = FulltextOptions::new_unchecked(
        true,
        FulltextAnalyzer::Chinese,
        false,
        FulltextBackend::Bloom,
        1000,
        0.01,
    );
    let set_request = RegionAlterRequest {
        kind: AlterKind::SetIndexes {
            options: vec![SetIndexOption::Fulltext {
                column_name: "tag_0".to_string(),
                options: fulltext_options,
            }],
        },
    };
    set_request.validate(&metadata_v1()).unwrap();

    // Unset the fulltext index on `tag_0`.
    let unset_request = RegionAlterRequest {
        kind: AlterKind::UnsetIndexes {
            options: vec![UnsetIndexOption::Fulltext {
                column_name: "tag_0".to_string(),
            }],
        },
    };
    unset_request.validate(&metadata_v1()).unwrap();
}
1904
/// Exercises `SyncColumns` validation against `new_metadata()`: three column lists that
/// must be rejected with specific error messages, and one that must be accepted.
#[test]
fn test_validate_sync_columns() {
    let metadata = new_metadata();
    // Wraps a column list in a `SyncColumns` request.
    let sync = |column_metadatas: Vec<ColumnMetadata>| AlterKind::SyncColumns { column_metadatas };
    // Builds a nullable string column.
    let string_column = |name: &str, semantic_type, column_id| ColumnMetadata {
        column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
        semantic_type,
        column_id,
    };

    // A column list made only of `tag_1` (id 5) and `field_2` (id 6) is rejected.
    let unrelated_columns = vec![
        string_column("tag_1", SemanticType::Tag, 5),
        string_column("field_2", SemanticType::Field, 6),
    ];
    let err = sync(unrelated_columns).validate(&metadata).unwrap_err();
    assert!(err.to_string().contains("not a primary key"));

    // Rename the timestamp column to `ts1`.
    let mut renamed_ts_columns = metadata.column_metadatas.clone();
    let ts_index = renamed_ts_columns
        .iter()
        .position(|c| c.semantic_type == SemanticType::Timestamp)
        .unwrap();
    renamed_ts_columns[ts_index].column_schema.name = "ts1".to_string();
    let err = sync(renamed_ts_columns).validate(&metadata).unwrap_err();
    assert!(
        err.to_string()
            .contains("timestamp column ts has different id")
    );

    // Give the `tag_0` column a different id (the name stays the same).
    let mut reassigned_tag_columns = metadata.column_metadatas.clone();
    let tag_index = reassigned_tag_columns
        .iter()
        .position(|c| c.column_schema.name == "tag_0")
        .unwrap();
    reassigned_tag_columns[tag_index].column_id = 100;
    let err = sync(reassigned_tag_columns)
        .validate(&metadata)
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("column with same name tag_0 has different id")
    );

    // Append a new field column `field_2` (id 4) to the existing columns; this is accepted.
    let new_field = string_column("field_2", SemanticType::Field, 4);
    let columns_with_new_field: Vec<ColumnMetadata> = metadata
        .column_metadatas
        .iter()
        .cloned()
        .chain(std::iter::once(new_field))
        .collect();
    sync(columns_with_new_field).validate(&metadata).unwrap();
}
1978
/// Pins the `u8` values of the `PathType` variants (`Bare` = 0, `Data` = 1, `Metadata` = 2)
/// and checks that `PathType::try_from` maps each value back to its variant.
#[test]
fn test_cast_path_type_to_primitive() {
    let bare = PathType::Bare as u8;
    let data = PathType::Data as u8;
    let metadata = PathType::Metadata as u8;

    // Variant -> primitive.
    assert_eq!(bare, 0);
    assert_eq!(data, 1);
    assert_eq!(metadata, 2);

    // Primitive -> variant round trip.
    assert_eq!(PathType::try_from(bare).unwrap(), PathType::Bare);
    assert_eq!(PathType::try_from(data).unwrap(), PathType::Data);
    assert_eq!(PathType::try_from(metadata).unwrap(), PathType::Metadata);
}
1997}