Skip to main content

mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37    MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40    AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41    RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42    RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43    RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49    CompactRegionSnafu, CompactionCancelledSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
50    Error, FillDefaultSnafu, FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu,
51    MissingPartitionExprSnafu, Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
/// Request to write a region.
///
/// Besides the rows themselves, the request carries lookup tables that are computed
/// once in [`WriteRequest::new`].
#[derive(Debug)]
pub struct WriteRequest {
    /// Region to write.
    pub region_id: RegionId,
    /// Type of the write request.
    pub op_type: OpType,
    /// Rows to write.
    pub rows: Rows,
    /// Map column name to column index in `rows`.
    pub name_to_index: HashMap<String, usize>,
    /// Whether each column has null.
    ///
    /// Indexed by the position of the column in the schema of `rows`.
    pub has_null: Vec<bool>,
    /// Write hint.
    pub hint: Option<WriteHint>,
    /// Region metadata at the time this request was created.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
    /// Partition expression version for the region.
    pub partition_expr_version: Option<u64>,
}
84
85impl WriteRequest {
86    /// Creates a new request.
87    ///
88    /// Returns `Err` if `rows` are invalid.
89    pub fn new(
90        region_id: RegionId,
91        op_type: OpType,
92        rows: Rows,
93        region_metadata: Option<RegionMetadataRef>,
94    ) -> Result<WriteRequest> {
95        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
96        for (index, column) in rows.schema.iter().enumerate() {
97            ensure!(
98                name_to_index
99                    .insert(column.column_name.clone(), index)
100                    .is_none(),
101                InvalidRequestSnafu {
102                    region_id,
103                    reason: format!("duplicate column {}", column.column_name),
104                }
105            );
106        }
107
108        let mut has_null = vec![false; rows.schema.len()];
109        for row in &rows.rows {
110            ensure!(
111                row.values.len() == rows.schema.len(),
112                InvalidRequestSnafu {
113                    region_id,
114                    reason: format!(
115                        "row has {} columns but schema has {}",
116                        row.values.len(),
117                        rows.schema.len()
118                    ),
119                }
120            );
121
122            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
123                validate_proto_value(region_id, value, column_schema)?;
124
125                if value.value_data.is_none() {
126                    has_null[i] = true;
127                }
128            }
129        }
130
131        Ok(WriteRequest {
132            region_id,
133            op_type,
134            rows,
135            name_to_index,
136            has_null,
137            hint: None,
138            region_metadata,
139            partition_expr_version: None,
140        })
141    }
142
143    /// Sets the write hint.
144    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
145        self.hint = hint;
146        self
147    }
148
149    pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
150        self.partition_expr_version = partition_expr_version;
151        self
152    }
153
154    /// Returns the encoding hint.
155    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
156        infer_primary_key_encoding_from_hint(self.hint.as_ref())
157    }
158
159    /// Returns estimated size of the request.
160    pub(crate) fn estimated_size(&self) -> usize {
161        let row_size = self
162            .rows
163            .rows
164            .first()
165            .map(|row| row.encoded_len())
166            .unwrap_or(0);
167        row_size * self.rows.rows.len()
168    }
169
170    /// Gets column index by name.
171    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
172        self.name_to_index.get(name).copied()
173    }
174
    /// Checks schema of rows is compatible with schema of the region.
    ///
    /// For every column of the region that is present in the rows, this checks the data
    /// type, the semantic type and the nullability in that order. Columns of the region
    /// that are absent from the rows are checked by `check_missing_column`.
    ///
    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
    /// error.
    ///
    /// # Errors
    ///
    /// Returns an invalid request error on the first incompatible column, or if the rows
    /// contain columns that the region doesn't have. The `FillDefault` error is only
    /// returned after all other checks pass.
    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
        debug_assert_eq!(self.region_id, metadata.region_id);

        let region_id = self.region_id;
        // Index all columns in rows.
        let mut rows_columns: HashMap<_, _> = self
            .rows
            .schema
            .iter()
            .map(|column| (&column.column_name, column))
            .collect();

        let mut need_fill_default = false;
        // Checks all columns in this region.
        for column in &metadata.column_metadatas {
            // Removing the entry leaves only the columns unknown to the region in
            // `rows_columns` once the loop finishes.
            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
                // Check data type.
                ensure!(
                    is_column_type_value_eq(
                        input_col.datatype,
                        input_col.datatype_extension.clone(),
                        &column.column_schema.data_type
                    ),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} expect type {:?}, given: {}({})",
                            column.column_schema.name,
                            column.column_schema.data_type,
                            ColumnDataType::try_from(input_col.datatype)
                                .map(|v| v.as_str_name())
                                .unwrap_or("Unknown"),
                            input_col.datatype,
                        )
                    }
                );

                // Check semantic type.
                ensure!(
                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} has semantic type {:?}, given: {}({})",
                            column.column_schema.name,
                            column.semantic_type,
                            api::v1::SemanticType::try_from(input_col.semantic_type)
                                .map(|v| v.as_str_name())
                                .unwrap_or("Unknown"),
                            input_col.semantic_type
                        ),
                    }
                );

                // Check nullable.
                // The column is present in the schema of `rows`, and `name_to_index` is
                // built from that schema in `new`, so the lookup is expected to succeed.
                // NOTE(review): both index operations panic if `name_to_index` or
                // `has_null` is out of sync with the schema of `rows` - confirm that no
                // caller checks a request whose schema was extended without them.
                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
                ensure!(
                    !has_null || column.column_schema.is_nullable(),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} is not null but input has null",
                            column.column_schema.name
                        ),
                    }
                );
            } else {
                // Rows don't have this column.
                self.check_missing_column(column)?;

                need_fill_default = true;
            }
        }

        // Checks all columns in rows exist in the region.
        if !rows_columns.is_empty() {
            let names: Vec<_> = rows_columns.into_keys().collect();
            return InvalidRequestSnafu {
                region_id,
                reason: format!("unknown columns: {:?}", names),
            }
            .fail();
        }

        // If we need to fill default values, return a special error.
        ensure!(!need_fill_default, FillDefaultSnafu { region_id });

        Ok(())
    }
