mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23    ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::manifest::ManifestVersion;
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38    AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39    RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{RegionId, SequenceNumber};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedImpureDefaultSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
/// Request to write a region.
///
/// `name_to_index` and `has_null` are derived from `rows` by [`WriteRequest::new`].
#[derive(Debug)]
pub struct WriteRequest {
    /// Region to write.
    pub region_id: RegionId,
    /// Type of the write request.
    pub op_type: OpType,
    /// Rows to write.
    pub rows: Rows,
    /// Map column name to column index in `rows`.
    pub name_to_index: HashMap<String, usize>,
    /// Whether each column has null, indexed by the column position in `rows.schema`.
    pub has_null: Vec<bool>,
    /// Write hint.
    pub hint: Option<WriteHint>,
    /// Region metadata at the time this request was created.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76    /// Creates a new request.
77    ///
78    /// Returns `Err` if `rows` are invalid.
79    pub fn new(
80        region_id: RegionId,
81        op_type: OpType,
82        rows: Rows,
83        region_metadata: Option<RegionMetadataRef>,
84    ) -> Result<WriteRequest> {
85        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86        for (index, column) in rows.schema.iter().enumerate() {
87            ensure!(
88                name_to_index
89                    .insert(column.column_name.clone(), index)
90                    .is_none(),
91                InvalidRequestSnafu {
92                    region_id,
93                    reason: format!("duplicate column {}", column.column_name),
94                }
95            );
96        }
97
98        let mut has_null = vec![false; rows.schema.len()];
99        for row in &rows.rows {
100            ensure!(
101                row.values.len() == rows.schema.len(),
102                InvalidRequestSnafu {
103                    region_id,
104                    reason: format!(
105                        "row has {} columns but schema has {}",
106                        row.values.len(),
107                        rows.schema.len()
108                    ),
109                }
110            );
111
112            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113                validate_proto_value(region_id, value, column_schema)?;
114
115                if value.value_data.is_none() {
116                    has_null[i] = true;
117                }
118            }
119        }
120
121        Ok(WriteRequest {
122            region_id,
123            op_type,
124            rows,
125            name_to_index,
126            has_null,
127            hint: None,
128            region_metadata,
129        })
130    }
131
132    /// Sets the write hint.
133    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134        self.hint = hint;
135        self
136    }
137
138    /// Returns the encoding hint.
139    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140        infer_primary_key_encoding_from_hint(self.hint.as_ref())
141    }
142
143    /// Returns estimated size of the request.
144    pub(crate) fn estimated_size(&self) -> usize {
145        let row_size = self
146            .rows
147            .rows
148            .first()
149            .map(|row| row.encoded_len())
150            .unwrap_or(0);
151        row_size * self.rows.rows.len()
152    }
153
154    /// Gets column index by name.
155    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156        self.name_to_index.get(name).copied()
157    }
158
    /// Checks schema of rows is compatible with schema of the region.
    ///
    /// For every column of the region that is present in the rows, it verifies the data
    /// type, the semantic type and that a non-nullable column doesn't receive null values.
    /// Columns of the region that are absent from the rows are checked by
    /// `check_missing_column`. Columns in the rows that the region doesn't have are
    /// rejected as unknown columns.
    ///
    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
    /// error. This error is only returned after all other checks have passed.
    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
        debug_assert_eq!(self.region_id, metadata.region_id);

        let region_id = self.region_id;
        // Index all columns in rows.
        let mut rows_columns: HashMap<_, _> = self
            .rows
            .schema
            .iter()
            .map(|column| (&column.column_name, column))
            .collect();

        let mut need_fill_default = false;
        // Checks all columns in this region.
        for column in &metadata.column_metadatas {
            // Removes matched columns so that only unknown columns remain in `rows_columns`
            // after the loop.
            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
                // Check data type.
                ensure!(
                    is_column_type_value_eq(
                        input_col.datatype,
                        input_col.datatype_extension,
                        &column.column_schema.data_type
                    ),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} expect type {:?}, given: {}({})",
                            column.column_schema.name,
                            column.column_schema.data_type,
                            ColumnDataType::try_from(input_col.datatype)
                                .map(|v| v.as_str_name())
                                .unwrap_or("Unknown"),
                            input_col.datatype,
                        )
                    }
                );

                // Check semantic type.
                ensure!(
                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} has semantic type {:?}, given: {}({})",
                            column.column_schema.name,
                            column.semantic_type,
                            api::v1::SemanticType::try_from(input_col.semantic_type)
                                .map(|v| v.as_str_name())
                                .unwrap_or("Unknown"),
                            input_col.semantic_type
                        ),
                    }
                );

                // Check nullable.
                // Safety: `rows_columns` ensures this column exists in `rows.schema`. Both
                // index operations panic unless `name_to_index` and `has_null` cover every
                // column in `rows.schema`.
                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
                ensure!(
                    !has_null || column.column_schema.is_nullable(),
                    InvalidRequestSnafu {
                        region_id,
                        reason: format!(
                            "column {} is not null but input has null",
                            column.column_schema.name
                        ),
                    }
                );
            } else {
                // Rows don't have this column.
                self.check_missing_column(column)?;

                need_fill_default = true;
            }
        }

        // Checks all columns in rows exist in the region.
        if !rows_columns.is_empty() {
            let names: Vec<_> = rows_columns.into_keys().collect();
            return InvalidRequestSnafu {
                region_id,
                reason: format!("unknown columns: {:?}", names),
            }
            .fail();
        }

        // If we need to fill default values, return a special error.
        ensure!(!need_fill_default, FillDefaultSnafu { region_id });

        Ok(())
    }
