store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27    FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28    region_request, truncate_request,
29};
30use api::v1::{
31    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50    RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55    SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
/// The type of path to generate.
///
/// Selects which directory layout is used for a region under `{table_dir}`.
/// With `#[repr(u8)]` and no explicit discriminants, the variants are `0`, `1` and `2`
/// in declaration order; [`TryFromPrimitive`] converts such a `u8` back into a variant.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// A bare path - the original path of an engine.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
    Bare,
    /// A path for the data region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
    Data,
    /// A path for the metadata region of a metric engine table.
    ///
    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
    Metadata,
}
77
78#[derive(Debug, IntoStaticStr)]
79pub enum BatchRegionDdlRequest {
80    Create(Vec<(RegionId, RegionCreateRequest)>),
81    Drop(Vec<(RegionId, RegionDropRequest)>),
82    Alter(Vec<(RegionId, RegionAlterRequest)>),
83}
84
85impl BatchRegionDdlRequest {
86    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
87    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88        match body {
89            region_request::Body::Creates(creates) => {
90                let requests = creates
91                    .requests
92                    .into_iter()
93                    .map(parse_region_create)
94                    .collect::<Result<Vec<_>>>()?;
95                Ok(Some(Self::Create(requests)))
96            }
97            region_request::Body::Drops(drops) => {
98                let requests = drops
99                    .requests
100                    .into_iter()
101                    .map(parse_region_drop)
102                    .collect::<Result<Vec<_>>>()?;
103                Ok(Some(Self::Drop(requests)))
104            }
105            region_request::Body::Alters(alters) => {
106                let requests = alters
107                    .requests
108                    .into_iter()
109                    .map(parse_region_alter)
110                    .collect::<Result<Vec<_>>>()?;
111                Ok(Some(Self::Alter(requests)))
112            }
113            _ => Ok(None),
114        }
115    }
116
117    pub fn request_type(&self) -> &'static str {
118        self.into()
119    }
120
121    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122        match self {
123            Self::Create(requests) => requests
124                .into_iter()
125                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126                .collect(),
127            Self::Drop(requests) => requests
128                .into_iter()
129                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130                .collect(),
131            Self::Alter(requests) => requests
132                .into_iter()
133                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134                .collect(),
135        }
136    }
137}
138
139#[derive(Debug, IntoStaticStr)]
140pub enum RegionRequest {
141    Put(RegionPutRequest),
142    Delete(RegionDeleteRequest),
143    Create(RegionCreateRequest),
144    Drop(RegionDropRequest),
145    Open(RegionOpenRequest),
146    Close(RegionCloseRequest),
147    Alter(RegionAlterRequest),
148    Flush(RegionFlushRequest),
149    Compact(RegionCompactRequest),
150    BuildIndex(RegionBuildIndexRequest),
151    Truncate(RegionTruncateRequest),
152    Catchup(RegionCatchupRequest),
153    BulkInserts(RegionBulkInsertsRequest),
154    EnterStaging(EnterStagingRequest),
155    ApplyStagingManifest(ApplyStagingManifestRequest),
156}
157
158impl RegionRequest {
159    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
160    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
161    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
162        match body {
163            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
164            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
165            region_request::Body::Create(create) => make_region_create(create),
166            region_request::Body::Drop(drop) => make_region_drop(drop),
167            region_request::Body::Open(open) => make_region_open(open),
168            region_request::Body::Close(close) => make_region_close(close),
169            region_request::Body::Alter(alter) => make_region_alter(alter),
170            region_request::Body::Flush(flush) => make_region_flush(flush),
171            region_request::Body::Compact(compact) => make_region_compact(compact),
172            region_request::Body::BuildIndex(index) => make_region_build_index(index),
173            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
174            region_request::Body::Creates(creates) => make_region_creates(creates),
175            region_request::Body::Drops(drops) => make_region_drops(drops),
176            region_request::Body::Alters(alters) => make_region_alters(alters),
177            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
178            region_request::Body::Sync(_) => UnexpectedSnafu {
179                reason: "Sync request should be handled separately by RegionServer",
180            }
181            .fail(),
182            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
183                reason: "ListMetadata request should be handled separately by RegionServer",
184            }
185            .fail(),
186            region_request::Body::ApplyStagingManifest(apply) => {
187                make_region_apply_staging_manifest(apply)
188            }
189        }
190    }
191
192    /// Returns the type name of the request.
193    pub fn request_type(&self) -> &'static str {
194        self.into()
195    }
196}
197
198fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
199    let requests = inserts
200        .requests
201        .into_iter()
202        .filter_map(|r| {
203            let region_id = r.region_id.into();
204            r.rows.map(|rows| {
205                (
206                    region_id,
207                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
208                )
209            })
210        })
211        .collect();
212    Ok(requests)
213}
214
215fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
216    let requests = deletes
217        .requests
218        .into_iter()
219        .filter_map(|r| {
220            let region_id = r.region_id.into();
221            r.rows.map(|rows| {
222                (
223                    region_id,
224                    RegionRequest::Delete(RegionDeleteRequest { rows, hint: None }),
225                )
226            })
227        })
228        .collect();
229    Ok(requests)
230}
231
232fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
233    let column_metadatas = create
234        .column_defs
235        .into_iter()
236        .map(ColumnMetadata::try_from_column_def)
237        .collect::<Result<Vec<_>>>()?;
238    let region_id = RegionId::from(create.region_id);
239    let table_dir = table_dir(&create.path, region_id.table_id());
240    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
241    Ok((
242        region_id,
243        RegionCreateRequest {
244            engine: create.engine,
245            column_metadatas,
246            primary_key: create.primary_key,
247            options: create.options,
248            table_dir,
249            path_type: PathType::Bare,
250            partition_expr_json,
251        },
252    ))
253}
254
255fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
256    let (region_id, request) = parse_region_create(create)?;
257    Ok(vec![(region_id, RegionRequest::Create(request))])
258}
259
260fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
261    let mut requests = Vec::with_capacity(creates.requests.len());
262    for create in creates.requests {
263        requests.extend(make_region_create(create)?);
264    }
265    Ok(requests)
266}
267
268fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
269    let region_id = drop.region_id.into();
270    Ok((
271        region_id,
272        RegionDropRequest {
273            fast_path: drop.fast_path,
274            force: drop.force,
275            partial_drop: drop.partial_drop,
276        },
277    ))
278}
279
280fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
281    let (region_id, request) = parse_region_drop(drop)?;
282    Ok(vec![(region_id, RegionRequest::Drop(request))])
283}
284
285fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
286    let mut requests = Vec::with_capacity(drops.requests.len());
287    for drop in drops.requests {
288        requests.extend(make_region_drop(drop)?);
289    }
290    Ok(requests)
291}
292
293fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
294    let region_id = RegionId::from(open.region_id);
295    let table_dir = table_dir(&open.path, region_id.table_id());
296    Ok(vec![(
297        region_id,
298        RegionRequest::Open(RegionOpenRequest {
299            engine: open.engine,
300            table_dir,
301            path_type: PathType::Bare,
302            options: open.options,
303            skip_wal_replay: false,
304            checkpoint: None,
305        }),
306    )])
307}
308
309fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
310    let region_id = close.region_id.into();
311    Ok(vec![(
312        region_id,
313        RegionRequest::Close(RegionCloseRequest {}),
314    )])
315}
316
317fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
318    let region_id = alter.region_id.into();
319    let request = RegionAlterRequest::try_from(alter)?;
320    Ok((region_id, request))
321}
322
323fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
324    let (region_id, request) = parse_region_alter(alter)?;
325    Ok(vec![(region_id, RegionRequest::Alter(request))])
326}
327
328fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
329    let mut requests = Vec::with_capacity(alters.requests.len());
330    for alter in alters.requests {
331        requests.extend(make_region_alter(alter)?);
332    }
333    Ok(requests)
334}
335
336fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
337    let region_id = flush.region_id.into();
338    Ok(vec![(
339        region_id,
340        RegionRequest::Flush(RegionFlushRequest {
341            row_group_size: None,
342        }),
343    )])
344}
345
346fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
347    let region_id = compact.region_id.into();
348    let options = compact
349        .options
350        .unwrap_or(compact_request::Options::Regular(Default::default()));
351    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
352    let parallelism = if compact.parallelism == 0 {
353        None
354    } else {
355        Some(compact.parallelism)
356    };
357    Ok(vec![(
358        region_id,
359        RegionRequest::Compact(RegionCompactRequest {
360            options,
361            parallelism,
362        }),
363    )])
364}
365
366fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
367    let region_id = index.region_id.into();
368    Ok(vec![(
369        region_id,
370        RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
371    )])
372}
373
374fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
375    let region_id = truncate.region_id.into();
376    match truncate.kind {
377        None => InvalidRawRegionRequestSnafu {
378            err: "missing kind in TruncateRequest".to_string(),
379        }
380        .fail(),
381        Some(truncate_request::Kind::All(_)) => Ok(vec![(
382            region_id,
383            RegionRequest::Truncate(RegionTruncateRequest::All),
384        )]),
385        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
386            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
387
388            Ok(vec![(
389                region_id,
390                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
391            )])
392        }
393    }
394}
395
396/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
397fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
398    let region_id = request.region_id.into();
399    let Some(Body::ArrowIpc(request)) = request.body else {
400        return Ok(vec![]);
401    };
402
403    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
404        .with_label_values(&["decode"])
405        .start_timer();
406    let mut decoder =
407        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
408    let payload = decoder
409        .try_decode_record_batch(&request.data_header, &request.payload)
410        .context(FlightCodecSnafu)?;
411    decoder_timer.observe_duration();
412    Ok(vec![(
413        region_id,
414        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
415            region_id,
416            payload,
417            raw_data: request,
418        }),
419    )])
420}
421
422fn make_region_apply_staging_manifest(
423    api::v1::region::ApplyStagingManifestRequest {
424        region_id,
425        partition_expr,
426        central_region_id,
427        manifest_path,
428    }: api::v1::region::ApplyStagingManifestRequest,
429) -> Result<Vec<(RegionId, RegionRequest)>> {
430    let region_id = region_id.into();
431    Ok(vec![(
432        region_id,
433        RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
434            partition_expr,
435            central_region_id: central_region_id.into(),
436            manifest_path,
437        }),
438    )])
439}
440
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint; requests built from protobuf `InsertRequests` leave it `None`.
    pub hint: Option<WriteHint>,
}
449
/// Request to read data from a region.
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
454
/// Request to delete data from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Keys of the rows to delete.
    ///
    /// Each row only contains primary key columns and a time index column.
    pub rows: Rows,
    /// Optional write hint; requests built from protobuf `DeleteRequests` leave it `None`.
    pub hint: Option<WriteHint>,
}
465
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Region engine name.
    pub engine: String,
    /// Columns in this region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the columns in the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Options of the created region.
    pub options: HashMap<String, String>,
    /// Directory for the table's data home, usually composed of the catalog and table id.
    pub table_dir: String,
    /// Path type used to generate region paths.
    pub path_type: PathType,
    /// Partition expression JSON from table metadata; an empty string means the region has
    /// no partition.
    /// It is an `Option` to stay compatible with old clients that do not send it.
    pub partition_expr_json: Option<String>,
}
484
485impl RegionCreateRequest {
486    /// Checks whether the request is valid, returns an error if it is invalid.
487    pub fn validate(&self) -> Result<()> {
488        // time index must exist
489        ensure!(
490            self.column_metadatas
491                .iter()
492                .any(|x| x.semantic_type == SemanticType::Timestamp),
493            InvalidRegionRequestSnafu {
494                region_id: RegionId::new(0, 0),
495                err: "missing timestamp column in create region request".to_string(),
496            }
497        );
498
499        // build column id to indices
500        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
501        for (i, c) in self.column_metadatas.iter().enumerate() {
502            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
503                return InvalidRegionRequestSnafu {
504                    region_id: RegionId::new(0, 0),
505                    err: format!(
506                        "duplicate column id {} (at position {} and {}) in create region request",
507                        c.column_id, previous, i
508                    ),
509                }
510                .fail();
511            }
512        }
513
514        // primary key must exist
515        for column_id in &self.primary_key {
516            ensure!(
517                column_id_to_indices.contains_key(column_id),
518                InvalidRegionRequestSnafu {
519                    region_id: RegionId::new(0, 0),
520                    err: format!(
521                        "missing primary key column {} in create region request",
522                        column_id
523                    ),
524                }
525            );
526        }
527
528        Ok(())
529    }
530
531    /// Returns true when the region belongs to the metric engine's physical table.
532    pub fn is_physical_table(&self) -> bool {
533        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
534    }
535}
536
/// Request to drop a region.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Enables fast-path drop optimizations for logical regions.
    /// Only applicable to the Metric Engine; ignored by others.
    pub fast_path: bool,

    /// Forces the drop of a physical region and all its associated logical regions.
    /// Only relevant for physical regions managed by the Metric Engine.
    pub force: bool,

    /// If true, only a portion of the region is being dropped, and its files may still be
    /// referenced by other regions.
    /// This is used to prevent deletion of files that are still in use by other regions.
    pub partial_drop: bool,
}
551
/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Region engine name.
    pub engine: String,
    /// Directory for the table's data home, usually composed of the catalog and table id.
    pub table_dir: String,
    /// Path type used to generate region paths.
    pub path_type: PathType,
    /// Options of the opened region.
    pub options: HashMap<String, String>,
    /// Whether to skip replaying the WAL.
    pub skip_wal_replay: bool,
    /// Replay checkpoint, if any.
    pub checkpoint: Option<ReplayCheckpoint>,
}