269
270    /// Tries to fill missing columns.
271    ///
272    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
273    /// values.
274    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
275        debug_assert_eq!(self.region_id, metadata.region_id);
276
277        let mut columns_to_fill = vec![];
278        for column in &metadata.column_metadatas {
279            if !self.name_to_index.contains_key(&column.column_schema.name) {
280                columns_to_fill.push(column);
281            }
282        }
283        self.fill_columns(columns_to_fill)?;
284
285        Ok(())
286    }
287
288    /// Checks the schema and fill missing columns.
289    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
290        if let Err(e) = self.check_schema(metadata) {
291            if e.is_fill_default() {
292                // TODO(yingwen): Add metrics for this case.
293                // We need to fill default value. The write request may be a request
294                // sent before changing the schema.
295                self.fill_missing_columns(metadata)?;
296            } else {
297                return Err(e);
298            }
299        }
300
301        Ok(())
302    }
303
304    /// Fills default value for specific `columns`.
305    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
306        let mut default_values = Vec::with_capacity(columns.len());
307        let mut columns_to_fill = Vec::with_capacity(columns.len());
308        for column in columns {
309            let default_value = self.column_default_value(column)?;
310            if default_value.value_data.is_some() {
311                default_values.push(default_value);
312                columns_to_fill.push(column);
313            }
314        }
315
316        for row in &mut self.rows.rows {
317            row.values.extend(default_values.iter().cloned());
318        }
319
320        for column in columns_to_fill {
321            let (datatype, datatype_ext) =
322                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
323                    .with_context(|_| ConvertColumnDataTypeSnafu {
324                        reason: format!(
325                            "no protobuf type for column {} ({:?})",
326                            column.column_schema.name, column.column_schema.data_type
327                        ),
328                    })?
329                    .to_parts();
330            self.rows.schema.push(ColumnSchema {
331                column_name: column.column_schema.name.clone(),
332                datatype: datatype as i32,
333                semantic_type: column.semantic_type as i32,
334                datatype_extension: datatype_ext,
335                options: options_from_column_schema(&column.column_schema),
336            });
337        }
338
339        Ok(())
340    }
341
342    /// Checks whether we should allow a row doesn't provide this column.
343    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
344        if self.op_type == OpType::Delete {
345            if column.semantic_type == SemanticType::Field {
346                // For delete request, all tags and timestamp is required. We don't fill default
347                // tag or timestamp while deleting rows.
348                return Ok(());
349            } else {
350                return InvalidRequestSnafu {
351                    region_id: self.region_id,
352                    reason: format!("delete requests need column {}", column.column_schema.name),
353                }
354                .fail();
355            }
356        }
357
358        // Not a delete request. Checks whether they have default value.
359        ensure!(
360            column.column_schema.is_nullable()
361                || column.column_schema.default_constraint().is_some(),
362            InvalidRequestSnafu {
363                region_id: self.region_id,
364                reason: format!("missing column {}", column.column_schema.name),
365            }
366        );
367
368        Ok(())
369    }
370
371    /// Returns the default value for specific column.
372    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
373        let default_value = match self.op_type {
374            OpType::Delete => {
375                ensure!(
376                    column.semantic_type == SemanticType::Field,
377                    InvalidRequestSnafu {
378                        region_id: self.region_id,
379                        reason: format!(
380                            "delete requests need column {}",
381                            column.column_schema.name
382                        ),
383                    }
384                );
385
386                // For delete request, we need a default value for padding so we
387                // can delete a row even a field doesn't have a default value. So the
388                // value doesn't need to following the default value constraint of the
389                // column.
390                if column.column_schema.is_nullable() {
391                    datatypes::value::Value::Null
392                } else {
393                    column.column_schema.data_type.default_value()
394                }
395            }
396            OpType::Put => {
397                // For put requests, we use the default value from column schema.
398                if column.column_schema.is_default_impure() {
399                    UnexpectedSnafu {
400                        reason: format!(
401                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
402                            self.region_id,
403                            column.column_schema.name,
404                            column.column_schema.default_constraint(),
405                        ),
406                    }
407                    .fail()?
408                }
409                column
410                    .column_schema
411                    .create_default()
412                    .context(CreateDefaultSnafu {
413                        region_id: self.region_id,
414                        column: &column.column_schema.name,
415                    })?
416                    // This column doesn't have default value.
417                    .with_context(|| InvalidRequestSnafu {
418                        region_id: self.region_id,
419                        reason: format!(
420                            "column {} does not have default value",
421                            column.column_schema.name
422                        ),
423                    })?
424            }
425        };
426
427        // Convert default value into proto's value.
428        Ok(api::helper::to_grpc_value(default_value))
429    }
430}
431
432/// Validate proto value schema.
433pub(crate) fn validate_proto_value(
434    region_id: RegionId,
435    value: &Value,
436    column_schema: &ColumnSchema,
437) -> Result<()> {
438    if let Some(value_type) = proto_value_type(value) {
439        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
440            InvalidRequestSnafu {
441                region_id,
442                reason: format!(
443                    "column {} has unknown type {}",
444                    column_schema.column_name, column_schema.datatype
445                ),
446            }
447            .build()
448        })?;
449        ensure!(
450            proto_value_type_match(column_type, value_type),
451            InvalidRequestSnafu {
452                region_id,
453                reason: format!(
454                    "value has type {:?}, but column {} has type {:?}({})",
455                    value_type, column_schema.column_name, column_type, column_schema.datatype,
456                ),
457            }
458        );
459    }
460
461    Ok(())
462}
463
464fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
465    match (column_type, value_type) {
466        (ct, vt) if ct == vt => true,
467        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
468        (ColumnDataType::Json, ColumnDataType::Binary) => true,
469        _ => false,
470    }
471}
472
473/// Oneshot output result sender.
474#[derive(Debug)]
475pub struct OutputTx(Sender<Result<AffectedRows>>);
476
477impl OutputTx {
478    /// Creates a new output sender.
479    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
480        OutputTx(sender)
481    }
482
483    /// Sends the `result`.
484    pub(crate) fn send(self, result: Result<AffectedRows>) {
485        // Ignores send result.
486        let _ = self.0.send(result);
487    }
488}
489
490/// Optional output result sender.
491#[derive(Debug)]
492pub(crate) struct OptionOutputTx(Option<OutputTx>);
493
494impl OptionOutputTx {
495    /// Creates a sender.
496    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
497        OptionOutputTx(sender)
498    }
499
500    /// Creates an empty sender.
501    pub(crate) fn none() -> OptionOutputTx {
502        OptionOutputTx(None)
503    }
504
505    /// Sends the `result` and consumes the inner sender.
506    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
507        if let Some(sender) = self.0.take() {
508            sender.send(result);
509        }
510    }
511
512    /// Sends the `result` and consumes the sender.
513    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
514        if let Some(sender) = self.0.take() {
515            sender.send(result);
516        }
517    }
518
519    /// Takes the inner sender.
520    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
521        self.0.take()
522    }
523}
524
525impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
526    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
527        Self::new(Some(OutputTx::new(sender)))
528    }
529}
530
531impl OnFailure for OptionOutputTx {
532    fn on_failure(&mut self, err: Error) {
533        self.send_mut(Err(err));
534    }
535}
536
/// Callback on failure.
///
/// Implementors receive the error by value and decide how to report it.
pub(crate) trait OnFailure {
    /// Handles `err` on failure.
    fn on_failure(&mut self, err: Error);
}
542
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request.
    pub(crate) request: WriteRequest,
}
550
/// Sender and bulk request.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by the request.
    pub(crate) request: BulkPart,
    /// Region metadata that comes with the request.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Partition expression version for the region.
    pub(crate) partition_expr_version: Option<u64>,
}
558
559/// Request sent to a worker with timestamp
560#[derive(Debug)]
561pub(crate) struct WorkerRequestWithTime {
562    pub(crate) request: WorkerRequest,
563    pub(crate) created_at: Instant,
564}
565
566impl WorkerRequestWithTime {
567    pub(crate) fn new(request: WorkerRequest) -> Self {
568        Self {
569            request,
570            created_at: Instant::now(),
571        }
572    }
573}
574
/// Request sent to a worker
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write to a region.
    Write(SenderWriteRequest),

    /// Ddl request to a region.
    Ddl(SenderDdlRequest),

    /// Notifications from internal background jobs.
    Background {
        /// Id of the region to send.
        region_id: RegionId,
        /// Internal notification.
        notify: BackgroundNotify,
    },

    /// The internal commands.
    SetRegionRoleStateGracefully {
        /// Id of the region to send.
        region_id: RegionId,
        /// The [SettableRegionRoleState].
        region_role_state: SettableRegionRoleState,
        /// The sender of [SetRegionRoleStateResponse].
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Notify a worker to stop.
    Stop,

    /// Use [RegionEdit] to edit a region directly.
    EditRegion(RegionEditRequest),

    /// Keep the manifest of a region up to date.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts request and region metadata.
    BulkInserts {
        /// Region metadata that comes with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },

    /// Remap manifests request.
    RemapManifests(RemapManifestsRequest),

    /// Copy region from request.
    CopyRegionFrom(CopyRegionFromRequest),
}
624
625impl WorkerRequest {
626    /// Creates a new open region request.
627    pub(crate) fn new_open_region_request(
628        region_id: RegionId,
629        request: RegionOpenRequest,
630        entry_receiver: Option<WalEntryReceiver>,
631    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
632        let (sender, receiver) = oneshot::channel();
633
634        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
635            region_id,
636            sender: sender.into(),
637            request: DdlRequest::Open((request, entry_receiver)),
638        });
639
640        (worker_request, receiver)
641    }
642
643    /// Creates a new catchup region request.
644    pub(crate) fn new_catchup_region_request(
645        region_id: RegionId,
646        request: RegionCatchupRequest,
647        entry_receiver: Option<WalEntryReceiver>,
648    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
649        let (sender, receiver) = oneshot::channel();
650        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
651            region_id,
652            sender: sender.into(),
653            request: DdlRequest::Catchup((request, entry_receiver)),
654        });
655        (worker_request, receiver)
656    }
657
658    /// Converts request from a [RegionRequest].
659    pub(crate) fn try_from_region_request(
660        region_id: RegionId,
661        value: RegionRequest,
662        region_metadata: Option<RegionMetadataRef>,
663    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
664        let (sender, receiver) = oneshot::channel();
665        let worker_request = match value {
666            RegionRequest::Put(v) => {
667                let mut write_request =
668                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
669                        .with_hint(v.hint)
670                        .with_partition_expr_version(v.partition_expr_version);
671                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
672                    && let Some(region_metadata) = &region_metadata
673                {
674                    write_request.maybe_fill_missing_columns(region_metadata)?;
675                }
676                WorkerRequest::Write(SenderWriteRequest {
677                    sender: sender.into(),
678                    request: write_request,
679                })
680            }
681            RegionRequest::Delete(v) => {
682                let mut write_request =
683                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
684                        .with_hint(v.hint)
685                        .with_partition_expr_version(v.partition_expr_version);
686                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
687                    && let Some(region_metadata) = &region_metadata
688                {
689                    write_request.maybe_fill_missing_columns(region_metadata)?;
690                }
691                WorkerRequest::Write(SenderWriteRequest {
692                    sender: sender.into(),
693                    request: write_request,
694                })
695            }
696            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
697                region_id,
698                sender: sender.into(),
699                request: DdlRequest::Create(v),
700            }),
701            RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
702                region_id,
703                sender: sender.into(),
704                request: DdlRequest::Drop(v),
705            }),
706            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
707                region_id,
708                sender: sender.into(),
709                request: DdlRequest::Open((v, None)),
710            }),
711            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
712                region_id,
713                sender: sender.into(),
714                request: DdlRequest::Close(v),
715            }),
716            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
717                region_id,
718                sender: sender.into(),
719                request: DdlRequest::Alter(v),
720            }),
721            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
722                region_id,
723                sender: sender.into(),
724                request: DdlRequest::Flush(v),
725            }),
726            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
727                region_id,
728                sender: sender.into(),
729                request: DdlRequest::Compact(v),
730            }),
731            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
732                region_id,
733                sender: sender.into(),
734                request: DdlRequest::BuildIndex(v),
735            }),
736            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
737                region_id,
738                sender: sender.into(),
739                request: DdlRequest::Truncate(v),
740            }),
741            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
742                region_id,
743                sender: sender.into(),
744                request: DdlRequest::Catchup((v, None)),
745            }),
746            RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
747                region_id,
748                sender: sender.into(),
749                request: DdlRequest::EnterStaging(v),
750            }),
751            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
752                metadata: region_metadata,
753                sender: sender.into(),
754                request: region_bulk_inserts_request,
755            },
756            RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
757                region_id,
758                sender: sender.into(),
759                request: DdlRequest::ApplyStagingManifest(v),
760            }),
761        };
762
763        Ok((worker_request, receiver))
764    }
765
766    pub(crate) fn new_set_readonly_gracefully(
767        region_id: RegionId,
768        region_role_state: SettableRegionRoleState,
769    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
770        let (sender, receiver) = oneshot::channel();
771
772        (
773            WorkerRequest::SetRegionRoleStateGracefully {
774                region_id,
775                region_role_state,
776                sender,
777            },
778            receiver,
779        )
780    }
781
782    pub(crate) fn new_sync_region_request(
783        region_id: RegionId,
784        manifest_version: ManifestVersion,
785    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
786        let (sender, receiver) = oneshot::channel();
787        (
788            WorkerRequest::SyncRegion(RegionSyncRequest {
789                region_id,
790                manifest_version,
791                sender,
792            }),
793            receiver,
794        )
795    }
796
797    /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the partition expression is invalid or missing.
802    /// Returns an error if the new partition expressions are not found for some regions.
803    #[allow(clippy::type_complexity)]
804    pub(crate) fn try_from_remap_manifests_request(
805        store_api::region_engine::RemapManifestsRequest {
806            region_id,
807            input_regions,
808            region_mapping,
809            new_partition_exprs,
810        }: store_api::region_engine::RemapManifestsRequest,
811    ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
812        let (sender, receiver) = oneshot::channel();
813        let new_partition_exprs = new_partition_exprs
814            .into_iter()
815            .map(|(k, v)| {
816                Ok((
817                    k,
818                    PartitionExpr::from_json_str(&v)
819                        .context(InvalidPartitionExprSnafu { expr: v })?
820                        .context(MissingPartitionExprSnafu { region_id: k })?,
821                ))
822            })
823            .collect::<Result<HashMap<_, _>>>()?;
824
825        let request = RemapManifestsRequest {
826            region_id,
827            input_regions,
828            region_mapping,
829            new_partition_exprs,
830            sender,
831        };
832
833        Ok((WorkerRequest::RemapManifests(request), receiver))
834    }
835
836    /// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest).
837    pub(crate) fn try_from_copy_region_from_request(
838        region_id: RegionId,
839        store_api::region_engine::MitoCopyRegionFromRequest {
840            source_region_id,
841            parallelism,
842        }: store_api::region_engine::MitoCopyRegionFromRequest,
843    ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
844        let (sender, receiver) = oneshot::channel();
845        let request = CopyRegionFromRequest {
846            region_id,
847            source_region_id,
848            parallelism,
849            sender,
850        };
851        Ok((WorkerRequest::CopyRegionFrom(request), receiver))
852    }
853}
854
/// DDL request to a region.
///
/// Each variant wraps the request type of one kind of operation.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Region create request.
    Create(RegionCreateRequest),
    /// Region drop request.
    Drop(RegionDropRequest),
    /// Region open request, paired with an optional [WalEntryReceiver].
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Region close request.
    Close(RegionCloseRequest),
    /// Region alter request.
    Alter(RegionAlterRequest),
    /// Region flush request.
    Flush(RegionFlushRequest),
    /// Region compact request.
    Compact(RegionCompactRequest),
    /// Region build index request.
    BuildIndex(RegionBuildIndexRequest),
    /// Region truncate request.
    Truncate(RegionTruncateRequest),
    /// Region catchup request, paired with an optional [WalEntryReceiver].
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
    /// Enter staging request.
    EnterStaging(EnterStagingRequest),
    /// Apply staging manifest request.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