253
254    /// Tries to fill missing columns.
255    ///
256    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
257    /// values.
258    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259        debug_assert_eq!(self.region_id, metadata.region_id);
260
261        let mut columns_to_fill = vec![];
262        for column in &metadata.column_metadatas {
263            if !self.name_to_index.contains_key(&column.column_schema.name) {
264                columns_to_fill.push(column);
265            }
266        }
267        self.fill_columns(columns_to_fill)?;
268
269        Ok(())
270    }
271
272    /// Checks the schema and fill missing columns.
273    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274        if let Err(e) = self.check_schema(metadata) {
275            if e.is_fill_default() {
276                // TODO(yingwen): Add metrics for this case.
277                // We need to fill default value. The write request may be a request
278                // sent before changing the schema.
279                self.fill_missing_columns(metadata)?;
280            } else {
281                return Err(e);
282            }
283        }
284
285        Ok(())
286    }
287
288    /// Fills default value for specific `columns`.
289    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290        let mut default_values = Vec::with_capacity(columns.len());
291        let mut columns_to_fill = Vec::with_capacity(columns.len());
292        for column in columns {
293            let default_value = self.column_default_value(column)?;
294            if default_value.value_data.is_some() {
295                default_values.push(default_value);
296                columns_to_fill.push(column);
297            }
298        }
299
300        for row in &mut self.rows.rows {
301            row.values.extend(default_values.iter().cloned());
302        }
303
304        for column in columns_to_fill {
305            let (datatype, datatype_ext) =
306                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307                    .with_context(|_| ConvertColumnDataTypeSnafu {
308                        reason: format!(
309                            "no protobuf type for column {} ({:?})",
310                            column.column_schema.name, column.column_schema.data_type
311                        ),
312                    })?
313                    .to_parts();
314            self.rows.schema.push(ColumnSchema {
315                column_name: column.column_schema.name.clone(),
316                datatype: datatype as i32,
317                semantic_type: column.semantic_type as i32,
318                datatype_extension: datatype_ext,
319                options: options_from_column_schema(&column.column_schema),
320            });
321        }
322
323        Ok(())
324    }
325
326    /// Checks whether we should allow a row doesn't provide this column.
327    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328        if self.op_type == OpType::Delete {
329            if column.semantic_type == SemanticType::Field {
330                // For delete request, all tags and timestamp is required. We don't fill default
331                // tag or timestamp while deleting rows.
332                return Ok(());
333            } else {
334                return InvalidRequestSnafu {
335                    region_id: self.region_id,
336                    reason: format!("delete requests need column {}", column.column_schema.name),
337                }
338                .fail();
339            }
340        }
341
342        // Not a delete request. Checks whether they have default value.
343        ensure!(
344            column.column_schema.is_nullable()
345                || column.column_schema.default_constraint().is_some(),
346            InvalidRequestSnafu {
347                region_id: self.region_id,
348                reason: format!("missing column {}", column.column_schema.name),
349            }
350        );
351
352        Ok(())
353    }
354
355    /// Returns the default value for specific column.
356    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357        let default_value = match self.op_type {
358            OpType::Delete => {
359                ensure!(
360                    column.semantic_type == SemanticType::Field,
361                    InvalidRequestSnafu {
362                        region_id: self.region_id,
363                        reason: format!(
364                            "delete requests need column {}",
365                            column.column_schema.name
366                        ),
367                    }
368                );
369
370                // For delete request, we need a default value for padding so we
371                // can delete a row even a field doesn't have a default value. So the
372                // value doesn't need to following the default value constraint of the
373                // column.
374                if column.column_schema.is_nullable() {
375                    datatypes::value::Value::Null
376                } else {
377                    column.column_schema.data_type.default_value()
378                }
379            }
380            OpType::Put => {
381                // For put requests, we use the default value from column schema.
382                if column.column_schema.is_default_impure() {
383                    UnexpectedImpureDefaultSnafu {
384                        region_id: self.region_id,
385                        column: &column.column_schema.name,
386                        default_value: format!("{:?}", column.column_schema.default_constraint()),
387                    }
388                    .fail()?
389                }
390                column
391                    .column_schema
392                    .create_default()
393                    .context(CreateDefaultSnafu {
394                        region_id: self.region_id,
395                        column: &column.column_schema.name,
396                    })?
397                    // This column doesn't have default value.
398                    .with_context(|| InvalidRequestSnafu {
399                        region_id: self.region_id,
400                        reason: format!(
401                            "column {} does not have default value",
402                            column.column_schema.name
403                        ),
404                    })?
405            }
406        };
407
408        // Convert default value into proto's value.
409        to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
410            region_id: self.region_id,
411            reason: format!(
412                "no protobuf type for default value of column {} ({:?})",
413                column.column_schema.name, column.column_schema.data_type
414            ),
415        })
416    }
417}
418
419/// Validate proto value schema.
420pub(crate) fn validate_proto_value(
421    region_id: RegionId,
422    value: &Value,
423    column_schema: &ColumnSchema,
424) -> Result<()> {
425    if let Some(value_type) = proto_value_type(value) {
426        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
427            InvalidRequestSnafu {
428                region_id,
429                reason: format!(
430                    "column {} has unknown type {}",
431                    column_schema.column_name, column_schema.datatype
432                ),
433            }
434            .build()
435        })?;
436        ensure!(
437            proto_value_type_match(column_type, value_type),
438            InvalidRequestSnafu {
439                region_id,
440                reason: format!(
441                    "value has type {:?}, but column {} has type {:?}({})",
442                    value_type, column_schema.column_name, column_type, column_schema.datatype,
443                ),
444            }
445        );
446    }
447
448    Ok(())
449}
450
451fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
452    match (column_type, value_type) {
453        (ct, vt) if ct == vt => true,
454        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
455        (ColumnDataType::Json, ColumnDataType::Binary) => true,
456        _ => false,
457    }
458}
459
460/// Oneshot output result sender.
461#[derive(Debug)]
462pub struct OutputTx(Sender<Result<AffectedRows>>);
463
464impl OutputTx {
465    /// Creates a new output sender.
466    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
467        OutputTx(sender)
468    }
469
470    /// Sends the `result`.
471    pub(crate) fn send(self, result: Result<AffectedRows>) {
472        // Ignores send result.
473        let _ = self.0.send(result);
474    }
475}
476
477/// Optional output result sender.
478#[derive(Debug)]
479pub(crate) struct OptionOutputTx(Option<OutputTx>);
480
481impl OptionOutputTx {
482    /// Creates a sender.
483    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
484        OptionOutputTx(sender)
485    }
486
487    /// Creates an empty sender.
488    pub(crate) fn none() -> OptionOutputTx {
489        OptionOutputTx(None)
490    }
491
492    /// Sends the `result` and consumes the inner sender.
493    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
494        if let Some(sender) = self.0.take() {
495            sender.send(result);
496        }
497    }
498
499    /// Sends the `result` and consumes the sender.
500    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
501        if let Some(sender) = self.0.take() {
502            sender.send(result);
503        }
504    }
505
506    /// Takes the inner sender.
507    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
508        self.0.take()
509    }
510}
511
512impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
513    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
514        Self::new(Some(OutputTx::new(sender)))
515    }
516}
517
518impl OnFailure for OptionOutputTx {
519    fn on_failure(&mut self, err: Error) {
520        self.send_mut(Err(err));
521    }
522}
523
/// Callback on failure.
///
/// Implementors in this file use it to forward the error to their result senders.
pub(crate) trait OnFailure {
    /// Handles `err` on failure.
    fn on_failure(&mut self, err: Error);
}
529
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request.
    pub(crate) request: WriteRequest,
}
537
/// Sender and bulk request.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The [BulkPart] carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata attached to the request.
    pub(crate) region_metadata: RegionMetadataRef,
}
544
/// Request sent to a worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write to a region.
    Write(SenderWriteRequest),

    /// Ddl request to a region.
    Ddl(SenderDdlRequest),

    /// Notifications from internal background jobs.
    Background {
        /// Id of the region to send.
        region_id: RegionId,
        /// Internal notification.
        notify: BackgroundNotify,
    },

    /// The internal commands.
    SetRegionRoleStateGracefully {
        /// Id of the region to send.
        region_id: RegionId,
        /// The [SettableRegionRoleState].
        region_role_state: SettableRegionRoleState,
        /// The sender of [SetRegionRoleStateResponse].
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Notify a worker to stop.
    Stop,

    /// Use [RegionEdit] to edit a region directly.
    EditRegion(RegionEditRequest),

    /// Keep the manifest of a region up to date.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts request and region metadata.
    BulkInserts {
        /// Region metadata, if it is available when the request is created.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },
}
588
589impl WorkerRequest {
590    pub(crate) fn new_open_region_request(
591        region_id: RegionId,
592        request: RegionOpenRequest,
593        entry_receiver: Option<WalEntryReceiver>,
594    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
595        let (sender, receiver) = oneshot::channel();
596
597        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
598            region_id,
599            sender: sender.into(),
600            request: DdlRequest::Open((request, entry_receiver)),
601        });
602
603        (worker_request, receiver)
604    }
605
606    /// Converts request from a [RegionRequest].
607    pub(crate) fn try_from_region_request(
608        region_id: RegionId,
609        value: RegionRequest,
610        region_metadata: Option<RegionMetadataRef>,
611    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
612        let (sender, receiver) = oneshot::channel();
613        let worker_request = match value {
614            RegionRequest::Put(v) => {
615                let mut write_request =
616                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
617                        .with_hint(v.hint);
618                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
619                    && let Some(region_metadata) = &region_metadata
620                {
621                    write_request.maybe_fill_missing_columns(region_metadata)?;
622                }
623                WorkerRequest::Write(SenderWriteRequest {
624                    sender: sender.into(),
625                    request: write_request,
626                })
627            }
628            RegionRequest::Delete(v) => {
629                let mut write_request =
630                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
631                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
632                    && let Some(region_metadata) = &region_metadata
633                {
634                    write_request.maybe_fill_missing_columns(region_metadata)?;
635                }
636                WorkerRequest::Write(SenderWriteRequest {
637                    sender: sender.into(),
638                    request: write_request,
639                })
640            }
641            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
642                region_id,
643                sender: sender.into(),
644                request: DdlRequest::Create(v),
645            }),
646            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
647                region_id,
648                sender: sender.into(),
649                request: DdlRequest::Drop,
650            }),
651            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
652                region_id,
653                sender: sender.into(),
654                request: DdlRequest::Open((v, None)),
655            }),
656            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
657                region_id,
658                sender: sender.into(),
659                request: DdlRequest::Close(v),
660            }),
661            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
662                region_id,
663                sender: sender.into(),
664                request: DdlRequest::Alter(v),
665            }),
666            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
667                region_id,
668                sender: sender.into(),
669                request: DdlRequest::Flush(v),
670            }),
671            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
672                region_id,
673                sender: sender.into(),
674                request: DdlRequest::Compact(v),
675            }),
676            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
677                region_id,
678                sender: sender.into(),
679                request: DdlRequest::Truncate(v),
680            }),
681            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
682                region_id,
683                sender: sender.into(),
684                request: DdlRequest::Catchup(v),
685            }),
686            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
687                metadata: region_metadata,
688                sender: sender.into(),
689                request: region_bulk_inserts_request,
690            },
691        };
692
693        Ok((worker_request, receiver))
694    }
695
696    pub(crate) fn new_set_readonly_gracefully(
697        region_id: RegionId,
698        region_role_state: SettableRegionRoleState,
699    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
700        let (sender, receiver) = oneshot::channel();
701
702        (
703            WorkerRequest::SetRegionRoleStateGracefully {
704                region_id,
705                region_role_state,
706                sender,
707            },
708            receiver,
709        )
710    }
711
712    pub(crate) fn new_sync_region_request(
713        region_id: RegionId,
714        manifest_version: ManifestVersion,
715    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
716        let (sender, receiver) = oneshot::channel();
717        (
718            WorkerRequest::SyncRegion(RegionSyncRequest {
719                region_id,
720                manifest_version,
721                sender,
722            }),
723            receiver,
724        )
725    }
726}
727
/// DDL request to a region.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Creates a region.
    Create(RegionCreateRequest),
    /// Drops a region. This variant carries no payload.
    Drop,
    /// Opens a region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Closes a region.
    Close(RegionCloseRequest),
    /// Alters a region.
    Alter(RegionAlterRequest),
    /// Flushes a region.
    Flush(RegionFlushRequest),
    /// Compacts a region.
    Compact(RegionCompactRequest),
    /// Truncates a region.
    Truncate(RegionTruncateRequest),
    /// Catchup request of a region.
    Catchup(RegionCatchupRequest),
}
741
/// Sender and DDL request.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// DDL request.
    pub(crate) request: DdlRequest,
}
752
/// Notification from a background job.
///
/// It is carried by the `Background` variant of [WorkerRequest].
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
}
771
/// Notifies that a flush job has finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer. It is never read in this file and only kept alive by this struct.
    pub(crate) _timer: HistogramTimer,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