/// Checkpoint used for WAL replay when opening a region.
///
/// NOTE(review): whether replay starts at or after these entry ids is decided by the engine
/// consuming the checkpoint; confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// WAL entry id of the checkpoint (presumably for the data WAL; confirm).
    pub entry_id: u64,
    /// Optional entry id for metadata (presumably the metric engine's metadata region; confirm).
    pub metadata_entry_id: Option<u64>,
}

impl RegionOpenRequest {
    /// Returns true when the region belongs to the metric engine's physical table,
    /// i.e. its options contain `PHYSICAL_TABLE_METADATA_KEY`.
    pub fn is_physical_table(&self) -> bool {
        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
    }
}
581
/// Close region request.
///
/// Has no fields; the region to close is identified by the `RegionId` it is paired with.
#[derive(Debug)]
pub struct RegionCloseRequest {}
585
586/// Alter metadata of a region.
587#[derive(Debug, PartialEq, Eq, Clone)]
588pub struct RegionAlterRequest {
589    /// Kind of alteration to do.
590    pub kind: AlterKind,
591}
592
593impl RegionAlterRequest {
594    /// Checks whether the request is valid, returns an error if it is invalid.
595    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
596        self.kind.validate(metadata)?;
597
598        Ok(())
599    }
600
601    /// Returns true if we need to apply the request to the region.
602    ///
603    /// The `request` should be valid.
604    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
605        debug_assert!(self.validate(metadata).is_ok());
606        self.kind.need_alter(metadata)
607    }
608}
609
610impl TryFrom<AlterRequest> for RegionAlterRequest {
611    type Error = MetadataError;
612
613    fn try_from(value: AlterRequest) -> Result<Self> {
614        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
615            err: "missing kind in AlterRequest",
616        })?;
617
618        let kind = AlterKind::try_from(kind)?;
619        Ok(RegionAlterRequest { kind })
620    }
621}
622
/// Kind of the alteration.
///
/// `AsRefStr` yields the variant name.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region; only fields are allowed to be dropped.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data types of columns in the region; only fields are allowed to change.
    ModifyColumnTypes {
        /// Columns to change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop column default values.
    DropDefaults {
        /// Names of the columns whose default values are dropped.
        names: Vec<String>,
    },
    /// Set column default values.
    SetDefaults {
        /// Columns and their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Sync column metadatas.
    SyncColumns {
        /// Column metadatas to sync the region with.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// New default value for a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// Encoded default constraint.
    /// NOTE(review): the encoding is defined by the producer (presumably a serialized
    /// default constraint); confirm against the caller.
    pub default_constraint: Vec<u8>,
}
669
670#[derive(Debug, PartialEq, Eq, Clone)]
671pub enum SetIndexOption {
672    Fulltext {
673        column_name: String,
674        options: FulltextOptions,
675    },
676    Inverted {
677        column_name: String,
678    },
679    Skipping {
680        column_name: String,
681        options: SkippingIndexOptions,
682    },
683}
684
685impl SetIndexOption {
686    /// Returns the column name of the index option.
687    pub fn column_name(&self) -> &String {
688        match self {
689            SetIndexOption::Fulltext { column_name, .. } => column_name,
690            SetIndexOption::Inverted { column_name } => column_name,
691            SetIndexOption::Skipping { column_name, .. } => column_name,
692        }
693    }
694
695    /// Returns true if the index option is fulltext.
696    pub fn is_fulltext(&self) -> bool {
697        match self {
698            SetIndexOption::Fulltext { .. } => true,
699            SetIndexOption::Inverted { .. } => false,
700            SetIndexOption::Skipping { .. } => false,
701        }
702    }
703}
704
705impl TryFrom<v1::SetIndex> for SetIndexOption {
706    type Error = MetadataError;
707
708    fn try_from(value: v1::SetIndex) -> Result<Self> {
709        let option = value.options.context(InvalidRawRegionRequestSnafu {
710            err: "missing options in SetIndex",
711        })?;
712
713        let opt = match option {
714            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
715                column_name: x.column_name.clone(),
716                options: FulltextOptions::new(
717                    x.enable,
718                    as_fulltext_option_analyzer(
719                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
720                    ),
721                    x.case_sensitive,
722                    as_fulltext_option_backend(
723                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
724                    ),
725                    x.granularity as u32,
726                    x.false_positive_rate,
727                )
728                .context(InvalidIndexOptionSnafu)?,
729            },
730            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
731                column_name: i.column_name,
732            },
733            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
734                column_name: s.column_name,
735                options: SkippingIndexOptions::new(
736                    s.granularity as u32,
737                    s.false_positive_rate,
738                    as_skipping_index_type(
739                        PbSkippingIndexType::try_from(s.skipping_index_type)
740                            .context(DecodeProtoSnafu)?,
741                    ),
742                )
743                .context(InvalidIndexOptionSnafu)?,
744            },
745        };
746
747        Ok(opt)
748    }
749}
750
/// Index option to remove from a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    Fulltext { column_name: String },
    Inverted { column_name: String },
    Skipping { column_name: String },
}