871
/// A DDL request bundled with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request itself.
    pub(crate) request: DdlRequest,
}
882
/// Notification from a background job.
///
/// Each variant carries the payload describing one kind of event.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// Index build has been stopped (aborted or succeeded).
    IndexBuildStopped(IndexBuildStopped),
    /// Index build has failed.
    IndexBuildFailed(IndexBuildFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has been cancelled cooperatively.
    CompactionCancelled(CompactionCancelled),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
    /// Enter staging result.
    EnterStaging(EnterStagingResult),
    /// Copy region result.
    CopyRegionFromFinished(CopyRegionFromFinished),
}
913
/// Notifies that a flush job has finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer. It is never read directly; it lives as long as this struct.
    pub(crate) _timer: HistogramTimer,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Reason for flush.
    pub(crate) flush_reason: FlushReason,
}
934
935impl FlushFinished {
936    /// Marks the flush job as successful and observes the timer.
937    pub(crate) fn on_success(self) {
938        for sender in self.senders {
939            sender.send(Ok(0));
940        }
941    }
942}
943
944impl OnFailure for FlushFinished {
945    fn on_failure(&mut self, err: Error) {
946        let err = Arc::new(err);
947        for sender in self.senders.drain(..) {
948            sender.send(Err(err.clone()).context(FlushRegionSnafu {
949                region_id: self.region_id,
950            }));
951        }
952    }
953}
954
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
961
/// Notifies that an index build job has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Region id.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit carried by the notification.
    pub(crate) edit: RegionEdit,
}
968
/// Notifies that an index build job has been stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Region id.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// File id carried by the notification.
    pub(crate) file_id: FileId,
}
976
/// Notifies that an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
982
/// Notifies that a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of the compaction task.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
995
/// Notifies that a compaction job has been cancelled cooperatively.
#[derive(Debug)]
pub(crate) struct CompactionCancelled {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Waiters to wake once the cancellation has been observed by the worker.
    pub(crate) senders: Vec<OutputTx>,
}
1004
1005impl CompactionCancelled {
1006    pub(crate) fn on_success(self) {
1007        for sender in self.senders {
1008            sender.send(CompactionCancelledSnafu {}.fail());
1009        }
1010        info!("Compaction cancelled for region: {}", self.region_id);
1011    }
1012}
1013
1014impl CompactionFinished {
1015    pub fn on_success(self) {
1016        // only update compaction time on success
1017        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
1018
1019        for sender in self.senders {
1020            sender.send(Ok(0));
1021        }
1022        info!("Successfully compacted region: {}", self.region_id);
1023    }
1024}
1025
1026impl OnFailure for CompactionFinished {
1027    /// Compaction succeeded but failed to update manifest or region's already been dropped.
1028    fn on_failure(&mut self, err: Error) {
1029        let err = Arc::new(err);
1030        for sender in self.senders.drain(..) {
1031            sender.send(Err(err.clone()).context(CompactRegionSnafu {
1032                region_id: self.region_id,
1033            }));
1034        }
1035    }
1036}
1037
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
1045
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Kind of the truncate operation.
    pub(crate) kind: TruncateKind,
}
1057
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Used for index build in schema change.
    pub(crate) need_index: bool,
    /// New options for the region, if any.
    pub(crate) new_options: Option<RegionOptions>,
}
1074
/// Notifies the region of the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new staging partition directive to apply.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
1087
/// Result notification of a copy region job.
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1097
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The region edit carried by this request.
    pub(crate) edit: RegionEdit,
    /// The sender to notify the result to the region engine.
    pub(crate) tx: Sender<Result<()>>,
}
1106
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether the region state needs to be set to Writable after handling this request.
    pub(crate) update_region_state: bool,
    /// Whether the region was in staging mode before handling this request.
    pub(crate) is_staging: bool,
}
1123
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Files that need their index built; empty means all files.
    pub(crate) file_metas: Vec<FileMeta>,
}
1131
/// Request to sync a region with a given manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The manifest version carried by this request.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
1139
/// Request to remap manifests from input regions to new regions.
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
    pub(crate) region_id: RegionId,
    /// Regions to remap manifests from.
    pub(crate) input_regions: Vec<RegionId>,
    /// For each old region, which new regions should receive its files.
    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// Sender for the result of the remap operation.
    ///
    /// The result is a map from region IDs to their corresponding staging manifest paths.
    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
}
1155
/// Request to copy a region from a source region into a target region.
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
    /// The [`RegionId`] of the target region.
    pub(crate) region_id: RegionId,
    /// The [`RegionId`] of the source region.
    pub(crate) source_region_id: RegionId,
    /// The parallelism of the copy operation.
    pub(crate) parallelism: usize,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1167