788
789impl FlushFinished {
790    /// Marks the flush job as successful and observes the timer.
791    pub(crate) fn on_success(self) {
792        for sender in self.senders {
793            sender.send(Ok(0));
794        }
795    }
796}
797
798impl OnFailure for FlushFinished {
799    fn on_failure(&mut self, err: Error) {
800        let err = Arc::new(err);
801        for sender in self.senders.drain(..) {
802            sender.send(Err(err.clone()).context(FlushRegionSnafu {
803                region_id: self.region_id,
804            }));
805        }
806    }
807}
808
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
815
/// Notifies that a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of compaction task. Used to observe the elapsed time on success.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
828
829impl CompactionFinished {
830    pub fn on_success(self) {
831        // only update compaction time on success
832        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
833
834        for sender in self.senders {
835            sender.send(Ok(0));
836        }
837        info!("Successfully compacted region: {}", self.region_id);
838    }
839}
840
841impl OnFailure for CompactionFinished {
842    /// Compaction succeeded but failed to update manifest or region's already been dropped.
843    fn on_failure(&mut self, err: Error) {
844        let err = Arc::new(err);
845        for sender in self.senders.drain(..) {
846            sender.send(Err(err.clone()).context(CompactRegionSnafu {
847                region_id: self.region_id,
848            }));
849        }
850    }
851}
852
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
860
/// Notifies the truncate result of a region.
///
/// It is carried by the `Truncate` variant of [BackgroundNotify].
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Truncated entry id.
    pub(crate) truncated_entry_id: EntryId,
    /// Truncated sequence.
    pub(crate) truncated_sequence: SequenceNumber,
}
875
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender used to report the result.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
888
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit carried by this request.
    pub(crate) edit: RegionEdit,
    /// The sender used to notify the region engine of the result.
    pub(crate) tx: Sender<Result<()>>,
}
897
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender used to report the result.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
910
/// Request to sync a region, carrying a manifest version and a channel for the outcome.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// Manifest version attached to the request.
    // NOTE(review): presumably the version the region should be synced to — confirm against the caller.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