impl UnsetIndexOption {
    /// Returns the name of the column whose index is removed.
    pub fn column_name(&self) -> &String {
        match self {
            Self::Fulltext { column_name }
            | Self::Inverted { column_name }
            | Self::Skipping { column_name } => column_name,
        }
    }

    /// Returns true only for the fulltext variant.
    pub fn is_fulltext(&self) -> bool {
        matches!(self, Self::Fulltext { .. })
    }
}
775
776impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
777    type Error = MetadataError;
778
779    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
780        let option = value.options.context(InvalidRawRegionRequestSnafu {
781            err: "missing options in UnsetIndex",
782        })?;
783
784        let opt = match option {
785            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
786                column_name: f.column_name,
787            },
788            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
789                column_name: i.column_name,
790            },
791            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
792                column_name: s.column_name,
793            },
794        };
795
796        Ok(opt)
797    }
798}
799
800impl AlterKind {
801    /// Returns an error if the alter kind is invalid.
802    ///
803    /// It allows adding column if not exists and dropping column if exists.
804    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
805        match self {
806            AlterKind::AddColumns { columns } => {
807                for col_to_add in columns {
808                    col_to_add.validate(metadata)?;
809                }
810            }
811            AlterKind::DropColumns { names } => {
812                for name in names {
813                    Self::validate_column_to_drop(name, metadata)?;
814                }
815            }
816            AlterKind::ModifyColumnTypes { columns } => {
817                for col_to_change in columns {
818                    col_to_change.validate(metadata)?;
819                }
820            }
821            AlterKind::SetRegionOptions { .. } => {}
822            AlterKind::UnsetRegionOptions { .. } => {}
823            AlterKind::SetIndexes { options } => {
824                for option in options {
825                    Self::validate_column_alter_index_option(
826                        option.column_name(),
827                        metadata,
828                        option.is_fulltext(),
829                    )?;
830                }
831            }
832            AlterKind::UnsetIndexes { options } => {
833                for option in options {
834                    Self::validate_column_alter_index_option(
835                        option.column_name(),
836                        metadata,
837                        option.is_fulltext(),
838                    )?;
839                }
840            }
841            AlterKind::DropDefaults { names } => {
842                names
843                    .iter()
844                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
845            }
846            AlterKind::SetDefaults { columns } => {
847                columns
848                    .iter()
849                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
850            }
851            AlterKind::SyncColumns { column_metadatas } => {
852                let new_primary_keys = column_metadatas
853                    .iter()
854                    .filter(|c| c.semantic_type == SemanticType::Tag)
855                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
856                    .collect::<HashMap<_, _>>();
857
858                let old_primary_keys = metadata
859                    .column_metadatas
860                    .iter()
861                    .filter(|c| c.semantic_type == SemanticType::Tag)
862                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
863
864                for (name, id) in old_primary_keys {
865                    let primary_key =
866                        new_primary_keys
867                            .get(name)
868                            .with_context(|| InvalidRegionRequestSnafu {
869                                region_id: metadata.region_id,
870                                err: format!("column {} is not a primary key", name),
871                            })?;
872
873                    ensure!(
874                        *primary_key == id,
875                        InvalidRegionRequestSnafu {
876                            region_id: metadata.region_id,
877                            err: format!(
878                                "column with same name {} has different id, existing: {}, got: {}",
879                                name, id, primary_key
880                            ),
881                        }
882                    );
883                }
884
885                let new_ts_column = column_metadatas
886                    .iter()
887                    .find(|c| c.semantic_type == SemanticType::Timestamp)
888                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
889                    .context(InvalidRegionRequestSnafu {
890                        region_id: metadata.region_id,
891                        err: "timestamp column not found",
892                    })?;
893
894                // Safety: timestamp column must exist.
895                let old_ts_column = metadata
896                    .column_metadatas
897                    .iter()
898                    .find(|c| c.semantic_type == SemanticType::Timestamp)
899                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
900                    .unwrap();
901
902                ensure!(
903                    new_ts_column == old_ts_column,
904                    InvalidRegionRequestSnafu {
905                        region_id: metadata.region_id,
906                        err: format!(
907                            "timestamp column {} has different id, existing: {}, got: {}",
908                            old_ts_column.0, old_ts_column.1, new_ts_column.1
909                        ),
910                    }
911                );
912            }
913        }
914        Ok(())
915    }
916
917    /// Returns true if we need to apply the alteration to the region.
918    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
919        debug_assert!(self.validate(metadata).is_ok());
920        match self {
921            AlterKind::AddColumns { columns } => columns
922                .iter()
923                .any(|col_to_add| col_to_add.need_alter(metadata)),
924            AlterKind::DropColumns { names } => names
925                .iter()
926                .any(|name| metadata.column_by_name(name).is_some()),
927            AlterKind::ModifyColumnTypes { columns } => columns
928                .iter()
929                .any(|col_to_change| col_to_change.need_alter(metadata)),
930            AlterKind::SetRegionOptions { .. } => true,
931            AlterKind::UnsetRegionOptions { .. } => true,
932            AlterKind::SetIndexes { options, .. } => options
933                .iter()
934                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
935            AlterKind::UnsetIndexes { options } => options
936                .iter()
937                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
938            AlterKind::DropDefaults { names } => names
939                .iter()
940                .any(|name| metadata.column_by_name(name).is_some()),
941
942            AlterKind::SetDefaults { columns } => columns
943                .iter()
944                .any(|x| metadata.column_by_name(&x.name).is_some()),
945            AlterKind::SyncColumns { column_metadatas } => {
946                metadata.column_metadatas != *column_metadatas
947            }
948        }
949    }
950
951    /// Returns an error if the column to drop is invalid.
952    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
953        let Some(column) = metadata.column_by_name(name) else {
954            return Ok(());
955        };
956        ensure!(
957            column.semantic_type == SemanticType::Field,
958            InvalidRegionRequestSnafu {
959                region_id: metadata.region_id,
960                err: format!("column {} is not a field and could not be dropped", name),
961            }
962        );
963        Ok(())
964    }
965
966    /// Returns an error if the column's alter index option is invalid.
967    fn validate_column_alter_index_option(
968        column_name: &String,
969        metadata: &RegionMetadata,
970        is_fulltext: bool,
971    ) -> Result<()> {
972        let column = metadata
973            .column_by_name(column_name)
974            .context(InvalidRegionRequestSnafu {
975                region_id: metadata.region_id,
976                err: format!("column {} not found", column_name),
977            })?;
978
979        if is_fulltext {
980            ensure!(
981                column.column_schema.data_type.is_string(),
982                InvalidRegionRequestSnafu {
983                    region_id: metadata.region_id,
984                    err: format!(
985                        "cannot change alter index options for non-string column {}",
986                        column_name
987                    ),
988                }
989            );
990        }
991
992        Ok(())
993    }
994
995    /// Returns an error if the column isn't exist.
996    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
997        metadata
998            .column_by_name(column_name)
999            .context(InvalidRegionRequestSnafu {
1000                region_id: metadata.region_id,
1001                err: format!("column {} not found", column_name),
1002            })?;
1003
1004        Ok(())
1005    }
1006}
1007
1008impl TryFrom<alter_request::Kind> for AlterKind {
1009    type Error = MetadataError;
1010
1011    fn try_from(kind: alter_request::Kind) -> Result<Self> {
1012        let alter_kind = match kind {
1013            alter_request::Kind::AddColumns(x) => {
1014                let columns = x
1015                    .add_columns
1016                    .into_iter()
1017                    .map(|x| x.try_into())
1018                    .collect::<Result<Vec<_>>>()?;
1019                AlterKind::AddColumns { columns }
1020            }
1021            alter_request::Kind::ModifyColumnTypes(x) => {
1022                let columns = x
1023                    .modify_column_types
1024                    .into_iter()
1025                    .map(|x| x.into())
1026                    .collect::<Vec<_>>();
1027                AlterKind::ModifyColumnTypes { columns }
1028            }
1029            alter_request::Kind::DropColumns(x) => {
1030                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1031                AlterKind::DropColumns { names }
1032            }
1033            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1034                options: options
1035                    .table_options
1036                    .iter()
1037                    .map(TryFrom::try_from)
1038                    .collect::<Result<Vec<_>>>()?,
1039            },
1040            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1041                keys: options
1042                    .keys
1043                    .iter()
1044                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
1045                    .collect::<Result<Vec<_>>>()?,
1046            },
1047            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1048                options: vec![SetIndexOption::try_from(o)?],
1049            },
1050            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1051                options: vec![UnsetIndexOption::try_from(o)?],
1052            },
1053            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1054                options: o
1055                    .set_indexes
1056                    .into_iter()
1057                    .map(SetIndexOption::try_from)
1058                    .collect::<Result<Vec<_>>>()?,
1059            },
1060            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1061                options: o
1062                    .unset_indexes
1063                    .into_iter()
1064                    .map(UnsetIndexOption::try_from)
1065                    .collect::<Result<Vec<_>>>()?,
1066            },
1067            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1068                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1069            },
1070            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1071                columns: x
1072                    .set_defaults
1073                    .into_iter()
1074                    .map(|x| {
1075                        Ok(SetDefault {
1076                            name: x.column_name,
1077                            default_constraint: x.default_constraint.clone(),
1078                        })
1079                    })
1080                    .collect::<Result<Vec<_>>>()?,
1081            },
1082            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1083                column_metadatas: x
1084                    .column_defs
1085                    .into_iter()
1086                    .map(ColumnMetadata::try_from_column_def)
1087                    .collect::<Result<Vec<_>>>()?,
1088            },
1089        };
1090
1091        Ok(alter_kind)
1092    }
1093}
1094
1095/// Adds a column.
1096#[derive(Debug, PartialEq, Eq, Clone)]
1097pub struct AddColumn {
1098    /// Metadata of the column to add.
1099    pub column_metadata: ColumnMetadata,
1100    /// Location to add the column. If location is None, the region adds
1101    /// the column to the last.
1102    pub location: Option<AddColumnLocation>,
1103}
1104
1105impl AddColumn {
1106    /// Returns an error if the column to add is invalid.
1107    ///
1108    /// It allows adding existing columns. However, the existing column must have the same metadata
1109    /// and the location must be None.
1110    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1111        ensure!(
1112            self.column_metadata.column_schema.is_nullable()
1113                || self
1114                    .column_metadata
1115                    .column_schema
1116                    .default_constraint()
1117                    .is_some(),
1118            InvalidRegionRequestSnafu {
1119                region_id: metadata.region_id,
1120                err: format!(
1121                    "no default value for column {}",
1122                    self.column_metadata.column_schema.name
1123                ),
1124            }
1125        );
1126
1127        if let Some(existing_column) =
1128            metadata.column_by_name(&self.column_metadata.column_schema.name)
1129        {
1130            // If the column already exists.
1131            ensure!(
1132                *existing_column == self.column_metadata,
1133                InvalidRegionRequestSnafu {
1134                    region_id: metadata.region_id,
1135                    err: format!(
1136                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1137                        self.column_metadata.column_schema.name,
1138                        existing_column,
1139                        self.column_metadata,
1140                    ),
1141                }
1142            );
1143            ensure!(
1144                self.location.is_none(),
1145                InvalidRegionRequestSnafu {
1146                    region_id: metadata.region_id,
1147                    err: format!(
1148                        "column {} already exists, but location is specified",
1149                        self.column_metadata.column_schema.name
1150                    ),
1151                }
1152            );
1153        }
1154
1155        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1156            // Ensures the existing column has the same name.
1157            ensure!(
1158                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1159                InvalidRegionRequestSnafu {
1160                    region_id: metadata.region_id,
1161                    err: format!(
1162                        "column id {} already exists with different name {}",
1163                        self.column_metadata.column_id, existing_column.column_schema.name
1164                    ),
1165                }
1166            );
1167        }
1168
1169        Ok(())
1170    }
1171
1172    /// Returns true if no column to add to the region.
1173    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1174        debug_assert!(self.validate(metadata).is_ok());
1175        metadata
1176            .column_by_name(&self.column_metadata.column_schema.name)
1177            .is_none()
1178    }
1179}
1180
1181impl TryFrom<v1::region::AddColumn> for AddColumn {
1182    type Error = MetadataError;
1183
1184    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1185        let column_def = add_column
1186            .column_def
1187            .context(InvalidRawRegionRequestSnafu {
1188                err: "missing column_def in AddColumn",
1189            })?;
1190
1191        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1192        let location = add_column
1193            .location
1194            .map(AddColumnLocation::try_from)
1195            .transpose()?;
1196
1197        Ok(AddColumn {
1198            column_metadata,
1199            location,
1200        })
1201    }
1202}
1203
/// Position at which a new column is inserted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AddColumnLocation {
    /// Insert the column before all existing columns.
    First,
    /// Insert the column immediately after another column.
    After {
        /// Name of the column that the new column follows.
        column_name: String,
    },
}
1215
1216impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1217    type Error = MetadataError;
1218
1219    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1220        let location_type = LocationType::try_from(location.location_type)
1221            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1222        let add_column_location = match location_type {
1223            LocationType::First => AddColumnLocation::First,
1224            LocationType::After => AddColumnLocation::After {
1225                column_name: location.after_column_name,
1226            },
1227        };
1228
1229        Ok(add_column_location)
1230    }
1231}
1232
1233/// Change a column's datatype.
1234#[derive(Debug, PartialEq, Eq, Clone)]
1235pub struct ModifyColumnType {
1236    /// Schema of the column to modify.
1237    pub column_name: String,
1238    /// Column will be changed to this type.
1239    pub target_type: ConcreteDataType,
1240}
1241
1242impl ModifyColumnType {
1243    /// Returns an error if the column's datatype to change is invalid.
1244    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1245        let column_meta = metadata
1246            .column_by_name(&self.column_name)
1247            .with_context(|| InvalidRegionRequestSnafu {
1248                region_id: metadata.region_id,
1249                err: format!("column {} not found", self.column_name),
1250            })?;
1251
1252        ensure!(
1253            matches!(column_meta.semantic_type, SemanticType::Field),
1254            InvalidRegionRequestSnafu {
1255                region_id: metadata.region_id,
1256                err: "'timestamp' or 'tag' column cannot change type".to_string()
1257            }
1258        );
1259        ensure!(
1260            column_meta
1261                .column_schema
1262                .data_type
1263                .can_arrow_type_cast_to(&self.target_type),
1264            InvalidRegionRequestSnafu {
1265                region_id: metadata.region_id,
1266                err: format!(
1267                    "column '{}' cannot be cast automatically to type '{}'",
1268                    self.column_name, self.target_type
1269                ),
1270            }
1271        );
1272
1273        Ok(())
1274    }
1275
1276    /// Returns true if no column's datatype to change to the region.
1277    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1278        debug_assert!(self.validate(metadata).is_ok());
1279        metadata.column_by_name(&self.column_name).is_some()
1280    }
1281}
1282
1283impl From<v1::ModifyColumnType> for ModifyColumnType {
1284    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1285        let target_type = ColumnDataTypeWrapper::new(
1286            modify_column_type.target_type(),
1287            modify_column_type.target_type_extension,
1288        )
1289        .into();
1290
1291        ModifyColumnType {
1292            column_name: modify_column_type.column_name,
1293            target_type,
1294        }
1295    }
1296}
1297
1298#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1299pub enum SetRegionOption {
1300    Ttl(Option<TimeToLive>),
1301    // Modifying TwscOptions with values as (option name, new value).
1302    Twsc(String, String),
1303    // Modifying the SST format.
1304    Format(String),
1305}
1306
1307impl TryFrom<&PbOption> for SetRegionOption {
1308    type Error = MetadataError;
1309
1310    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1311        let PbOption { key, value } = value;
1312        match key.as_str() {
1313            TTL_KEY => {
1314                let ttl = TimeToLive::from_humantime_or_str(value)
1315                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1316
1317                Ok(Self::Ttl(Some(ttl)))
1318            }
1319            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1320                Ok(Self::Twsc(key.clone(), value.clone()))
1321            }
1322            SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1323            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1324        }
1325    }
1326}
1327
1328impl From<&UnsetRegionOption> for SetRegionOption {
1329    fn from(unset_option: &UnsetRegionOption) -> Self {
1330        match unset_option {
1331            UnsetRegionOption::TwcsTriggerFileNum => {
1332                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1333            }
1334            UnsetRegionOption::TwcsMaxOutputFileSize => {
1335                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1336            }
1337            UnsetRegionOption::TwcsTimeWindow => {
1338                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1339            }
1340            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1341        }
1342    }
1343}
1344
1345impl TryFrom<&str> for UnsetRegionOption {
1346    type Error = MetadataError;
1347
1348    fn try_from(key: &str) -> Result<Self> {
1349        match key.to_ascii_lowercase().as_str() {
1350            TTL_KEY => Ok(Self::Ttl),
1351            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1352            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1353            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1354            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1355        }
1356    }
1357}
1358
1359#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1360pub enum UnsetRegionOption {
1361    TwcsTriggerFileNum,
1362    TwcsMaxOutputFileSize,
1363    TwcsTimeWindow,
1364    Ttl,
1365}
1366
1367impl UnsetRegionOption {
1368    pub fn as_str(&self) -> &str {
1369        match self {
1370            Self::Ttl => TTL_KEY,
1371            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1372            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1373            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1374        }
1375    }
1376}
1377
1378impl Display for UnsetRegionOption {
1379    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1380        write!(f, "{}", self.as_str())
1381    }
1382}
1383
/// Request to flush a region.
#[derive(Clone, Debug, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size for the flush; `None` by default.
    pub row_group_size: Option<usize>,
}
1388
1389#[derive(Debug)]
1390pub struct RegionCompactRequest {
1391    pub options: compact_request::Options,
1392    pub parallelism: Option<u32>,
1393}
1394
1395impl Default for RegionCompactRequest {
1396    fn default() -> Self {
1397        Self {
1398            // Default to regular compaction.
1399            options: compact_request::Options::Regular(Default::default()),
1400            parallelism: None,
1401        }
1402    }
1403}
1404
/// Request to build indexes for a region. It currently carries no parameters.
#[derive(Clone, Debug, Default)]
pub struct RegionBuildIndexRequest {}
1407
1408/// Truncate region request.
1409#[derive(Debug)]
1410pub enum RegionTruncateRequest {
1411    /// Truncate all data in the region.
1412    All,
1413    ByTimeRanges {
1414        /// Time ranges to truncate. Both bound are inclusive.
1415        /// only files that are fully contained in the time range will be truncated.
1416        /// so no guarantee that all data in the time range will be truncated.
1417        time_ranges: Vec<(Timestamp, Timestamp)>,
1418    },
1419}
1420
1421/// Catchup region request.
1422///
1423/// Makes a readonly region to catch up to leader region changes.
1424/// There is no effect if it operating on a leader region.
1425#[derive(Debug, Clone, Copy, Default)]
1426pub struct RegionCatchupRequest {
1427    /// Sets it to writable if it's available after it has caught up with all changes.
1428    pub set_writable: bool,
1429    /// The `entry_id` that was expected to reply to.
1430    /// `None` stands replaying to latest.
1431    pub entry_id: Option<entry::Id>,
1432    /// Used for metrics metadata region.
1433    /// The `entry_id` that was expected to reply to.
1434    /// `None` stands replaying to latest.
1435    pub metadata_entry_id: Option<entry::Id>,
1436    /// The hint for replaying memtable.
1437    pub location_id: Option<u64>,
1438    /// Replay checkpoint.
1439    pub checkpoint: Option<ReplayCheckpoint>,
1440}
1441
1442#[derive(Debug, Clone)]
1443pub struct RegionBulkInsertsRequest {
1444    pub region_id: RegionId,
1445    pub payload: DfRecordBatch,
1446    pub raw_data: ArrowIpc,
1447}
1448
1449impl RegionBulkInsertsRequest {
1450    pub fn estimated_size(&self) -> usize {
1451        self.payload.get_array_memory_size()
1452    }
1453}
1454
/// Request to stage a region with a new region rule (partition expression).
///
/// Moves the region into staging mode. If the memtable written under the old
/// region rule is not empty, it is flushed first; the region then enters
/// staging mode with the new rule.
#[derive(Clone, Debug)]
pub struct EnterStagingRequest {
    /// Partition expression of the staging region.
    pub partition_expr: String,
}
1465
1466/// This request is used as part of the region repartition.
1467///
1468/// After a region has entered staging mode with a new region rule (partition
1469/// expression) and a separate process (for example, `remap_manifests`) has
1470/// generated the new file assignments for the staging region, this request
1471/// applies that generated manifest to the region.
1472///
1473/// In practice, this means:
1474/// - The `partition_expr` identifies the staging region rule that the manifest
1475///   was generated for.
1476/// - `central_region_id` specifies which region holds the staging blob storage
1477///   where the manifest was written during the `remap_manifests` operation.
1478/// - `manifest_path` is the relative path within the central region's staging
1479///   blob storage to fetch the generated manifest.
1480///
1481/// It should typically be called **after** the staging region has been
1482/// initialized by [`EnterStagingRequest`] and the new file layout has been
1483/// computed, to finalize the repartition operation.
1484#[derive(Debug, Clone)]
1485pub struct ApplyStagingManifestRequest {
1486    /// The partition expression of the staging region.
1487    pub partition_expr: String,
1488    /// The region that stores the staging manifests in its staging blob storage.
1489    pub central_region_id: RegionId,
1490    /// The relative path to the staging manifest within the central region's
1491    /// staging blob storage.
1492    pub manifest_path: String,
1493}
1494
1495impl fmt::Display for RegionRequest {
1496    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1497        match self {
1498            RegionRequest::Put(_) => write!(f, "Put"),
1499            RegionRequest::Delete(_) => write!(f, "Delete"),
1500            RegionRequest::Create(_) => write!(f, "Create"),
1501            RegionRequest::Drop(_) => write!(f, "Drop"),
1502            RegionRequest::Open(_) => write!(f, "Open"),
1503            RegionRequest::Close(_) => write!(f, "Close"),
1504            RegionRequest::Alter(_) => write!(f, "Alter"),
1505            RegionRequest::Flush(_) => write!(f, "Flush"),
1506            RegionRequest::Compact(_) => write!(f, "Compact"),
1507            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1508            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1509            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1510            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1511            RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1512            RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1513        }
1514    }
1515}
1516
1517#[cfg(test)]
1518mod tests {
1519
1520    use api::v1::region::RegionColumnDef;
1521    use api::v1::{ColumnDataType, ColumnDef};
1522    use datatypes::prelude::ConcreteDataType;
1523    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1524
1525    use super::*;
1526    use crate::metadata::RegionMetadataBuilder;
1527
1528    #[test]
1529    fn test_from_proto_location() {
1530        let proto_location = v1::AddColumnLocation {
1531            location_type: LocationType::First as i32,
1532            after_column_name: String::default(),
1533        };
1534        let location = AddColumnLocation::try_from(proto_location).unwrap();
1535        assert_eq!(location, AddColumnLocation::First);
1536
1537        let proto_location = v1::AddColumnLocation {
1538            location_type: 10,
1539            after_column_name: String::default(),
1540        };
1541        AddColumnLocation::try_from(proto_location).unwrap_err();
1542
1543        let proto_location = v1::AddColumnLocation {
1544            location_type: LocationType::After as i32,
1545            after_column_name: "a".to_string(),
1546        };
1547        let location = AddColumnLocation::try_from(proto_location).unwrap();
1548        assert_eq!(
1549            location,
1550            AddColumnLocation::After {
1551                column_name: "a".to_string()
1552            }
1553        );
1554    }
1555
1556    #[test]
1557    fn test_from_none_proto_add_column() {
1558        AddColumn::try_from(v1::region::AddColumn {
1559            column_def: None,
1560            location: None,
1561        })
1562        .unwrap_err();
1563    }
1564
1565    #[test]
1566    fn test_from_proto_alter_request() {
1567        RegionAlterRequest::try_from(AlterRequest {
1568            region_id: 0,
1569            schema_version: 1,
1570            kind: None,
1571        })
1572        .unwrap_err();
1573
1574        let request = RegionAlterRequest::try_from(AlterRequest {
1575            region_id: 0,
1576            schema_version: 1,
1577            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1578                add_columns: vec![v1::region::AddColumn {
1579                    column_def: Some(RegionColumnDef {
1580                        column_def: Some(ColumnDef {
1581                            name: "a".to_string(),
1582                            data_type: ColumnDataType::String as i32,
1583                            is_nullable: true,
1584                            default_constraint: vec![],
1585                            semantic_type: SemanticType::Field as i32,
1586                            comment: String::new(),
1587                            ..Default::default()
1588                        }),
1589                        column_id: 1,
1590                    }),
1591                    location: Some(v1::AddColumnLocation {
1592                        location_type: LocationType::First as i32,
1593                        after_column_name: String::default(),
1594                    }),
1595                }],
1596            })),
1597        })
1598        .unwrap();
1599
1600        assert_eq!(
1601            request,
1602            RegionAlterRequest {
1603                kind: AlterKind::AddColumns {
1604                    columns: vec![AddColumn {
1605                        column_metadata: ColumnMetadata {
1606                            column_schema: ColumnSchema::new(
1607                                "a",
1608                                ConcreteDataType::string_datatype(),
1609                                true,
1610                            ),
1611                            semantic_type: SemanticType::Field,
1612                            column_id: 1,
1613                        },
1614                        location: Some(AddColumnLocation::First),
1615                    }]
1616                },
1617            }
1618        );
1619    }
1620
1621    /// Returns a new region metadata for testing. Metadata:
1622    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1623    fn new_metadata() -> RegionMetadata {
1624        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1625        builder
1626            .push_column_metadata(ColumnMetadata {
1627                column_schema: ColumnSchema::new(
1628                    "ts",
1629                    ConcreteDataType::timestamp_millisecond_datatype(),
1630                    false,
1631                ),
1632                semantic_type: SemanticType::Timestamp,
1633                column_id: 1,
1634            })
1635            .push_column_metadata(ColumnMetadata {
1636                column_schema: ColumnSchema::new(
1637                    "tag_0",
1638                    ConcreteDataType::string_datatype(),
1639                    true,
1640                ),
1641                semantic_type: SemanticType::Tag,
1642                column_id: 2,
1643            })
1644            .push_column_metadata(ColumnMetadata {
1645                column_schema: ColumnSchema::new(
1646                    "field_0",
1647                    ConcreteDataType::string_datatype(),
1648                    true,
1649                ),
1650                semantic_type: SemanticType::Field,
1651                column_id: 3,
1652            })
1653            .push_column_metadata(ColumnMetadata {
1654                column_schema: ColumnSchema::new(
1655                    "field_1",
1656                    ConcreteDataType::boolean_datatype(),
1657                    true,
1658                ),
1659                semantic_type: SemanticType::Field,
1660                column_id: 4,
1661            })
1662            .primary_key(vec![2]);
1663        builder.build().unwrap()
1664    }
1665
1666    #[test]
1667    fn test_add_column_validate() {
1668        let metadata = new_metadata();
1669        let add_column = AddColumn {
1670            column_metadata: ColumnMetadata {
1671                column_schema: ColumnSchema::new(
1672                    "tag_1",
1673                    ConcreteDataType::string_datatype(),
1674                    true,
1675                ),
1676                semantic_type: SemanticType::Tag,
1677                column_id: 5,
1678            },
1679            location: None,
1680        };
1681        add_column.validate(&metadata).unwrap();
1682        assert!(add_column.need_alter(&metadata));
1683
1684        // Add not null column.
1685        AddColumn {
1686            column_metadata: ColumnMetadata {
1687                column_schema: ColumnSchema::new(
1688                    "tag_1",
1689                    ConcreteDataType::string_datatype(),
1690                    false,
1691                ),
1692                semantic_type: SemanticType::Tag,
1693                column_id: 5,
1694            },
1695            location: None,
1696        }
1697        .validate(&metadata)
1698        .unwrap_err();
1699
1700        // Add existing column.
1701        let add_column = AddColumn {
1702            column_metadata: ColumnMetadata {
1703                column_schema: ColumnSchema::new(
1704                    "tag_0",
1705                    ConcreteDataType::string_datatype(),
1706                    true,
1707                ),
1708                semantic_type: SemanticType::Tag,
1709                column_id: 2,
1710            },
1711            location: None,
1712        };
1713        add_column.validate(&metadata).unwrap();
1714        assert!(!add_column.need_alter(&metadata));
1715    }
1716
1717    #[test]
1718    fn test_add_duplicate_columns() {
1719        let kind = AlterKind::AddColumns {
1720            columns: vec![
1721                AddColumn {
1722                    column_metadata: ColumnMetadata {
1723                        column_schema: ColumnSchema::new(
1724                            "tag_1",
1725                            ConcreteDataType::string_datatype(),
1726                            true,
1727                        ),
1728                        semantic_type: SemanticType::Tag,
1729                        column_id: 5,
1730                    },
1731                    location: None,
1732                },
1733                AddColumn {
1734                    column_metadata: ColumnMetadata {
1735                        column_schema: ColumnSchema::new(
1736                            "tag_1",
1737                            ConcreteDataType::string_datatype(),
1738                            true,
1739                        ),
1740                        semantic_type: SemanticType::Field,
1741                        column_id: 6,
1742                    },
1743                    location: None,
1744                },
1745            ],
1746        };
1747        let metadata = new_metadata();
1748        kind.validate(&metadata).unwrap();
1749        assert!(kind.need_alter(&metadata));
1750    }
1751
1752    #[test]
1753    fn test_add_existing_column_different_metadata() {
1754        let metadata = new_metadata();
1755
1756        // Add existing column with different id.
1757        let kind = AlterKind::AddColumns {
1758            columns: vec![AddColumn {
1759                column_metadata: ColumnMetadata {
1760                    column_schema: ColumnSchema::new(
1761                        "tag_0",
1762                        ConcreteDataType::string_datatype(),
1763                        true,
1764                    ),
1765                    semantic_type: SemanticType::Tag,
1766                    column_id: 4,
1767                },
1768                location: None,
1769            }],
1770        };
1771        kind.validate(&metadata).unwrap_err();
1772
1773        // Add existing column with different type.
1774        let kind = AlterKind::AddColumns {
1775            columns: vec![AddColumn {
1776                column_metadata: ColumnMetadata {
1777                    column_schema: ColumnSchema::new(
1778                        "tag_0",
1779                        ConcreteDataType::int64_datatype(),
1780                        true,
1781                    ),
1782                    semantic_type: SemanticType::Tag,
1783                    column_id: 2,
1784                },
1785                location: None,
1786            }],
1787        };
1788        kind.validate(&metadata).unwrap_err();
1789
1790        // Add existing column with different name.
1791        let kind = AlterKind::AddColumns {
1792            columns: vec![AddColumn {
1793                column_metadata: ColumnMetadata {
1794                    column_schema: ColumnSchema::new(
1795                        "tag_1",
1796                        ConcreteDataType::string_datatype(),
1797                        true,
1798                    ),
1799                    semantic_type: SemanticType::Tag,
1800                    column_id: 2,
1801                },
1802                location: None,
1803            }],
1804        };
1805        kind.validate(&metadata).unwrap_err();
1806    }
1807
1808    #[test]
1809    fn test_add_existing_column_with_location() {
1810        let metadata = new_metadata();
1811        let kind = AlterKind::AddColumns {
1812            columns: vec![AddColumn {
1813                column_metadata: ColumnMetadata {
1814                    column_schema: ColumnSchema::new(
1815                        "tag_0",
1816                        ConcreteDataType::string_datatype(),
1817                        true,
1818                    ),
1819                    semantic_type: SemanticType::Tag,
1820                    column_id: 2,
1821                },
1822                location: Some(AddColumnLocation::First),
1823            }],
1824        };
1825        kind.validate(&metadata).unwrap_err();
1826    }
1827
1828    #[test]
1829    fn test_validate_drop_column() {
1830        let metadata = new_metadata();
1831        let kind = AlterKind::DropColumns {
1832            names: vec!["xxxx".to_string()],
1833        };
1834        kind.validate(&metadata).unwrap();
1835        assert!(!kind.need_alter(&metadata));
1836
1837        AlterKind::DropColumns {
1838            names: vec!["tag_0".to_string()],
1839        }
1840        .validate(&metadata)
1841        .unwrap_err();
1842
1843        let kind = AlterKind::DropColumns {
1844            names: vec!["field_0".to_string()],
1845        };
1846        kind.validate(&metadata).unwrap();
1847        assert!(kind.need_alter(&metadata));
1848    }
1849
1850    #[test]
1851    fn test_validate_modify_column_type() {
1852        let metadata = new_metadata();
1853        AlterKind::ModifyColumnTypes {
1854            columns: vec![ModifyColumnType {
1855                column_name: "xxxx".to_string(),
1856                target_type: ConcreteDataType::string_datatype(),
1857            }],
1858        }
1859        .validate(&metadata)
1860        .unwrap_err();
1861
1862        AlterKind::ModifyColumnTypes {
1863            columns: vec![ModifyColumnType {
1864                column_name: "field_1".to_string(),
1865                target_type: ConcreteDataType::date_datatype(),
1866            }],
1867        }
1868        .validate(&metadata)
1869        .unwrap_err();
1870
1871        AlterKind::ModifyColumnTypes {
1872            columns: vec![ModifyColumnType {
1873                column_name: "ts".to_string(),
1874                target_type: ConcreteDataType::date_datatype(),
1875            }],
1876        }
1877        .validate(&metadata)
1878        .unwrap_err();
1879
1880        AlterKind::ModifyColumnTypes {
1881            columns: vec![ModifyColumnType {
1882                column_name: "tag_0".to_string(),
1883                target_type: ConcreteDataType::date_datatype(),
1884            }],
1885        }
1886        .validate(&metadata)
1887        .unwrap_err();
1888
1889        let kind = AlterKind::ModifyColumnTypes {
1890            columns: vec![ModifyColumnType {
1891                column_name: "field_0".to_string(),
1892                target_type: ConcreteDataType::int32_datatype(),
1893            }],
1894        };
1895        kind.validate(&metadata).unwrap();
1896        assert!(kind.need_alter(&metadata));
1897    }
1898
1899    #[test]
1900    fn test_validate_add_columns() {
1901        let kind = AlterKind::AddColumns {
1902            columns: vec![
1903                AddColumn {
1904                    column_metadata: ColumnMetadata {
1905                        column_schema: ColumnSchema::new(
1906                            "tag_1",
1907                            ConcreteDataType::string_datatype(),
1908                            true,
1909                        ),
1910                        semantic_type: SemanticType::Tag,
1911                        column_id: 5,
1912                    },
1913                    location: None,
1914                },
1915                AddColumn {
1916                    column_metadata: ColumnMetadata {
1917                        column_schema: ColumnSchema::new(
1918                            "field_2",
1919                            ConcreteDataType::string_datatype(),
1920                            true,
1921                        ),
1922                        semantic_type: SemanticType::Field,
1923                        column_id: 6,
1924                    },
1925                    location: None,
1926                },
1927            ],
1928        };
1929        let request = RegionAlterRequest { kind };
1930        let mut metadata = new_metadata();
1931        metadata.schema_version = 1;
1932        request.validate(&metadata).unwrap();
1933    }
1934
1935    #[test]
1936    fn test_validate_create_region() {
1937        let column_metadatas = vec![
1938            ColumnMetadata {
1939                column_schema: ColumnSchema::new(
1940                    "ts",
1941                    ConcreteDataType::timestamp_millisecond_datatype(),
1942                    false,
1943                ),
1944                semantic_type: SemanticType::Timestamp,
1945                column_id: 1,
1946            },
1947            ColumnMetadata {
1948                column_schema: ColumnSchema::new(
1949                    "tag_0",
1950                    ConcreteDataType::string_datatype(),
1951                    true,
1952                ),
1953                semantic_type: SemanticType::Tag,
1954                column_id: 2,
1955            },
1956            ColumnMetadata {
1957                column_schema: ColumnSchema::new(
1958                    "field_0",
1959                    ConcreteDataType::string_datatype(),
1960                    true,
1961                ),
1962                semantic_type: SemanticType::Field,
1963                column_id: 3,
1964            },
1965        ];
1966        let create = RegionCreateRequest {
1967            engine: "mito".to_string(),
1968            column_metadatas,
1969            primary_key: vec![3, 4],
1970            options: HashMap::new(),
1971            table_dir: "path".to_string(),
1972            path_type: PathType::Bare,
1973            partition_expr_json: Some("".to_string()),
1974        };
1975
1976        assert!(create.validate().is_err());
1977    }
1978
1979    #[test]
1980    fn test_validate_modify_column_fulltext_options() {
1981        let kind = AlterKind::SetIndexes {
1982            options: vec![SetIndexOption::Fulltext {
1983                column_name: "tag_0".to_string(),
1984                options: FulltextOptions::new_unchecked(
1985                    true,
1986                    FulltextAnalyzer::Chinese,
1987                    false,
1988                    FulltextBackend::Bloom,
1989                    1000,
1990                    0.01,
1991                ),
1992            }],
1993        };
1994        let request = RegionAlterRequest { kind };
1995        let mut metadata = new_metadata();
1996        metadata.schema_version = 1;
1997        request.validate(&metadata).unwrap();
1998
1999        let kind = AlterKind::UnsetIndexes {
2000            options: vec![UnsetIndexOption::Fulltext {
2001                column_name: "tag_0".to_string(),
2002            }],
2003        };
2004        let request = RegionAlterRequest { kind };
2005        let mut metadata = new_metadata();
2006        metadata.schema_version = 1;
2007        request.validate(&metadata).unwrap();
2008    }
2009
2010    #[test]
2011    fn test_validate_sync_columns() {
2012        let metadata = new_metadata();
2013        let kind = AlterKind::SyncColumns {
2014            column_metadatas: vec![
2015                ColumnMetadata {
2016                    column_schema: ColumnSchema::new(
2017                        "tag_1",
2018                        ConcreteDataType::string_datatype(),
2019                        true,
2020                    ),
2021                    semantic_type: SemanticType::Tag,
2022                    column_id: 5,
2023                },
2024                ColumnMetadata {
2025                    column_schema: ColumnSchema::new(
2026                        "field_2",
2027                        ConcreteDataType::string_datatype(),
2028                        true,
2029                    ),
2030                    semantic_type: SemanticType::Field,
2031                    column_id: 6,
2032                },
2033            ],
2034        };
2035        let err = kind.validate(&metadata).unwrap_err();
2036        assert!(err.to_string().contains("not a primary key"));
2037
2038        // Change the timestamp column name.
2039        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2040        let ts_column = column_metadatas_with_different_ts_column
2041            .iter_mut()
2042            .find(|c| c.semantic_type == SemanticType::Timestamp)
2043            .unwrap();
2044        ts_column.column_schema.name = "ts1".to_string();
2045
2046        let kind = AlterKind::SyncColumns {
2047            column_metadatas: column_metadatas_with_different_ts_column,
2048        };
2049        let err = kind.validate(&metadata).unwrap_err();
2050        assert!(
2051            err.to_string()
2052                .contains("timestamp column ts has different id")
2053        );
2054
2055        // Change the primary key column name.
2056        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2057        let pk_column = column_metadatas_with_different_pk_column
2058            .iter_mut()
2059            .find(|c| c.column_schema.name == "tag_0")
2060            .unwrap();
2061        pk_column.column_id = 100;
2062        let kind = AlterKind::SyncColumns {
2063            column_metadatas: column_metadatas_with_different_pk_column,
2064        };
2065        let err = kind.validate(&metadata).unwrap_err();
2066        assert!(
2067            err.to_string()
2068                .contains("column with same name tag_0 has different id")
2069        );
2070
2071        // Add a new field column.
2072        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2073        column_metadatas_with_new_field_column.push(ColumnMetadata {
2074            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2075            semantic_type: SemanticType::Field,
2076            column_id: 4,
2077        });
2078        let kind = AlterKind::SyncColumns {
2079            column_metadatas: column_metadatas_with_new_field_column,
2080        };
2081        kind.validate(&metadata).unwrap();
2082    }
2083
2084    #[test]
2085    fn test_cast_path_type_to_primitive() {
2086        assert_eq!(PathType::Bare as u8, 0);
2087        assert_eq!(PathType::Data as u8, 1);
2088        assert_eq!(PathType::Metadata as u8, 2);
2089        assert_eq!(
2090            PathType::try_from(PathType::Bare as u8).unwrap(),
2091            PathType::Bare
2092        );
2093        assert_eq!(
2094            PathType::try_from(PathType::Data as u8).unwrap(),
2095            PathType::Data
2096        );
2097        assert_eq!(
2098            PathType::try_from(PathType::Metadata as u8).unwrap(),
2099            PathType::Metadata
2100        );
2101    }
2102}