1168#[cfg(test)]
1169mod tests {
1170    use api::v1::value::ValueData;
1171    use api::v1::{Row, SemanticType};
1172    use common_error::ext::ErrorExt;
1173    use common_error::status_code::StatusCode;
1174    use datatypes::prelude::ConcreteDataType;
1175    use datatypes::schema::ColumnDefaultConstraint;
1176    use mito_codec::test_util::i64_value;
1177    use store_api::metadata::RegionMetadataBuilder;
1178    use tokio::sync::oneshot;
1179
1180    use super::*;
1181    use crate::error::Error;
1182    use crate::test_util::ts_ms_value;
1183
1184    fn new_column_schema(
1185        name: &str,
1186        data_type: ColumnDataType,
1187        semantic_type: SemanticType,
1188    ) -> ColumnSchema {
1189        ColumnSchema {
1190            column_name: name.to_string(),
1191            datatype: data_type as i32,
1192            semantic_type: semantic_type as i32,
1193            ..Default::default()
1194        }
1195    }
1196
1197    fn check_invalid_request(err: &Error, expect: &str) {
1198        if let Error::InvalidRequest {
1199            region_id: _,
1200            reason,
1201            location: _,
1202        } = err
1203        {
1204            assert_eq!(reason, expect);
1205        } else {
1206            panic!("Unexpected error {err}")
1207        }
1208    }
1209
1210    #[test]
1211    fn test_write_request_duplicate_column() {
1212        let rows = Rows {
1213            schema: vec![
1214                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1215                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1216            ],
1217            rows: vec![],
1218        };
1219
1220        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1221        check_invalid_request(&err, "duplicate column c0");
1222    }
1223
1224    #[test]
1225    fn test_valid_write_request() {
1226        let rows = Rows {
1227            schema: vec![
1228                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1229                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1230            ],
1231            rows: vec![Row {
1232                values: vec![i64_value(1), i64_value(2)],
1233            }],
1234        };
1235
1236        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1237        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1238        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1239        assert_eq!(None, request.column_index_by_name("c2"));
1240    }
1241
1242    #[test]
1243    fn test_compaction_cancelled_sends_cancelled_error() {
1244        let (tx, rx) = oneshot::channel();
1245        let request = CompactionCancelled {
1246            region_id: RegionId::new(1, 1),
1247            senders: vec![OutputTx::new(tx)],
1248        };
1249
1250        request.on_success();
1251
1252        let err = rx.blocking_recv().unwrap().unwrap_err();
1253        assert!(matches!(err, Error::CompactionCancelled { .. }));
1254        assert_eq!(err.status_code(), StatusCode::Cancelled);
1255    }
1256
1257    #[test]
1258    fn test_write_request_column_num() {
1259        let rows = Rows {
1260            schema: vec![
1261                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1262                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1263            ],
1264            rows: vec![Row {
1265                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1266            }],
1267        };
1268
1269        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1270        check_invalid_request(&err, "row has 3 columns but schema has 2");
1271    }
1272
1273    fn new_region_metadata() -> RegionMetadata {
1274        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1275        builder
1276            .push_column_metadata(ColumnMetadata {
1277                column_schema: datatypes::schema::ColumnSchema::new(
1278                    "ts",
1279                    ConcreteDataType::timestamp_millisecond_datatype(),
1280                    false,
1281                ),
1282                semantic_type: SemanticType::Timestamp,
1283                column_id: 1,
1284            })
1285            .push_column_metadata(ColumnMetadata {
1286                column_schema: datatypes::schema::ColumnSchema::new(
1287                    "k0",
1288                    ConcreteDataType::int64_datatype(),
1289                    true,
1290                ),
1291                semantic_type: SemanticType::Tag,
1292                column_id: 2,
1293            })
1294            .primary_key(vec![2]);
1295        builder.build().unwrap()
1296    }
1297
1298    #[test]
1299    fn test_check_schema() {
1300        let rows = Rows {
1301            schema: vec![
1302                new_column_schema(
1303                    "ts",
1304                    ColumnDataType::TimestampMillisecond,
1305                    SemanticType::Timestamp,
1306                ),
1307                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1308            ],
1309            rows: vec![Row {
1310                values: vec![ts_ms_value(1), i64_value(2)],
1311            }],
1312        };
1313        let metadata = new_region_metadata();
1314
1315        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1316        request.check_schema(&metadata).unwrap();
1317    }
1318
1319    #[test]
1320    fn test_column_type() {
1321        let rows = Rows {
1322            schema: vec![
1323                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1324                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1325            ],
1326            rows: vec![Row {
1327                values: vec![i64_value(1), i64_value(2)],
1328            }],
1329        };
1330        let metadata = new_region_metadata();
1331
1332        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1333        let err = request.check_schema(&metadata).unwrap_err();
1334        check_invalid_request(
1335            &err,
1336            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1337        );
1338    }
1339
1340    #[test]
1341    fn test_semantic_type() {
1342        let rows = Rows {
1343            schema: vec![
1344                new_column_schema(
1345                    "ts",
1346                    ColumnDataType::TimestampMillisecond,
1347                    SemanticType::Tag,
1348                ),
1349                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1350            ],
1351            rows: vec![Row {
1352                values: vec![ts_ms_value(1), i64_value(2)],
1353            }],
1354        };
1355        let metadata = new_region_metadata();
1356
1357        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1358        let err = request.check_schema(&metadata).unwrap_err();
1359        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1360    }
1361
1362    #[test]
1363    fn test_column_nullable() {
1364        let rows = Rows {
1365            schema: vec![
1366                new_column_schema(
1367                    "ts",
1368                    ColumnDataType::TimestampMillisecond,
1369                    SemanticType::Timestamp,
1370                ),
1371                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1372            ],
1373            rows: vec![Row {
1374                values: vec![Value { value_data: None }, i64_value(2)],
1375            }],
1376        };
1377        let metadata = new_region_metadata();
1378
1379        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1380        let err = request.check_schema(&metadata).unwrap_err();
1381        check_invalid_request(&err, "column ts is not null but input has null");
1382    }
1383
1384    #[test]
1385    fn test_column_default() {
1386        let rows = Rows {
1387            schema: vec![new_column_schema(
1388                "k0",
1389                ColumnDataType::Int64,
1390                SemanticType::Tag,
1391            )],
1392            rows: vec![Row {
1393                values: vec![i64_value(1)],
1394            }],
1395        };
1396        let metadata = new_region_metadata();
1397
1398        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1399        let err = request.check_schema(&metadata).unwrap_err();
1400        check_invalid_request(&err, "missing column ts");
1401    }
1402
1403    #[test]
1404    fn test_unknown_column() {
1405        let rows = Rows {
1406            schema: vec![
1407                new_column_schema(
1408                    "ts",
1409                    ColumnDataType::TimestampMillisecond,
1410                    SemanticType::Timestamp,
1411                ),
1412                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1413                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1414            ],
1415            rows: vec![Row {
1416                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1417            }],
1418        };
1419        let metadata = new_region_metadata();
1420
1421        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1422        let err = request.check_schema(&metadata).unwrap_err();
1423        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1424    }
1425
1426    #[test]
1427    fn test_fill_impure_columns_err() {
1428        let rows = Rows {
1429            schema: vec![new_column_schema(
1430                "k0",
1431                ColumnDataType::Int64,
1432                SemanticType::Tag,
1433            )],
1434            rows: vec![Row {
1435                values: vec![i64_value(1)],
1436            }],
1437        };
1438        let metadata = {
1439            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1440            builder
1441                .push_column_metadata(ColumnMetadata {
1442                    column_schema: datatypes::schema::ColumnSchema::new(
1443                        "ts",
1444                        ConcreteDataType::timestamp_millisecond_datatype(),
1445                        false,
1446                    )
1447                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1448                        "now()".to_string(),
1449                    )))
1450                    .unwrap(),
1451                    semantic_type: SemanticType::Timestamp,
1452                    column_id: 1,
1453                })
1454                .push_column_metadata(ColumnMetadata {
1455                    column_schema: datatypes::schema::ColumnSchema::new(
1456                        "k0",
1457                        ConcreteDataType::int64_datatype(),
1458                        true,
1459                    ),
1460                    semantic_type: SemanticType::Tag,
1461                    column_id: 2,
1462                })
1463                .primary_key(vec![2]);
1464            builder.build().unwrap()
1465        };
1466
1467        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1468        let err = request.check_schema(&metadata).unwrap_err();
1469        assert!(err.is_fill_default());
1470        assert!(
1471            request
1472                .fill_missing_columns(&metadata)
1473                .unwrap_err()
1474                .to_string()
1475                .contains("unexpected impure default value with region_id")
1476        );
1477    }
1478
1479    #[test]
1480    fn test_fill_missing_columns() {
1481        let rows = Rows {
1482            schema: vec![new_column_schema(
1483                "ts",
1484                ColumnDataType::TimestampMillisecond,
1485                SemanticType::Timestamp,
1486            )],
1487            rows: vec![Row {
1488                values: vec![ts_ms_value(1)],
1489            }],
1490        };
1491        let metadata = new_region_metadata();
1492
1493        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1494        let err = request.check_schema(&metadata).unwrap_err();
1495        assert!(err.is_fill_default());
1496        request.fill_missing_columns(&metadata).unwrap();
1497
1498        let expect_rows = Rows {
1499            schema: vec![new_column_schema(
1500                "ts",
1501                ColumnDataType::TimestampMillisecond,
1502                SemanticType::Timestamp,
1503            )],
1504            rows: vec![Row {
1505                values: vec![ts_ms_value(1)],
1506            }],
1507        };
1508        assert_eq!(expect_rows, request.rows);
1509    }
1510
1511    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1512        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1513        builder
1514            .push_column_metadata(ColumnMetadata {
1515                column_schema: datatypes::schema::ColumnSchema::new(
1516                    "ts",
1517                    ConcreteDataType::timestamp_millisecond_datatype(),
1518                    false,
1519                ),
1520                semantic_type: SemanticType::Timestamp,
1521                column_id: 1,
1522            })
1523            .push_column_metadata(ColumnMetadata {
1524                column_schema: datatypes::schema::ColumnSchema::new(
1525                    "k0",
1526                    ConcreteDataType::int64_datatype(),
1527                    true,
1528                ),
1529                semantic_type: SemanticType::Tag,
1530                column_id: 2,
1531            })
1532            .primary_key(vec![2]);
1533        builder
1534    }
1535
1536    fn region_metadata_two_fields() -> RegionMetadata {
1537        let mut builder = builder_with_ts_tag();
1538        builder
1539            .push_column_metadata(ColumnMetadata {
1540                column_schema: datatypes::schema::ColumnSchema::new(
1541                    "f0",
1542                    ConcreteDataType::int64_datatype(),
1543                    true,
1544                ),
1545                semantic_type: SemanticType::Field,
1546                column_id: 3,
1547            })
1548            // Column is not nullable.
1549            .push_column_metadata(ColumnMetadata {
1550                column_schema: datatypes::schema::ColumnSchema::new(
1551                    "f1",
1552                    ConcreteDataType::int64_datatype(),
1553                    false,
1554                )
1555                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1556                    datatypes::value::Value::Int64(100),
1557                )))
1558                .unwrap(),
1559                semantic_type: SemanticType::Field,
1560                column_id: 4,
1561            });
1562        builder.build().unwrap()
1563    }
1564
1565    #[test]
1566    fn test_fill_missing_for_delete() {
1567        let rows = Rows {
1568            schema: vec![new_column_schema(
1569                "ts",
1570                ColumnDataType::TimestampMillisecond,
1571                SemanticType::Timestamp,
1572            )],
1573            rows: vec![Row {
1574                values: vec![ts_ms_value(1)],
1575            }],
1576        };
1577        let metadata = region_metadata_two_fields();
1578
1579        let mut request =
1580            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1581        let err = request.check_schema(&metadata).unwrap_err();
1582        check_invalid_request(&err, "delete requests need column k0");
1583        let err = request.fill_missing_columns(&metadata).unwrap_err();
1584        check_invalid_request(&err, "delete requests need column k0");
1585
1586        let rows = Rows {
1587            schema: vec![
1588                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1589                new_column_schema(
1590                    "ts",
1591                    ColumnDataType::TimestampMillisecond,
1592                    SemanticType::Timestamp,
1593                ),
1594            ],
1595            rows: vec![Row {
1596                values: vec![i64_value(100), ts_ms_value(1)],
1597            }],
1598        };
1599        let mut request =
1600            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1601        let err = request.check_schema(&metadata).unwrap_err();
1602        assert!(err.is_fill_default());
1603        request.fill_missing_columns(&metadata).unwrap();
1604
1605        let expect_rows = Rows {
1606            schema: vec![
1607                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1608                new_column_schema(
1609                    "ts",
1610                    ColumnDataType::TimestampMillisecond,
1611                    SemanticType::Timestamp,
1612                ),
1613                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1614            ],
1615            // Column f1 is not nullable and we use 0 for padding.
1616            rows: vec![Row {
1617                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1618            }],
1619        };
1620        assert_eq!(expect_rows, request.rows);
1621    }
1622
1623    #[test]
1624    fn test_fill_missing_without_default_in_delete() {
1625        let mut builder = builder_with_ts_tag();
1626        builder
1627            // f0 is nullable.
1628            .push_column_metadata(ColumnMetadata {
1629                column_schema: datatypes::schema::ColumnSchema::new(
1630                    "f0",
1631                    ConcreteDataType::int64_datatype(),
1632                    true,
1633                ),
1634                semantic_type: SemanticType::Field,
1635                column_id: 3,
1636            })
1637            // f1 is not nullable and don't has default.
1638            .push_column_metadata(ColumnMetadata {
1639                column_schema: datatypes::schema::ColumnSchema::new(
1640                    "f1",
1641                    ConcreteDataType::int64_datatype(),
1642                    false,
1643                ),
1644                semantic_type: SemanticType::Field,
1645                column_id: 4,
1646            });
1647        let metadata = builder.build().unwrap();
1648
1649        let rows = Rows {
1650            schema: vec![
1651                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1652                new_column_schema(
1653                    "ts",
1654                    ColumnDataType::TimestampMillisecond,
1655                    SemanticType::Timestamp,
1656                ),
1657            ],
1658            // Missing f0 (nullable), f1 (not nullable).
1659            rows: vec![Row {
1660                values: vec![i64_value(100), ts_ms_value(1)],
1661            }],
1662        };
1663        let mut request =
1664            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1665        let err = request.check_schema(&metadata).unwrap_err();
1666        assert!(err.is_fill_default());
1667        request.fill_missing_columns(&metadata).unwrap();
1668
1669        let expect_rows = Rows {
1670            schema: vec![
1671                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1672                new_column_schema(
1673                    "ts",
1674                    ColumnDataType::TimestampMillisecond,
1675                    SemanticType::Timestamp,
1676                ),
1677                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1678            ],
1679            // Column f1 is not nullable and we use 0 for padding.
1680            rows: vec![Row {
1681                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1682            }],
1683        };
1684        assert_eq!(expect_rows, request.rows);
1685    }
1686
1687    #[test]
1688    fn test_no_default() {
1689        let rows = Rows {
1690            schema: vec![new_column_schema(
1691                "k0",
1692                ColumnDataType::Int64,
1693                SemanticType::Tag,
1694            )],
1695            rows: vec![Row {
1696                values: vec![i64_value(1)],
1697            }],
1698        };
1699        let metadata = new_region_metadata();
1700
1701        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1702        let err = request.fill_missing_columns(&metadata).unwrap_err();
1703        check_invalid_request(&err, "column ts does not have default value");
1704    }
1705
1706    #[test]
1707    fn test_missing_and_invalid() {
1708        // Missing f0 and f1 has invalid type (string).
1709        let rows = Rows {
1710            schema: vec![
1711                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1712                new_column_schema(
1713                    "ts",
1714                    ColumnDataType::TimestampMillisecond,
1715                    SemanticType::Timestamp,
1716                ),
1717                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1718            ],
1719            rows: vec![Row {
1720                values: vec![
1721                    i64_value(100),
1722                    ts_ms_value(1),
1723                    Value {
1724                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1725                    },
1726                ],
1727            }],
1728        };
1729        let metadata = region_metadata_two_fields();
1730
1731        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1732        let err = request.check_schema(&metadata).unwrap_err();
1733        check_invalid_request(
1734            &err,
1735            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1736        );
1737    }
1738
1739    #[test]
1740    fn test_write_request_metadata() {
1741        let rows = Rows {
1742            schema: vec![
1743                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1744                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1745            ],
1746            rows: vec![Row {
1747                values: vec![i64_value(1), i64_value(2)],
1748            }],
1749        };
1750
1751        let metadata = Arc::new(new_region_metadata());
1752        let request = WriteRequest::new(
1753            RegionId::new(1, 1),
1754            OpType::Put,
1755            rows,
1756            Some(metadata.clone()),
1757        )
1758        .unwrap();
1759
1760        assert!(request.region_metadata.is_some());
1761        assert_eq!(
1762            request.region_metadata.unwrap().region_id,
1763            RegionId::new(1, 1)
1764        );
1765    }
1766}