mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23    ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
36use store_api::region_request::{
37    AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
38    RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
39    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
40};
41use store_api::storage::{RegionId, SequenceNumber};
42use store_api::ManifestVersion;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
56/// Request to write a region.
57#[derive(Debug)]
58pub struct WriteRequest {
59    /// Region to write.
60    pub region_id: RegionId,
61    /// Type of the write request.
62    pub op_type: OpType,
63    /// Rows to write.
64    pub rows: Rows,
65    /// Map column name to column index in `rows`.
66    pub name_to_index: HashMap<String, usize>,
67    /// Whether each column has null.
68    pub has_null: Vec<bool>,
69    /// Write hint.
70    pub hint: Option<WriteHint>,
71    /// Region metadata on the time of this request is created.
72    pub(crate) region_metadata: Option<RegionMetadataRef>,
73}
74
75impl WriteRequest {
76    /// Creates a new request.
77    ///
78    /// Returns `Err` if `rows` are invalid.
79    pub fn new(
80        region_id: RegionId,
81        op_type: OpType,
82        rows: Rows,
83        region_metadata: Option<RegionMetadataRef>,
84    ) -> Result<WriteRequest> {
85        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86        for (index, column) in rows.schema.iter().enumerate() {
87            ensure!(
88                name_to_index
89                    .insert(column.column_name.clone(), index)
90                    .is_none(),
91                InvalidRequestSnafu {
92                    region_id,
93                    reason: format!("duplicate column {}", column.column_name),
94                }
95            );
96        }
97
98        let mut has_null = vec![false; rows.schema.len()];
99        for row in &rows.rows {
100            ensure!(
101                row.values.len() == rows.schema.len(),
102                InvalidRequestSnafu {
103                    region_id,
104                    reason: format!(
105                        "row has {} columns but schema has {}",
106                        row.values.len(),
107                        rows.schema.len()
108                    ),
109                }
110            );
111
112            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113                validate_proto_value(region_id, value, column_schema)?;
114
115                if value.value_data.is_none() {
116                    has_null[i] = true;
117                }
118            }
119        }
120
121        Ok(WriteRequest {
122            region_id,
123            op_type,
124            rows,
125            name_to_index,
126            has_null,
127            hint: None,
128            region_metadata,
129        })
130    }
131
132    /// Sets the write hint.
133    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134        self.hint = hint;
135        self
136    }
137
138    /// Returns the encoding hint.
139    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140        infer_primary_key_encoding_from_hint(self.hint.as_ref())
141    }
142
143    /// Returns estimated size of the request.
144    pub(crate) fn estimated_size(&self) -> usize {
145        let row_size = self
146            .rows
147            .rows
148            .first()
149            .map(|row| row.encoded_len())
150            .unwrap_or(0);
151        row_size * self.rows.rows.len()
152    }
153
154    /// Gets column index by name.
155    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156        self.name_to_index.get(name).copied()
157    }
158
159    /// Checks schema of rows is compatible with schema of the region.
160    ///
161    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
162    /// error.
163    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164        debug_assert_eq!(self.region_id, metadata.region_id);
165
166        let region_id = self.region_id;
167        // Index all columns in rows.
168        let mut rows_columns: HashMap<_, _> = self
169            .rows
170            .schema
171            .iter()
172            .map(|column| (&column.column_name, column))
173            .collect();
174
175        let mut need_fill_default = false;
176        // Checks all columns in this region.
177        for column in &metadata.column_metadatas {
178            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179                // Check data type.
180                ensure!(
181                    is_column_type_value_eq(
182                        input_col.datatype,
183                        input_col.datatype_extension,
184                        &column.column_schema.data_type
185                    ),
186                    InvalidRequestSnafu {
187                        region_id,
188                        reason: format!(
189                            "column {} expect type {:?}, given: {}({})",
190                            column.column_schema.name,
191                            column.column_schema.data_type,
192                            ColumnDataType::try_from(input_col.datatype)
193                                .map(|v| v.as_str_name())
194                                .unwrap_or("Unknown"),
195                            input_col.datatype,
196                        )
197                    }
198                );
199
200                // Check semantic type.
201                ensure!(
202                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203                    InvalidRequestSnafu {
204                        region_id,
205                        reason: format!(
206                            "column {} has semantic type {:?}, given: {}({})",
207                            column.column_schema.name,
208                            column.semantic_type,
209                            api::v1::SemanticType::try_from(input_col.semantic_type)
210                                .map(|v| v.as_str_name())
211                                .unwrap_or("Unknown"),
212                            input_col.semantic_type
213                        ),
214                    }
215                );
216
217                // Check nullable.
218                // Safety: `rows_columns` ensures this column exists.
219                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220                ensure!(
221                    !has_null || column.column_schema.is_nullable(),
222                    InvalidRequestSnafu {
223                        region_id,
224                        reason: format!(
225                            "column {} is not null but input has null",
226                            column.column_schema.name
227                        ),
228                    }
229                );
230            } else {
231                // Rows don't have this column.
232                self.check_missing_column(column)?;
233
234                need_fill_default = true;
235            }
236        }
237
238        // Checks all columns in rows exist in the region.
239        if !rows_columns.is_empty() {
240            let names: Vec<_> = rows_columns.into_keys().collect();
241            return InvalidRequestSnafu {
242                region_id,
243                reason: format!("unknown columns: {:?}", names),
244            }
245            .fail();
246        }
247
248        // If we need to fill default values, return a special error.
249        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251        Ok(())
252    }
253
254    /// Tries to fill missing columns.
255    ///
256    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
257    /// values.
258    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259        debug_assert_eq!(self.region_id, metadata.region_id);
260
261        let mut columns_to_fill = vec![];
262        for column in &metadata.column_metadatas {
263            if !self.name_to_index.contains_key(&column.column_schema.name) {
264                columns_to_fill.push(column);
265            }
266        }
267        self.fill_columns(columns_to_fill)?;
268
269        Ok(())
270    }
271
272    /// Checks the schema and fill missing columns.
273    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274        if let Err(e) = self.check_schema(metadata) {
275            if e.is_fill_default() {
276                // TODO(yingwen): Add metrics for this case.
277                // We need to fill default value. The write request may be a request
278                // sent before changing the schema.
279                self.fill_missing_columns(metadata)?;
280            } else {
281                return Err(e);
282            }
283        }
284
285        Ok(())
286    }
287
288    /// Fills default value for specific `columns`.
289    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290        let mut default_values = Vec::with_capacity(columns.len());
291        let mut columns_to_fill = Vec::with_capacity(columns.len());
292        for column in columns {
293            let default_value = self.column_default_value(column)?;
294            if default_value.value_data.is_some() {
295                default_values.push(default_value);
296                columns_to_fill.push(column);
297            }
298        }
299
300        for row in &mut self.rows.rows {
301            row.values.extend(default_values.iter().cloned());
302        }
303
304        for column in columns_to_fill {
305            let (datatype, datatype_ext) =
306                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307                    .with_context(|_| ConvertColumnDataTypeSnafu {
308                        reason: format!(
309                            "no protobuf type for column {} ({:?})",
310                            column.column_schema.name, column.column_schema.data_type
311                        ),
312                    })?
313                    .to_parts();
314            self.rows.schema.push(ColumnSchema {
315                column_name: column.column_schema.name.clone(),
316                datatype: datatype as i32,
317                semantic_type: column.semantic_type as i32,
318                datatype_extension: datatype_ext,
319                options: options_from_column_schema(&column.column_schema),
320            });
321        }
322
323        Ok(())
324    }
325
326    /// Checks whether we should allow a row doesn't provide this column.
327    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328        if self.op_type == OpType::Delete {
329            if column.semantic_type == SemanticType::Field {
330                // For delete request, all tags and timestamp is required. We don't fill default
331                // tag or timestamp while deleting rows.
332                return Ok(());
333            } else {
334                return InvalidRequestSnafu {
335                    region_id: self.region_id,
336                    reason: format!("delete requests need column {}", column.column_schema.name),
337                }
338                .fail();
339            }
340        }
341
342        // Not a delete request. Checks whether they have default value.
343        ensure!(
344            column.column_schema.is_nullable()
345                || column.column_schema.default_constraint().is_some(),
346            InvalidRequestSnafu {
347                region_id: self.region_id,
348                reason: format!("missing column {}", column.column_schema.name),
349            }
350        );
351
352        Ok(())
353    }
354
355    /// Returns the default value for specific column.
356    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357        let default_value = match self.op_type {
358            OpType::Delete => {
359                ensure!(
360                    column.semantic_type == SemanticType::Field,
361                    InvalidRequestSnafu {
362                        region_id: self.region_id,
363                        reason: format!(
364                            "delete requests need column {}",
365                            column.column_schema.name
366                        ),
367                    }
368                );
369
370                // For delete request, we need a default value for padding so we
371                // can delete a row even a field doesn't have a default value. So the
372                // value doesn't need to following the default value constraint of the
373                // column.
374                if column.column_schema.is_nullable() {
375                    datatypes::value::Value::Null
376                } else {
377                    column.column_schema.data_type.default_value()
378                }
379            }
380            OpType::Put => {
381                // For put requests, we use the default value from column schema.
382                if column.column_schema.is_default_impure() {
383                    UnexpectedSnafu {
384                        reason: format!(
385                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", 
386                            self.region_id,
387                            column.column_schema.name,
388                            column.column_schema.default_constraint(),
389                        ),
390                    }
391                    .fail()?
392                }
393                column
394                    .column_schema
395                    .create_default()
396                    .context(CreateDefaultSnafu {
397                        region_id: self.region_id,
398                        column: &column.column_schema.name,
399                    })?
400                    // This column doesn't have default value.
401                    .with_context(|| InvalidRequestSnafu {
402                        region_id: self.region_id,
403                        reason: format!(
404                            "column {} does not have default value",
405                            column.column_schema.name
406                        ),
407                    })?
408            }
409        };
410
411        // Convert default value into proto's value.
412        to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
413            region_id: self.region_id,
414            reason: format!(
415                "no protobuf type for default value of column {} ({:?})",
416                column.column_schema.name, column.column_schema.data_type
417            ),
418        })
419    }
420}
421
422/// Validate proto value schema.
423pub(crate) fn validate_proto_value(
424    region_id: RegionId,
425    value: &Value,
426    column_schema: &ColumnSchema,
427) -> Result<()> {
428    if let Some(value_type) = proto_value_type(value) {
429        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
430            InvalidRequestSnafu {
431                region_id,
432                reason: format!(
433                    "column {} has unknown type {}",
434                    column_schema.column_name, column_schema.datatype
435                ),
436            }
437            .build()
438        })?;
439        ensure!(
440            proto_value_type_match(column_type, value_type),
441            InvalidRequestSnafu {
442                region_id,
443                reason: format!(
444                    "value has type {:?}, but column {} has type {:?}({})",
445                    value_type, column_schema.column_name, column_type, column_schema.datatype,
446                ),
447            }
448        );
449    }
450
451    Ok(())
452}
453
454fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
455    match (column_type, value_type) {
456        (ct, vt) if ct == vt => true,
457        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
458        (ColumnDataType::Json, ColumnDataType::Binary) => true,
459        _ => false,
460    }
461}
462
463/// Oneshot output result sender.
464#[derive(Debug)]
465pub struct OutputTx(Sender<Result<AffectedRows>>);
466
467impl OutputTx {
468    /// Creates a new output sender.
469    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
470        OutputTx(sender)
471    }
472
473    /// Sends the `result`.
474    pub(crate) fn send(self, result: Result<AffectedRows>) {
475        // Ignores send result.
476        let _ = self.0.send(result);
477    }
478}
479
480/// Optional output result sender.
481#[derive(Debug)]
482pub(crate) struct OptionOutputTx(Option<OutputTx>);
483
484impl OptionOutputTx {
485    /// Creates a sender.
486    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
487        OptionOutputTx(sender)
488    }
489
490    /// Creates an empty sender.
491    pub(crate) fn none() -> OptionOutputTx {
492        OptionOutputTx(None)
493    }
494
495    /// Sends the `result` and consumes the inner sender.
496    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
497        if let Some(sender) = self.0.take() {
498            sender.send(result);
499        }
500    }
501
502    /// Sends the `result` and consumes the sender.
503    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
504        if let Some(sender) = self.0.take() {
505            sender.send(result);
506        }
507    }
508
509    /// Takes the inner sender.
510    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
511        self.0.take()
512    }
513}
514
515impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
516    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
517        Self::new(Some(OutputTx::new(sender)))
518    }
519}
520
521impl OnFailure for OptionOutputTx {
522    fn on_failure(&mut self, err: Error) {
523        self.send_mut(Err(err));
524    }
525}
526
527/// Callback on failure.
528pub(crate) trait OnFailure {
529    /// Handles `err` on failure.
530    fn on_failure(&mut self, err: Error);
531}
532
533/// Sender and write request.
534#[derive(Debug)]
535pub(crate) struct SenderWriteRequest {
536    /// Result sender.
537    pub(crate) sender: OptionOutputTx,
538    pub(crate) request: WriteRequest,
539}
540
541pub(crate) struct SenderBulkRequest {
542    pub(crate) sender: OptionOutputTx,
543    pub(crate) region_id: RegionId,
544    pub(crate) request: BulkPart,
545    pub(crate) region_metadata: RegionMetadataRef,
546}
547
548/// Request sent to a worker with timestamp
549#[derive(Debug)]
550pub(crate) struct WorkerRequestWithTime {
551    pub(crate) request: WorkerRequest,
552    pub(crate) created_at: Instant,
553}
554
555impl WorkerRequestWithTime {
556    pub(crate) fn new(request: WorkerRequest) -> Self {
557        Self {
558            request,
559            created_at: Instant::now(),
560        }
561    }
562}
563
564/// Request sent to a worker
565#[derive(Debug)]
566pub(crate) enum WorkerRequest {
567    /// Write to a region.
568    Write(SenderWriteRequest),
569
570    /// Ddl request to a region.
571    Ddl(SenderDdlRequest),
572
573    /// Notifications from internal background jobs.
574    Background {
575        /// Id of the region to send.
576        region_id: RegionId,
577        /// Internal notification.
578        notify: BackgroundNotify,
579    },
580
581    /// The internal commands.
582    SetRegionRoleStateGracefully {
583        /// Id of the region to send.
584        region_id: RegionId,
585        /// The [SettableRegionRoleState].
586        region_role_state: SettableRegionRoleState,
587        /// The sender of [SetReadonlyResponse].
588        sender: Sender<SetRegionRoleStateResponse>,
589    },
590
591    /// Notify a worker to stop.
592    Stop,
593
594    /// Use [RegionEdit] to edit a region directly.
595    EditRegion(RegionEditRequest),
596
597    /// Keep the manifest of a region up to date.
598    SyncRegion(RegionSyncRequest),
599
600    /// Bulk inserts request and region metadata.
601    BulkInserts {
602        metadata: Option<RegionMetadataRef>,
603        request: RegionBulkInsertsRequest,
604        sender: OptionOutputTx,
605    },
606}
607
608impl WorkerRequest {
609    pub(crate) fn new_open_region_request(
610        region_id: RegionId,
611        request: RegionOpenRequest,
612        entry_receiver: Option<WalEntryReceiver>,
613    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
614        let (sender, receiver) = oneshot::channel();
615
616        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
617            region_id,
618            sender: sender.into(),
619            request: DdlRequest::Open((request, entry_receiver)),
620        });
621
622        (worker_request, receiver)
623    }
624
625    /// Converts request from a [RegionRequest].
626    pub(crate) fn try_from_region_request(
627        region_id: RegionId,
628        value: RegionRequest,
629        region_metadata: Option<RegionMetadataRef>,
630    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
631        let (sender, receiver) = oneshot::channel();
632        let worker_request = match value {
633            RegionRequest::Put(v) => {
634                let mut write_request =
635                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
636                        .with_hint(v.hint);
637                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
638                    && let Some(region_metadata) = &region_metadata
639                {
640                    write_request.maybe_fill_missing_columns(region_metadata)?;
641                }
642                WorkerRequest::Write(SenderWriteRequest {
643                    sender: sender.into(),
644                    request: write_request,
645                })
646            }
647            RegionRequest::Delete(v) => {
648                let mut write_request =
649                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
650                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651                    && let Some(region_metadata) = &region_metadata
652                {
653                    write_request.maybe_fill_missing_columns(region_metadata)?;
654                }
655                WorkerRequest::Write(SenderWriteRequest {
656                    sender: sender.into(),
657                    request: write_request,
658                })
659            }
660            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
661                region_id,
662                sender: sender.into(),
663                request: DdlRequest::Create(v),
664            }),
665            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
666                region_id,
667                sender: sender.into(),
668                request: DdlRequest::Drop,
669            }),
670            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
671                region_id,
672                sender: sender.into(),
673                request: DdlRequest::Open((v, None)),
674            }),
675            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
676                region_id,
677                sender: sender.into(),
678                request: DdlRequest::Close(v),
679            }),
680            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
681                region_id,
682                sender: sender.into(),
683                request: DdlRequest::Alter(v),
684            }),
685            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
686                region_id,
687                sender: sender.into(),
688                request: DdlRequest::Flush(v),
689            }),
690            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
691                region_id,
692                sender: sender.into(),
693                request: DdlRequest::Compact(v),
694            }),
695            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
696                region_id,
697                sender: sender.into(),
698                request: DdlRequest::Truncate(v),
699            }),
700            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
701                region_id,
702                sender: sender.into(),
703                request: DdlRequest::Catchup(v),
704            }),
705            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
706                metadata: region_metadata,
707                sender: sender.into(),
708                request: region_bulk_inserts_request,
709            },
710        };
711
712        Ok((worker_request, receiver))
713    }
714
715    pub(crate) fn new_set_readonly_gracefully(
716        region_id: RegionId,
717        region_role_state: SettableRegionRoleState,
718    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
719        let (sender, receiver) = oneshot::channel();
720
721        (
722            WorkerRequest::SetRegionRoleStateGracefully {
723                region_id,
724                region_role_state,
725                sender,
726            },
727            receiver,
728        )
729    }
730
731    pub(crate) fn new_sync_region_request(
732        region_id: RegionId,
733        manifest_version: ManifestVersion,
734    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
735        let (sender, receiver) = oneshot::channel();
736        (
737            WorkerRequest::SyncRegion(RegionSyncRequest {
738                region_id,
739                manifest_version,
740                sender,
741            }),
742            receiver,
743        )
744    }
745}
746
747/// DDL request to a region.
748#[derive(Debug)]
749pub(crate) enum DdlRequest {
750    Create(RegionCreateRequest),
751    Drop,
752    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
753    Close(RegionCloseRequest),
754    Alter(RegionAlterRequest),
755    Flush(RegionFlushRequest),
756    Compact(RegionCompactRequest),
757    Truncate(RegionTruncateRequest),
758    Catchup(RegionCatchupRequest),
759}
760
761/// Sender and Ddl request.
762#[derive(Debug)]
763pub(crate) struct SenderDdlRequest {
764    /// Region id of the request.
765    pub(crate) region_id: RegionId,
766    /// Result sender.
767    pub(crate) sender: OptionOutputTx,
768    /// Ddl request.
769    pub(crate) request: DdlRequest,
770}
771
772/// Notification from a background job.
773#[derive(Debug)]
774pub(crate) enum BackgroundNotify {
775    /// Flush has finished.
776    FlushFinished(FlushFinished),
777    /// Flush has failed.
778    FlushFailed(FlushFailed),
779    /// Compaction has finished.
780    CompactionFinished(CompactionFinished),
781    /// Compaction has failed.
782    CompactionFailed(CompactionFailed),
783    /// Truncate result.
784    Truncate(TruncateResult),
785    /// Region change result.
786    RegionChange(RegionChangeResult),
787    /// Region edit result.
788    RegionEdit(RegionEditResult),
789}
790
791/// Notifies a flush job is finished.
792#[derive(Debug)]
793pub(crate) struct FlushFinished {
794    /// Region id.
795    pub(crate) region_id: RegionId,
796    /// Entry id of flushed data.
797    pub(crate) flushed_entry_id: EntryId,
798    /// Flush result senders.
799    pub(crate) senders: Vec<OutputTx>,
800    /// Flush timer.
801    pub(crate) _timer: HistogramTimer,
802    /// Region edit to apply.
803    pub(crate) edit: RegionEdit,
804    /// Memtables to remove.
805    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
806}
807
808impl FlushFinished {
809    /// Marks the flush job as successful and observes the timer.
810    pub(crate) fn on_success(self) {
811        for sender in self.senders {
812            sender.send(Ok(0));
813        }
814    }
815}
816
817impl OnFailure for FlushFinished {
818    fn on_failure(&mut self, err: Error) {
819        let err = Arc::new(err);
820        for sender in self.senders.drain(..) {
821            sender.send(Err(err.clone()).context(FlushRegionSnafu {
822                region_id: self.region_id,
823            }));
824        }
825    }
826}
827
828/// Notifies a flush job is failed.
829#[derive(Debug)]
830pub(crate) struct FlushFailed {
831    /// The error source of the failure.
832    pub(crate) err: Arc<Error>,
833}
834
835/// Notifies a compaction job has finished.
836#[derive(Debug)]
837pub(crate) struct CompactionFinished {
838    /// Region id.
839    pub(crate) region_id: RegionId,
840    /// Compaction result senders.
841    pub(crate) senders: Vec<OutputTx>,
842    /// Start time of compaction task.
843    pub(crate) start_time: Instant,
844    /// Region edit to apply.
845    pub(crate) edit: RegionEdit,
846}
847
848impl CompactionFinished {
849    pub fn on_success(self) {
850        // only update compaction time on success
851        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
852
853        for sender in self.senders {
854            sender.send(Ok(0));
855        }
856        info!("Successfully compacted region: {}", self.region_id);
857    }
858}
859
860impl OnFailure for CompactionFinished {
861    /// Compaction succeeded but failed to update manifest or region's already been dropped.
862    fn on_failure(&mut self, err: Error) {
863        let err = Arc::new(err);
864        for sender in self.senders.drain(..) {
865            sender.send(Err(err.clone()).context(CompactRegionSnafu {
866                region_id: self.region_id,
867            }));
868        }
869    }
870}
871
872/// A failing compaction result.
873#[derive(Debug)]
874pub(crate) struct CompactionFailed {
875    pub(crate) region_id: RegionId,
876    /// The error source of the failure.
877    pub(crate) err: Arc<Error>,
878}
879
880/// Notifies the truncate result of a region.
881#[derive(Debug)]
882pub(crate) struct TruncateResult {
883    /// Region id.
884    pub(crate) region_id: RegionId,
885    /// Result sender.
886    pub(crate) sender: OptionOutputTx,
887    /// Truncate result.
888    pub(crate) result: Result<()>,
889    /// Truncated entry id.
890    pub(crate) truncated_entry_id: EntryId,
891    /// Truncated sequence.
892    pub(crate) truncated_sequence: SequenceNumber,
893}
894
895/// Notifies the region the result of writing region change action.
896#[derive(Debug)]
897pub(crate) struct RegionChangeResult {
898    /// Region id.
899    pub(crate) region_id: RegionId,
900    /// The new region metadata to apply.
901    pub(crate) new_meta: RegionMetadataRef,
902    /// Result sender.
903    pub(crate) sender: OptionOutputTx,
904    /// Result from the manifest manager.
905    pub(crate) result: Result<()>,
906}
907
908/// Request to edit a region directly.
909#[derive(Debug)]
910pub(crate) struct RegionEditRequest {
911    pub(crate) region_id: RegionId,
912    pub(crate) edit: RegionEdit,
913    /// The sender to notify the result to the region engine.
914    pub(crate) tx: Sender<Result<()>>,
915}
916
917/// Notifies the regin the result of editing region.
918#[derive(Debug)]
919pub(crate) struct RegionEditResult {
920    /// Region id.
921    pub(crate) region_id: RegionId,
922    /// Result sender.
923    pub(crate) sender: Sender<Result<()>>,
924    /// Region edit to apply.
925    pub(crate) edit: RegionEdit,
926    /// Result from the manifest manager.
927    pub(crate) result: Result<()>,
928}
929
930#[derive(Debug)]
931pub(crate) struct RegionSyncRequest {
932    pub(crate) region_id: RegionId,
933    pub(crate) manifest_version: ManifestVersion,
934    /// Returns the latest manifest version and a boolean indicating whether new maniefst is installed.
935    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
936}
937
938#[cfg(test)]
939mod tests {
940    use api::v1::value::ValueData;
941    use api::v1::{Row, SemanticType};
942    use datatypes::prelude::ConcreteDataType;
943    use datatypes::schema::ColumnDefaultConstraint;
944    use mito_codec::test_util::i64_value;
945    use store_api::metadata::RegionMetadataBuilder;
946
947    use super::*;
948    use crate::error::Error;
949    use crate::test_util::ts_ms_value;
950
951    fn new_column_schema(
952        name: &str,
953        data_type: ColumnDataType,
954        semantic_type: SemanticType,
955    ) -> ColumnSchema {
956        ColumnSchema {
957            column_name: name.to_string(),
958            datatype: data_type as i32,
959            semantic_type: semantic_type as i32,
960            ..Default::default()
961        }
962    }
963
964    fn check_invalid_request(err: &Error, expect: &str) {
965        if let Error::InvalidRequest {
966            region_id: _,
967            reason,
968            location: _,
969        } = err
970        {
971            assert_eq!(reason, expect);
972        } else {
973            panic!("Unexpected error {err}")
974        }
975    }
976
977    #[test]
978    fn test_write_request_duplicate_column() {
979        let rows = Rows {
980            schema: vec![
981                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
982                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
983            ],
984            rows: vec![],
985        };
986
987        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
988        check_invalid_request(&err, "duplicate column c0");
989    }
990
991    #[test]
992    fn test_valid_write_request() {
993        let rows = Rows {
994            schema: vec![
995                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
996                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
997            ],
998            rows: vec![Row {
999                values: vec![i64_value(1), i64_value(2)],
1000            }],
1001        };
1002
1003        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1004        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1005        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1006        assert_eq!(None, request.column_index_by_name("c2"));
1007    }
1008
1009    #[test]
1010    fn test_write_request_column_num() {
1011        let rows = Rows {
1012            schema: vec![
1013                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1014                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1015            ],
1016            rows: vec![Row {
1017                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1018            }],
1019        };
1020
1021        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1022        check_invalid_request(&err, "row has 3 columns but schema has 2");
1023    }
1024
1025    fn new_region_metadata() -> RegionMetadata {
1026        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1027        builder
1028            .push_column_metadata(ColumnMetadata {
1029                column_schema: datatypes::schema::ColumnSchema::new(
1030                    "ts",
1031                    ConcreteDataType::timestamp_millisecond_datatype(),
1032                    false,
1033                ),
1034                semantic_type: SemanticType::Timestamp,
1035                column_id: 1,
1036            })
1037            .push_column_metadata(ColumnMetadata {
1038                column_schema: datatypes::schema::ColumnSchema::new(
1039                    "k0",
1040                    ConcreteDataType::int64_datatype(),
1041                    true,
1042                ),
1043                semantic_type: SemanticType::Tag,
1044                column_id: 2,
1045            })
1046            .primary_key(vec![2]);
1047        builder.build().unwrap()
1048    }
1049
1050    #[test]
1051    fn test_check_schema() {
1052        let rows = Rows {
1053            schema: vec![
1054                new_column_schema(
1055                    "ts",
1056                    ColumnDataType::TimestampMillisecond,
1057                    SemanticType::Timestamp,
1058                ),
1059                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1060            ],
1061            rows: vec![Row {
1062                values: vec![ts_ms_value(1), i64_value(2)],
1063            }],
1064        };
1065        let metadata = new_region_metadata();
1066
1067        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1068        request.check_schema(&metadata).unwrap();
1069    }
1070
1071    #[test]
1072    fn test_column_type() {
1073        let rows = Rows {
1074            schema: vec![
1075                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1076                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1077            ],
1078            rows: vec![Row {
1079                values: vec![i64_value(1), i64_value(2)],
1080            }],
1081        };
1082        let metadata = new_region_metadata();
1083
1084        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1085        let err = request.check_schema(&metadata).unwrap_err();
1086        check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1087    }
1088
1089    #[test]
1090    fn test_semantic_type() {
1091        let rows = Rows {
1092            schema: vec![
1093                new_column_schema(
1094                    "ts",
1095                    ColumnDataType::TimestampMillisecond,
1096                    SemanticType::Tag,
1097                ),
1098                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1099            ],
1100            rows: vec![Row {
1101                values: vec![ts_ms_value(1), i64_value(2)],
1102            }],
1103        };
1104        let metadata = new_region_metadata();
1105
1106        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1107        let err = request.check_schema(&metadata).unwrap_err();
1108        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1109    }
1110
1111    #[test]
1112    fn test_column_nullable() {
1113        let rows = Rows {
1114            schema: vec![
1115                new_column_schema(
1116                    "ts",
1117                    ColumnDataType::TimestampMillisecond,
1118                    SemanticType::Timestamp,
1119                ),
1120                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1121            ],
1122            rows: vec![Row {
1123                values: vec![Value { value_data: None }, i64_value(2)],
1124            }],
1125        };
1126        let metadata = new_region_metadata();
1127
1128        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1129        let err = request.check_schema(&metadata).unwrap_err();
1130        check_invalid_request(&err, "column ts is not null but input has null");
1131    }
1132
1133    #[test]
1134    fn test_column_default() {
1135        let rows = Rows {
1136            schema: vec![new_column_schema(
1137                "k0",
1138                ColumnDataType::Int64,
1139                SemanticType::Tag,
1140            )],
1141            rows: vec![Row {
1142                values: vec![i64_value(1)],
1143            }],
1144        };
1145        let metadata = new_region_metadata();
1146
1147        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1148        let err = request.check_schema(&metadata).unwrap_err();
1149        check_invalid_request(&err, "missing column ts");
1150    }
1151
1152    #[test]
1153    fn test_unknown_column() {
1154        let rows = Rows {
1155            schema: vec![
1156                new_column_schema(
1157                    "ts",
1158                    ColumnDataType::TimestampMillisecond,
1159                    SemanticType::Timestamp,
1160                ),
1161                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1162                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1163            ],
1164            rows: vec![Row {
1165                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1166            }],
1167        };
1168        let metadata = new_region_metadata();
1169
1170        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1171        let err = request.check_schema(&metadata).unwrap_err();
1172        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1173    }
1174
1175    #[test]
1176    fn test_fill_impure_columns_err() {
1177        let rows = Rows {
1178            schema: vec![new_column_schema(
1179                "k0",
1180                ColumnDataType::Int64,
1181                SemanticType::Tag,
1182            )],
1183            rows: vec![Row {
1184                values: vec![i64_value(1)],
1185            }],
1186        };
1187        let metadata = {
1188            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1189            builder
1190                .push_column_metadata(ColumnMetadata {
1191                    column_schema: datatypes::schema::ColumnSchema::new(
1192                        "ts",
1193                        ConcreteDataType::timestamp_millisecond_datatype(),
1194                        false,
1195                    )
1196                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1197                        "now()".to_string(),
1198                    )))
1199                    .unwrap(),
1200                    semantic_type: SemanticType::Timestamp,
1201                    column_id: 1,
1202                })
1203                .push_column_metadata(ColumnMetadata {
1204                    column_schema: datatypes::schema::ColumnSchema::new(
1205                        "k0",
1206                        ConcreteDataType::int64_datatype(),
1207                        true,
1208                    ),
1209                    semantic_type: SemanticType::Tag,
1210                    column_id: 2,
1211                })
1212                .primary_key(vec![2]);
1213            builder.build().unwrap()
1214        };
1215
1216        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1217        let err = request.check_schema(&metadata).unwrap_err();
1218        assert!(err.is_fill_default());
1219        assert!(request
1220            .fill_missing_columns(&metadata)
1221            .unwrap_err()
1222            .to_string()
1223            .contains("unexpected impure default value with region_id"));
1224    }
1225
1226    #[test]
1227    fn test_fill_missing_columns() {
1228        let rows = Rows {
1229            schema: vec![new_column_schema(
1230                "ts",
1231                ColumnDataType::TimestampMillisecond,
1232                SemanticType::Timestamp,
1233            )],
1234            rows: vec![Row {
1235                values: vec![ts_ms_value(1)],
1236            }],
1237        };
1238        let metadata = new_region_metadata();
1239
1240        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1241        let err = request.check_schema(&metadata).unwrap_err();
1242        assert!(err.is_fill_default());
1243        request.fill_missing_columns(&metadata).unwrap();
1244
1245        let expect_rows = Rows {
1246            schema: vec![new_column_schema(
1247                "ts",
1248                ColumnDataType::TimestampMillisecond,
1249                SemanticType::Timestamp,
1250            )],
1251            rows: vec![Row {
1252                values: vec![ts_ms_value(1)],
1253            }],
1254        };
1255        assert_eq!(expect_rows, request.rows);
1256    }
1257
1258    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1259        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1260        builder
1261            .push_column_metadata(ColumnMetadata {
1262                column_schema: datatypes::schema::ColumnSchema::new(
1263                    "ts",
1264                    ConcreteDataType::timestamp_millisecond_datatype(),
1265                    false,
1266                ),
1267                semantic_type: SemanticType::Timestamp,
1268                column_id: 1,
1269            })
1270            .push_column_metadata(ColumnMetadata {
1271                column_schema: datatypes::schema::ColumnSchema::new(
1272                    "k0",
1273                    ConcreteDataType::int64_datatype(),
1274                    true,
1275                ),
1276                semantic_type: SemanticType::Tag,
1277                column_id: 2,
1278            })
1279            .primary_key(vec![2]);
1280        builder
1281    }
1282
1283    fn region_metadata_two_fields() -> RegionMetadata {
1284        let mut builder = builder_with_ts_tag();
1285        builder
1286            .push_column_metadata(ColumnMetadata {
1287                column_schema: datatypes::schema::ColumnSchema::new(
1288                    "f0",
1289                    ConcreteDataType::int64_datatype(),
1290                    true,
1291                ),
1292                semantic_type: SemanticType::Field,
1293                column_id: 3,
1294            })
1295            // Column is not nullable.
1296            .push_column_metadata(ColumnMetadata {
1297                column_schema: datatypes::schema::ColumnSchema::new(
1298                    "f1",
1299                    ConcreteDataType::int64_datatype(),
1300                    false,
1301                )
1302                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1303                    datatypes::value::Value::Int64(100),
1304                )))
1305                .unwrap(),
1306                semantic_type: SemanticType::Field,
1307                column_id: 4,
1308            });
1309        builder.build().unwrap()
1310    }
1311
1312    #[test]
1313    fn test_fill_missing_for_delete() {
1314        let rows = Rows {
1315            schema: vec![new_column_schema(
1316                "ts",
1317                ColumnDataType::TimestampMillisecond,
1318                SemanticType::Timestamp,
1319            )],
1320            rows: vec![Row {
1321                values: vec![ts_ms_value(1)],
1322            }],
1323        };
1324        let metadata = region_metadata_two_fields();
1325
1326        let mut request =
1327            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1328        let err = request.check_schema(&metadata).unwrap_err();
1329        check_invalid_request(&err, "delete requests need column k0");
1330        let err = request.fill_missing_columns(&metadata).unwrap_err();
1331        check_invalid_request(&err, "delete requests need column k0");
1332
1333        let rows = Rows {
1334            schema: vec![
1335                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1336                new_column_schema(
1337                    "ts",
1338                    ColumnDataType::TimestampMillisecond,
1339                    SemanticType::Timestamp,
1340                ),
1341            ],
1342            rows: vec![Row {
1343                values: vec![i64_value(100), ts_ms_value(1)],
1344            }],
1345        };
1346        let mut request =
1347            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1348        let err = request.check_schema(&metadata).unwrap_err();
1349        assert!(err.is_fill_default());
1350        request.fill_missing_columns(&metadata).unwrap();
1351
1352        let expect_rows = Rows {
1353            schema: vec![
1354                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1355                new_column_schema(
1356                    "ts",
1357                    ColumnDataType::TimestampMillisecond,
1358                    SemanticType::Timestamp,
1359                ),
1360                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1361            ],
1362            // Column f1 is not nullable and we use 0 for padding.
1363            rows: vec![Row {
1364                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1365            }],
1366        };
1367        assert_eq!(expect_rows, request.rows);
1368    }
1369
1370    #[test]
1371    fn test_fill_missing_without_default_in_delete() {
1372        let mut builder = builder_with_ts_tag();
1373        builder
1374            // f0 is nullable.
1375            .push_column_metadata(ColumnMetadata {
1376                column_schema: datatypes::schema::ColumnSchema::new(
1377                    "f0",
1378                    ConcreteDataType::int64_datatype(),
1379                    true,
1380                ),
1381                semantic_type: SemanticType::Field,
1382                column_id: 3,
1383            })
1384            // f1 is not nullable and don't has default.
1385            .push_column_metadata(ColumnMetadata {
1386                column_schema: datatypes::schema::ColumnSchema::new(
1387                    "f1",
1388                    ConcreteDataType::int64_datatype(),
1389                    false,
1390                ),
1391                semantic_type: SemanticType::Field,
1392                column_id: 4,
1393            });
1394        let metadata = builder.build().unwrap();
1395
1396        let rows = Rows {
1397            schema: vec![
1398                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1399                new_column_schema(
1400                    "ts",
1401                    ColumnDataType::TimestampMillisecond,
1402                    SemanticType::Timestamp,
1403                ),
1404            ],
1405            // Missing f0 (nullable), f1 (not nullable).
1406            rows: vec![Row {
1407                values: vec![i64_value(100), ts_ms_value(1)],
1408            }],
1409        };
1410        let mut request =
1411            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1412        let err = request.check_schema(&metadata).unwrap_err();
1413        assert!(err.is_fill_default());
1414        request.fill_missing_columns(&metadata).unwrap();
1415
1416        let expect_rows = Rows {
1417            schema: vec![
1418                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1419                new_column_schema(
1420                    "ts",
1421                    ColumnDataType::TimestampMillisecond,
1422                    SemanticType::Timestamp,
1423                ),
1424                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1425            ],
1426            // Column f1 is not nullable and we use 0 for padding.
1427            rows: vec![Row {
1428                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1429            }],
1430        };
1431        assert_eq!(expect_rows, request.rows);
1432    }
1433
1434    #[test]
1435    fn test_no_default() {
1436        let rows = Rows {
1437            schema: vec![new_column_schema(
1438                "k0",
1439                ColumnDataType::Int64,
1440                SemanticType::Tag,
1441            )],
1442            rows: vec![Row {
1443                values: vec![i64_value(1)],
1444            }],
1445        };
1446        let metadata = new_region_metadata();
1447
1448        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1449        let err = request.fill_missing_columns(&metadata).unwrap_err();
1450        check_invalid_request(&err, "column ts does not have default value");
1451    }
1452
1453    #[test]
1454    fn test_missing_and_invalid() {
1455        // Missing f0 and f1 has invalid type (string).
1456        let rows = Rows {
1457            schema: vec![
1458                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1459                new_column_schema(
1460                    "ts",
1461                    ColumnDataType::TimestampMillisecond,
1462                    SemanticType::Timestamp,
1463                ),
1464                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1465            ],
1466            rows: vec![Row {
1467                values: vec![
1468                    i64_value(100),
1469                    ts_ms_value(1),
1470                    Value {
1471                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1472                    },
1473                ],
1474            }],
1475        };
1476        let metadata = region_metadata_two_fields();
1477
1478        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1479        let err = request.check_schema(&metadata).unwrap_err();
1480        check_invalid_request(
1481            &err,
1482            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1483        );
1484    }
1485
1486    #[test]
1487    fn test_write_request_metadata() {
1488        let rows = Rows {
1489            schema: vec![
1490                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1491                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1492            ],
1493            rows: vec![Row {
1494                values: vec![i64_value(1), i64_value(2)],
1495            }],
1496        };
1497
1498        let metadata = Arc::new(new_region_metadata());
1499        let request = WriteRequest::new(
1500            RegionId::new(1, 1),
1501            OpType::Put,
1502            rows,
1503            Some(metadata.clone()),
1504        )
1505        .unwrap();
1506
1507        assert!(request.region_metadata.is_some());
1508        assert_eq!(
1509            request.region_metadata.unwrap().region_id,
1510            RegionId::new(1, 1)
1511        );
1512    }
1513}