918
919#[cfg(test)]
920mod tests {
921    use api::v1::value::ValueData;
922    use api::v1::{Row, SemanticType};
923    use datatypes::prelude::ConcreteDataType;
924    use datatypes::schema::ColumnDefaultConstraint;
925    use mito_codec::test_util::i64_value;
926    use store_api::metadata::RegionMetadataBuilder;
927
928    use super::*;
929    use crate::error::Error;
930    use crate::test_util::ts_ms_value;
931
932    fn new_column_schema(
933        name: &str,
934        data_type: ColumnDataType,
935        semantic_type: SemanticType,
936    ) -> ColumnSchema {
937        ColumnSchema {
938            column_name: name.to_string(),
939            datatype: data_type as i32,
940            semantic_type: semantic_type as i32,
941            ..Default::default()
942        }
943    }
944
945    fn check_invalid_request(err: &Error, expect: &str) {
946        if let Error::InvalidRequest {
947            region_id: _,
948            reason,
949            location: _,
950        } = err
951        {
952            assert_eq!(reason, expect);
953        } else {
954            panic!("Unexpected error {err}")
955        }
956    }
957
958    #[test]
959    fn test_write_request_duplicate_column() {
960        let rows = Rows {
961            schema: vec![
962                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
963                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
964            ],
965            rows: vec![],
966        };
967
968        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
969        check_invalid_request(&err, "duplicate column c0");
970    }
971
972    #[test]
973    fn test_valid_write_request() {
974        let rows = Rows {
975            schema: vec![
976                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
977                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
978            ],
979            rows: vec![Row {
980                values: vec![i64_value(1), i64_value(2)],
981            }],
982        };
983
984        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
985        assert_eq!(0, request.column_index_by_name("c0").unwrap());
986        assert_eq!(1, request.column_index_by_name("c1").unwrap());
987        assert_eq!(None, request.column_index_by_name("c2"));
988    }
989
990    #[test]
991    fn test_write_request_column_num() {
992        let rows = Rows {
993            schema: vec![
994                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
995                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
996            ],
997            rows: vec![Row {
998                values: vec![i64_value(1), i64_value(2), i64_value(3)],
999            }],
1000        };
1001
1002        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1003        check_invalid_request(&err, "row has 3 columns but schema has 2");
1004    }
1005
1006    fn new_region_metadata() -> RegionMetadata {
1007        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1008        builder
1009            .push_column_metadata(ColumnMetadata {
1010                column_schema: datatypes::schema::ColumnSchema::new(
1011                    "ts",
1012                    ConcreteDataType::timestamp_millisecond_datatype(),
1013                    false,
1014                ),
1015                semantic_type: SemanticType::Timestamp,
1016                column_id: 1,
1017            })
1018            .push_column_metadata(ColumnMetadata {
1019                column_schema: datatypes::schema::ColumnSchema::new(
1020                    "k0",
1021                    ConcreteDataType::int64_datatype(),
1022                    true,
1023                ),
1024                semantic_type: SemanticType::Tag,
1025                column_id: 2,
1026            })
1027            .primary_key(vec![2]);
1028        builder.build().unwrap()
1029    }
1030
1031    #[test]
1032    fn test_check_schema() {
1033        let rows = Rows {
1034            schema: vec![
1035                new_column_schema(
1036                    "ts",
1037                    ColumnDataType::TimestampMillisecond,
1038                    SemanticType::Timestamp,
1039                ),
1040                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1041            ],
1042            rows: vec![Row {
1043                values: vec![ts_ms_value(1), i64_value(2)],
1044            }],
1045        };
1046        let metadata = new_region_metadata();
1047
1048        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1049        request.check_schema(&metadata).unwrap();
1050    }
1051
1052    #[test]
1053    fn test_column_type() {
1054        let rows = Rows {
1055            schema: vec![
1056                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1057                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1058            ],
1059            rows: vec![Row {
1060                values: vec![i64_value(1), i64_value(2)],
1061            }],
1062        };
1063        let metadata = new_region_metadata();
1064
1065        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1066        let err = request.check_schema(&metadata).unwrap_err();
1067        check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1068    }
1069
1070    #[test]
1071    fn test_semantic_type() {
1072        let rows = Rows {
1073            schema: vec![
1074                new_column_schema(
1075                    "ts",
1076                    ColumnDataType::TimestampMillisecond,
1077                    SemanticType::Tag,
1078                ),
1079                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1080            ],
1081            rows: vec![Row {
1082                values: vec![ts_ms_value(1), i64_value(2)],
1083            }],
1084        };
1085        let metadata = new_region_metadata();
1086
1087        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1088        let err = request.check_schema(&metadata).unwrap_err();
1089        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1090    }
1091
1092    #[test]
1093    fn test_column_nullable() {
1094        let rows = Rows {
1095            schema: vec![
1096                new_column_schema(
1097                    "ts",
1098                    ColumnDataType::TimestampMillisecond,
1099                    SemanticType::Timestamp,
1100                ),
1101                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1102            ],
1103            rows: vec![Row {
1104                values: vec![Value { value_data: None }, i64_value(2)],
1105            }],
1106        };
1107        let metadata = new_region_metadata();
1108
1109        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1110        let err = request.check_schema(&metadata).unwrap_err();
1111        check_invalid_request(&err, "column ts is not null but input has null");
1112    }
1113
1114    #[test]
1115    fn test_column_default() {
1116        let rows = Rows {
1117            schema: vec![new_column_schema(
1118                "k0",
1119                ColumnDataType::Int64,
1120                SemanticType::Tag,
1121            )],
1122            rows: vec![Row {
1123                values: vec![i64_value(1)],
1124            }],
1125        };
1126        let metadata = new_region_metadata();
1127
1128        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1129        let err = request.check_schema(&metadata).unwrap_err();
1130        check_invalid_request(&err, "missing column ts");
1131    }
1132
1133    #[test]
1134    fn test_unknown_column() {
1135        let rows = Rows {
1136            schema: vec![
1137                new_column_schema(
1138                    "ts",
1139                    ColumnDataType::TimestampMillisecond,
1140                    SemanticType::Timestamp,
1141                ),
1142                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1143                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1144            ],
1145            rows: vec![Row {
1146                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1147            }],
1148        };
1149        let metadata = new_region_metadata();
1150
1151        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1152        let err = request.check_schema(&metadata).unwrap_err();
1153        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1154    }
1155
1156    #[test]
1157    fn test_fill_impure_columns_err() {
1158        let rows = Rows {
1159            schema: vec![new_column_schema(
1160                "k0",
1161                ColumnDataType::Int64,
1162                SemanticType::Tag,
1163            )],
1164            rows: vec![Row {
1165                values: vec![i64_value(1)],
1166            }],
1167        };
1168        let metadata = {
1169            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1170            builder
1171                .push_column_metadata(ColumnMetadata {
1172                    column_schema: datatypes::schema::ColumnSchema::new(
1173                        "ts",
1174                        ConcreteDataType::timestamp_millisecond_datatype(),
1175                        false,
1176                    )
1177                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1178                        "now()".to_string(),
1179                    )))
1180                    .unwrap(),
1181                    semantic_type: SemanticType::Timestamp,
1182                    column_id: 1,
1183                })
1184                .push_column_metadata(ColumnMetadata {
1185                    column_schema: datatypes::schema::ColumnSchema::new(
1186                        "k0",
1187                        ConcreteDataType::int64_datatype(),
1188                        true,
1189                    ),
1190                    semantic_type: SemanticType::Tag,
1191                    column_id: 2,
1192                })
1193                .primary_key(vec![2]);
1194            builder.build().unwrap()
1195        };
1196
1197        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1198        let err = request.check_schema(&metadata).unwrap_err();
1199        assert!(err.is_fill_default());
1200        assert!(request
1201            .fill_missing_columns(&metadata)
1202            .unwrap_err()
1203            .to_string()
1204            .contains("Unexpected impure default value with region_id"));
1205    }
1206
1207    #[test]
1208    fn test_fill_missing_columns() {
1209        let rows = Rows {
1210            schema: vec![new_column_schema(
1211                "ts",
1212                ColumnDataType::TimestampMillisecond,
1213                SemanticType::Timestamp,
1214            )],
1215            rows: vec![Row {
1216                values: vec![ts_ms_value(1)],
1217            }],
1218        };
1219        let metadata = new_region_metadata();
1220
1221        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1222        let err = request.check_schema(&metadata).unwrap_err();
1223        assert!(err.is_fill_default());
1224        request.fill_missing_columns(&metadata).unwrap();
1225
1226        let expect_rows = Rows {
1227            schema: vec![new_column_schema(
1228                "ts",
1229                ColumnDataType::TimestampMillisecond,
1230                SemanticType::Timestamp,
1231            )],
1232            rows: vec![Row {
1233                values: vec![ts_ms_value(1)],
1234            }],
1235        };
1236        assert_eq!(expect_rows, request.rows);
1237    }
1238
1239    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1240        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1241        builder
1242            .push_column_metadata(ColumnMetadata {
1243                column_schema: datatypes::schema::ColumnSchema::new(
1244                    "ts",
1245                    ConcreteDataType::timestamp_millisecond_datatype(),
1246                    false,
1247                ),
1248                semantic_type: SemanticType::Timestamp,
1249                column_id: 1,
1250            })
1251            .push_column_metadata(ColumnMetadata {
1252                column_schema: datatypes::schema::ColumnSchema::new(
1253                    "k0",
1254                    ConcreteDataType::int64_datatype(),
1255                    true,
1256                ),
1257                semantic_type: SemanticType::Tag,
1258                column_id: 2,
1259            })
1260            .primary_key(vec![2]);
1261        builder
1262    }
1263
1264    fn region_metadata_two_fields() -> RegionMetadata {
1265        let mut builder = builder_with_ts_tag();
1266        builder
1267            .push_column_metadata(ColumnMetadata {
1268                column_schema: datatypes::schema::ColumnSchema::new(
1269                    "f0",
1270                    ConcreteDataType::int64_datatype(),
1271                    true,
1272                ),
1273                semantic_type: SemanticType::Field,
1274                column_id: 3,
1275            })
1276            // Column is not nullable.
1277            .push_column_metadata(ColumnMetadata {
1278                column_schema: datatypes::schema::ColumnSchema::new(
1279                    "f1",
1280                    ConcreteDataType::int64_datatype(),
1281                    false,
1282                )
1283                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1284                    datatypes::value::Value::Int64(100),
1285                )))
1286                .unwrap(),
1287                semantic_type: SemanticType::Field,
1288                column_id: 4,
1289            });
1290        builder.build().unwrap()
1291    }
1292
1293    #[test]
1294    fn test_fill_missing_for_delete() {
1295        let rows = Rows {
1296            schema: vec![new_column_schema(
1297                "ts",
1298                ColumnDataType::TimestampMillisecond,
1299                SemanticType::Timestamp,
1300            )],
1301            rows: vec![Row {
1302                values: vec![ts_ms_value(1)],
1303            }],
1304        };
1305        let metadata = region_metadata_two_fields();
1306
1307        let mut request =
1308            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1309        let err = request.check_schema(&metadata).unwrap_err();
1310        check_invalid_request(&err, "delete requests need column k0");
1311        let err = request.fill_missing_columns(&metadata).unwrap_err();
1312        check_invalid_request(&err, "delete requests need column k0");
1313
1314        let rows = Rows {
1315            schema: vec![
1316                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1317                new_column_schema(
1318                    "ts",
1319                    ColumnDataType::TimestampMillisecond,
1320                    SemanticType::Timestamp,
1321                ),
1322            ],
1323            rows: vec![Row {
1324                values: vec![i64_value(100), ts_ms_value(1)],
1325            }],
1326        };
1327        let mut request =
1328            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1329        let err = request.check_schema(&metadata).unwrap_err();
1330        assert!(err.is_fill_default());
1331        request.fill_missing_columns(&metadata).unwrap();
1332
1333        let expect_rows = Rows {
1334            schema: vec![
1335                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1336                new_column_schema(
1337                    "ts",
1338                    ColumnDataType::TimestampMillisecond,
1339                    SemanticType::Timestamp,
1340                ),
1341                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1342            ],
1343            // Column f1 is not nullable and we use 0 for padding.
1344            rows: vec![Row {
1345                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1346            }],
1347        };
1348        assert_eq!(expect_rows, request.rows);
1349    }
1350
1351    #[test]
1352    fn test_fill_missing_without_default_in_delete() {
1353        let mut builder = builder_with_ts_tag();
1354        builder
1355            // f0 is nullable.
1356            .push_column_metadata(ColumnMetadata {
1357                column_schema: datatypes::schema::ColumnSchema::new(
1358                    "f0",
1359                    ConcreteDataType::int64_datatype(),
1360                    true,
1361                ),
1362                semantic_type: SemanticType::Field,
1363                column_id: 3,
1364            })
1365            // f1 is not nullable and don't has default.
1366            .push_column_metadata(ColumnMetadata {
1367                column_schema: datatypes::schema::ColumnSchema::new(
1368                    "f1",
1369                    ConcreteDataType::int64_datatype(),
1370                    false,
1371                ),
1372                semantic_type: SemanticType::Field,
1373                column_id: 4,
1374            });
1375        let metadata = builder.build().unwrap();
1376
1377        let rows = Rows {
1378            schema: vec![
1379                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1380                new_column_schema(
1381                    "ts",
1382                    ColumnDataType::TimestampMillisecond,
1383                    SemanticType::Timestamp,
1384                ),
1385            ],
1386            // Missing f0 (nullable), f1 (not nullable).
1387            rows: vec![Row {
1388                values: vec![i64_value(100), ts_ms_value(1)],
1389            }],
1390        };
1391        let mut request =
1392            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1393        let err = request.check_schema(&metadata).unwrap_err();
1394        assert!(err.is_fill_default());
1395        request.fill_missing_columns(&metadata).unwrap();
1396
1397        let expect_rows = Rows {
1398            schema: vec![
1399                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1400                new_column_schema(
1401                    "ts",
1402                    ColumnDataType::TimestampMillisecond,
1403                    SemanticType::Timestamp,
1404                ),
1405                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1406            ],
1407            // Column f1 is not nullable and we use 0 for padding.
1408            rows: vec![Row {
1409                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1410            }],
1411        };
1412        assert_eq!(expect_rows, request.rows);
1413    }
1414
1415    #[test]
1416    fn test_no_default() {
1417        let rows = Rows {
1418            schema: vec![new_column_schema(
1419                "k0",
1420                ColumnDataType::Int64,
1421                SemanticType::Tag,
1422            )],
1423            rows: vec![Row {
1424                values: vec![i64_value(1)],
1425            }],
1426        };
1427        let metadata = new_region_metadata();
1428
1429        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1430        let err = request.fill_missing_columns(&metadata).unwrap_err();
1431        check_invalid_request(&err, "column ts does not have default value");
1432    }
1433
1434    #[test]
1435    fn test_missing_and_invalid() {
1436        // Missing f0 and f1 has invalid type (string).
1437        let rows = Rows {
1438            schema: vec![
1439                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1440                new_column_schema(
1441                    "ts",
1442                    ColumnDataType::TimestampMillisecond,
1443                    SemanticType::Timestamp,
1444                ),
1445                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1446            ],
1447            rows: vec![Row {
1448                values: vec![
1449                    i64_value(100),
1450                    ts_ms_value(1),
1451                    Value {
1452                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1453                    },
1454                ],
1455            }],
1456        };
1457        let metadata = region_metadata_two_fields();
1458
1459        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1460        let err = request.check_schema(&metadata).unwrap_err();
1461        check_invalid_request(
1462            &err,
1463            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1464        );
1465    }
1466
1467    #[test]
1468    fn test_write_request_metadata() {
1469        let rows = Rows {
1470            schema: vec![
1471                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1472                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1473            ],
1474            rows: vec![Row {
1475                values: vec![i64_value(1), i64_value(2)],
1476            }],
1477        };
1478
1479        let metadata = Arc::new(new_region_metadata());
1480        let request = WriteRequest::new(
1481            RegionId::new(1, 1),
1482            OpType::Put,
1483            rows,
1484            Some(metadata.clone()),
1485        )
1486        .unwrap();
1487
1488        assert!(request.region_metadata.is_some());
1489        assert_eq!(
1490            request.region_metadata.unwrap().region_id,
1491            RegionId::new(1, 1)
1492        );
1493    }
1494}