1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37 MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40 AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41 RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42 RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
50 FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
51 Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
64#[derive(Debug)]
66pub struct WriteRequest {
67 pub region_id: RegionId,
69 pub op_type: OpType,
71 pub rows: Rows,
73 pub name_to_index: HashMap<String, usize>,
75 pub has_null: Vec<bool>,
77 pub hint: Option<WriteHint>,
79 pub(crate) region_metadata: Option<RegionMetadataRef>,
81}
82
83impl WriteRequest {
84 pub fn new(
88 region_id: RegionId,
89 op_type: OpType,
90 rows: Rows,
91 region_metadata: Option<RegionMetadataRef>,
92 ) -> Result<WriteRequest> {
93 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
94 for (index, column) in rows.schema.iter().enumerate() {
95 ensure!(
96 name_to_index
97 .insert(column.column_name.clone(), index)
98 .is_none(),
99 InvalidRequestSnafu {
100 region_id,
101 reason: format!("duplicate column {}", column.column_name),
102 }
103 );
104 }
105
106 let mut has_null = vec![false; rows.schema.len()];
107 for row in &rows.rows {
108 ensure!(
109 row.values.len() == rows.schema.len(),
110 InvalidRequestSnafu {
111 region_id,
112 reason: format!(
113 "row has {} columns but schema has {}",
114 row.values.len(),
115 rows.schema.len()
116 ),
117 }
118 );
119
120 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
121 validate_proto_value(region_id, value, column_schema)?;
122
123 if value.value_data.is_none() {
124 has_null[i] = true;
125 }
126 }
127 }
128
129 Ok(WriteRequest {
130 region_id,
131 op_type,
132 rows,
133 name_to_index,
134 has_null,
135 hint: None,
136 region_metadata,
137 })
138 }
139
140 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
142 self.hint = hint;
143 self
144 }
145
146 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
148 infer_primary_key_encoding_from_hint(self.hint.as_ref())
149 }
150
151 pub(crate) fn estimated_size(&self) -> usize {
153 let row_size = self
154 .rows
155 .rows
156 .first()
157 .map(|row| row.encoded_len())
158 .unwrap_or(0);
159 row_size * self.rows.rows.len()
160 }
161
162 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
164 self.name_to_index.get(name).copied()
165 }
166
167 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
172 debug_assert_eq!(self.region_id, metadata.region_id);
173
174 let region_id = self.region_id;
175 let mut rows_columns: HashMap<_, _> = self
177 .rows
178 .schema
179 .iter()
180 .map(|column| (&column.column_name, column))
181 .collect();
182
183 let mut need_fill_default = false;
184 for column in &metadata.column_metadatas {
186 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
187 ensure!(
189 is_column_type_value_eq(
190 input_col.datatype,
191 input_col.datatype_extension.clone(),
192 &column.column_schema.data_type
193 ),
194 InvalidRequestSnafu {
195 region_id,
196 reason: format!(
197 "column {} expect type {:?}, given: {}({})",
198 column.column_schema.name,
199 column.column_schema.data_type,
200 ColumnDataType::try_from(input_col.datatype)
201 .map(|v| v.as_str_name())
202 .unwrap_or("Unknown"),
203 input_col.datatype,
204 )
205 }
206 );
207
208 ensure!(
210 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
211 InvalidRequestSnafu {
212 region_id,
213 reason: format!(
214 "column {} has semantic type {:?}, given: {}({})",
215 column.column_schema.name,
216 column.semantic_type,
217 api::v1::SemanticType::try_from(input_col.semantic_type)
218 .map(|v| v.as_str_name())
219 .unwrap_or("Unknown"),
220 input_col.semantic_type
221 ),
222 }
223 );
224
225 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
228 ensure!(
229 !has_null || column.column_schema.is_nullable(),
230 InvalidRequestSnafu {
231 region_id,
232 reason: format!(
233 "column {} is not null but input has null",
234 column.column_schema.name
235 ),
236 }
237 );
238 } else {
239 self.check_missing_column(column)?;
241
242 need_fill_default = true;
243 }
244 }
245
246 if !rows_columns.is_empty() {
248 let names: Vec<_> = rows_columns.into_keys().collect();
249 return InvalidRequestSnafu {
250 region_id,
251 reason: format!("unknown columns: {:?}", names),
252 }
253 .fail();
254 }
255
256 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
258
259 Ok(())
260 }
261
262 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
267 debug_assert_eq!(self.region_id, metadata.region_id);
268
269 let mut columns_to_fill = vec![];
270 for column in &metadata.column_metadatas {
271 if !self.name_to_index.contains_key(&column.column_schema.name) {
272 columns_to_fill.push(column);
273 }
274 }
275 self.fill_columns(columns_to_fill)?;
276
277 Ok(())
278 }
279
280 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
282 if let Err(e) = self.check_schema(metadata) {
283 if e.is_fill_default() {
284 self.fill_missing_columns(metadata)?;
288 } else {
289 return Err(e);
290 }
291 }
292
293 Ok(())
294 }
295
296 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
298 let mut default_values = Vec::with_capacity(columns.len());
299 let mut columns_to_fill = Vec::with_capacity(columns.len());
300 for column in columns {
301 let default_value = self.column_default_value(column)?;
302 if default_value.value_data.is_some() {
303 default_values.push(default_value);
304 columns_to_fill.push(column);
305 }
306 }
307
308 for row in &mut self.rows.rows {
309 row.values.extend(default_values.iter().cloned());
310 }
311
312 for column in columns_to_fill {
313 let (datatype, datatype_ext) =
314 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
315 .with_context(|_| ConvertColumnDataTypeSnafu {
316 reason: format!(
317 "no protobuf type for column {} ({:?})",
318 column.column_schema.name, column.column_schema.data_type
319 ),
320 })?
321 .to_parts();
322 self.rows.schema.push(ColumnSchema {
323 column_name: column.column_schema.name.clone(),
324 datatype: datatype as i32,
325 semantic_type: column.semantic_type as i32,
326 datatype_extension: datatype_ext,
327 options: options_from_column_schema(&column.column_schema),
328 });
329 }
330
331 Ok(())
332 }
333
334 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
336 if self.op_type == OpType::Delete {
337 if column.semantic_type == SemanticType::Field {
338 return Ok(());
341 } else {
342 return InvalidRequestSnafu {
343 region_id: self.region_id,
344 reason: format!("delete requests need column {}", column.column_schema.name),
345 }
346 .fail();
347 }
348 }
349
350 ensure!(
352 column.column_schema.is_nullable()
353 || column.column_schema.default_constraint().is_some(),
354 InvalidRequestSnafu {
355 region_id: self.region_id,
356 reason: format!("missing column {}", column.column_schema.name),
357 }
358 );
359
360 Ok(())
361 }
362
363 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
365 let default_value = match self.op_type {
366 OpType::Delete => {
367 ensure!(
368 column.semantic_type == SemanticType::Field,
369 InvalidRequestSnafu {
370 region_id: self.region_id,
371 reason: format!(
372 "delete requests need column {}",
373 column.column_schema.name
374 ),
375 }
376 );
377
378 if column.column_schema.is_nullable() {
383 datatypes::value::Value::Null
384 } else {
385 column.column_schema.data_type.default_value()
386 }
387 }
388 OpType::Put => {
389 if column.column_schema.is_default_impure() {
391 UnexpectedSnafu {
392 reason: format!(
393 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
394 self.region_id,
395 column.column_schema.name,
396 column.column_schema.default_constraint(),
397 ),
398 }
399 .fail()?
400 }
401 column
402 .column_schema
403 .create_default()
404 .context(CreateDefaultSnafu {
405 region_id: self.region_id,
406 column: &column.column_schema.name,
407 })?
408 .with_context(|| InvalidRequestSnafu {
410 region_id: self.region_id,
411 reason: format!(
412 "column {} does not have default value",
413 column.column_schema.name
414 ),
415 })?
416 }
417 };
418
419 Ok(api::helper::to_grpc_value(default_value))
421 }
422}
423
424pub(crate) fn validate_proto_value(
426 region_id: RegionId,
427 value: &Value,
428 column_schema: &ColumnSchema,
429) -> Result<()> {
430 if let Some(value_type) = proto_value_type(value) {
431 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
432 InvalidRequestSnafu {
433 region_id,
434 reason: format!(
435 "column {} has unknown type {}",
436 column_schema.column_name, column_schema.datatype
437 ),
438 }
439 .build()
440 })?;
441 ensure!(
442 proto_value_type_match(column_type, value_type),
443 InvalidRequestSnafu {
444 region_id,
445 reason: format!(
446 "value has type {:?}, but column {} has type {:?}({})",
447 value_type, column_schema.column_name, column_type, column_schema.datatype,
448 ),
449 }
450 );
451 }
452
453 Ok(())
454}
455
456fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
457 match (column_type, value_type) {
458 (ct, vt) if ct == vt => true,
459 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
460 (ColumnDataType::Json, ColumnDataType::Binary) => true,
461 _ => false,
462 }
463}
464
465#[derive(Debug)]
467pub struct OutputTx(Sender<Result<AffectedRows>>);
468
469impl OutputTx {
470 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
472 OutputTx(sender)
473 }
474
475 pub(crate) fn send(self, result: Result<AffectedRows>) {
477 let _ = self.0.send(result);
479 }
480}
481
482#[derive(Debug)]
484pub(crate) struct OptionOutputTx(Option<OutputTx>);
485
486impl OptionOutputTx {
487 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
489 OptionOutputTx(sender)
490 }
491
492 pub(crate) fn none() -> OptionOutputTx {
494 OptionOutputTx(None)
495 }
496
497 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
499 if let Some(sender) = self.0.take() {
500 sender.send(result);
501 }
502 }
503
504 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
506 if let Some(sender) = self.0.take() {
507 sender.send(result);
508 }
509 }
510
511 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
513 self.0.take()
514 }
515}
516
517impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
518 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
519 Self::new(Some(OutputTx::new(sender)))
520 }
521}
522
523impl OnFailure for OptionOutputTx {
524 fn on_failure(&mut self, err: Error) {
525 self.send_mut(Err(err));
526 }
527}
528
529pub(crate) trait OnFailure {
531 fn on_failure(&mut self, err: Error);
533}
534
535#[derive(Debug)]
537pub(crate) struct SenderWriteRequest {
538 pub(crate) sender: OptionOutputTx,
540 pub(crate) request: WriteRequest,
541}
542
543pub(crate) struct SenderBulkRequest {
544 pub(crate) sender: OptionOutputTx,
545 pub(crate) region_id: RegionId,
546 pub(crate) request: BulkPart,
547 pub(crate) region_metadata: RegionMetadataRef,
548}
549
550#[derive(Debug)]
552pub(crate) struct WorkerRequestWithTime {
553 pub(crate) request: WorkerRequest,
554 pub(crate) created_at: Instant,
555}
556
557impl WorkerRequestWithTime {
558 pub(crate) fn new(request: WorkerRequest) -> Self {
559 Self {
560 request,
561 created_at: Instant::now(),
562 }
563 }
564}
565
566#[derive(Debug)]
568pub(crate) enum WorkerRequest {
569 Write(SenderWriteRequest),
571
572 Ddl(SenderDdlRequest),
574
575 Background {
577 region_id: RegionId,
579 notify: BackgroundNotify,
581 },
582
583 SetRegionRoleStateGracefully {
585 region_id: RegionId,
587 region_role_state: SettableRegionRoleState,
589 sender: Sender<SetRegionRoleStateResponse>,
591 },
592
593 Stop,
595
596 EditRegion(RegionEditRequest),
598
599 SyncRegion(RegionSyncRequest),
601
602 BulkInserts {
604 metadata: Option<RegionMetadataRef>,
605 request: RegionBulkInsertsRequest,
606 sender: OptionOutputTx,
607 },
608
609 RemapManifests(RemapManifestsRequest),
611
612 CopyRegionFrom(CopyRegionFromRequest),
614}
615
616impl WorkerRequest {
617 pub(crate) fn new_open_region_request(
619 region_id: RegionId,
620 request: RegionOpenRequest,
621 entry_receiver: Option<WalEntryReceiver>,
622 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
623 let (sender, receiver) = oneshot::channel();
624
625 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
626 region_id,
627 sender: sender.into(),
628 request: DdlRequest::Open((request, entry_receiver)),
629 });
630
631 (worker_request, receiver)
632 }
633
634 pub(crate) fn new_catchup_region_request(
636 region_id: RegionId,
637 request: RegionCatchupRequest,
638 entry_receiver: Option<WalEntryReceiver>,
639 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
640 let (sender, receiver) = oneshot::channel();
641 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
642 region_id,
643 sender: sender.into(),
644 request: DdlRequest::Catchup((request, entry_receiver)),
645 });
646 (worker_request, receiver)
647 }
648
649 pub(crate) fn try_from_region_request(
651 region_id: RegionId,
652 value: RegionRequest,
653 region_metadata: Option<RegionMetadataRef>,
654 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
655 let (sender, receiver) = oneshot::channel();
656 let worker_request = match value {
657 RegionRequest::Put(v) => {
658 let mut write_request =
659 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
660 .with_hint(v.hint);
661 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
662 && let Some(region_metadata) = ®ion_metadata
663 {
664 write_request.maybe_fill_missing_columns(region_metadata)?;
665 }
666 WorkerRequest::Write(SenderWriteRequest {
667 sender: sender.into(),
668 request: write_request,
669 })
670 }
671 RegionRequest::Delete(v) => {
672 let mut write_request =
673 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
674 .with_hint(v.hint);
675 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
676 && let Some(region_metadata) = ®ion_metadata
677 {
678 write_request.maybe_fill_missing_columns(region_metadata)?;
679 }
680 WorkerRequest::Write(SenderWriteRequest {
681 sender: sender.into(),
682 request: write_request,
683 })
684 }
685 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
686 region_id,
687 sender: sender.into(),
688 request: DdlRequest::Create(v),
689 }),
690 RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
691 region_id,
692 sender: sender.into(),
693 request: DdlRequest::Drop(v),
694 }),
695 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
696 region_id,
697 sender: sender.into(),
698 request: DdlRequest::Open((v, None)),
699 }),
700 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
701 region_id,
702 sender: sender.into(),
703 request: DdlRequest::Close(v),
704 }),
705 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
706 region_id,
707 sender: sender.into(),
708 request: DdlRequest::Alter(v),
709 }),
710 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
711 region_id,
712 sender: sender.into(),
713 request: DdlRequest::Flush(v),
714 }),
715 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
716 region_id,
717 sender: sender.into(),
718 request: DdlRequest::Compact(v),
719 }),
720 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
721 region_id,
722 sender: sender.into(),
723 request: DdlRequest::BuildIndex(v),
724 }),
725 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
726 region_id,
727 sender: sender.into(),
728 request: DdlRequest::Truncate(v),
729 }),
730 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
731 region_id,
732 sender: sender.into(),
733 request: DdlRequest::Catchup((v, None)),
734 }),
735 RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
736 region_id,
737 sender: sender.into(),
738 request: DdlRequest::EnterStaging(v),
739 }),
740 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
741 metadata: region_metadata,
742 sender: sender.into(),
743 request: region_bulk_inserts_request,
744 },
745 RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
746 region_id,
747 sender: sender.into(),
748 request: DdlRequest::ApplyStagingManifest(v),
749 }),
750 };
751
752 Ok((worker_request, receiver))
753 }
754
755 pub(crate) fn new_set_readonly_gracefully(
756 region_id: RegionId,
757 region_role_state: SettableRegionRoleState,
758 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
759 let (sender, receiver) = oneshot::channel();
760
761 (
762 WorkerRequest::SetRegionRoleStateGracefully {
763 region_id,
764 region_role_state,
765 sender,
766 },
767 receiver,
768 )
769 }
770
771 pub(crate) fn new_sync_region_request(
772 region_id: RegionId,
773 manifest_version: ManifestVersion,
774 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
775 let (sender, receiver) = oneshot::channel();
776 (
777 WorkerRequest::SyncRegion(RegionSyncRequest {
778 region_id,
779 manifest_version,
780 sender,
781 }),
782 receiver,
783 )
784 }
785
786 #[allow(clippy::type_complexity)]
793 pub(crate) fn try_from_remap_manifests_request(
794 store_api::region_engine::RemapManifestsRequest {
795 region_id,
796 input_regions,
797 region_mapping,
798 new_partition_exprs,
799 }: store_api::region_engine::RemapManifestsRequest,
800 ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
801 let (sender, receiver) = oneshot::channel();
802 let new_partition_exprs = new_partition_exprs
803 .into_iter()
804 .map(|(k, v)| {
805 Ok((
806 k,
807 PartitionExpr::from_json_str(&v)
808 .context(InvalidPartitionExprSnafu { expr: v })?
809 .context(MissingPartitionExprSnafu { region_id: k })?,
810 ))
811 })
812 .collect::<Result<HashMap<_, _>>>()?;
813
814 let request = RemapManifestsRequest {
815 region_id,
816 input_regions,
817 region_mapping,
818 new_partition_exprs,
819 sender,
820 };
821
822 Ok((WorkerRequest::RemapManifests(request), receiver))
823 }
824
825 pub(crate) fn try_from_copy_region_from_request(
827 region_id: RegionId,
828 store_api::region_engine::MitoCopyRegionFromRequest {
829 source_region_id,
830 parallelism,
831 }: store_api::region_engine::MitoCopyRegionFromRequest,
832 ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
833 let (sender, receiver) = oneshot::channel();
834 let request = CopyRegionFromRequest {
835 region_id,
836 source_region_id,
837 parallelism,
838 sender,
839 };
840 Ok((WorkerRequest::CopyRegionFrom(request), receiver))
841 }
842}
843
844#[derive(Debug)]
846pub(crate) enum DdlRequest {
847 Create(RegionCreateRequest),
848 Drop(RegionDropRequest),
849 Open((RegionOpenRequest, Option<WalEntryReceiver>)),
850 Close(RegionCloseRequest),
851 Alter(RegionAlterRequest),
852 Flush(RegionFlushRequest),
853 Compact(RegionCompactRequest),
854 BuildIndex(RegionBuildIndexRequest),
855 Truncate(RegionTruncateRequest),
856 Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
857 EnterStaging(EnterStagingRequest),
858 ApplyStagingManifest(ApplyStagingManifestRequest),
859}
860
861#[derive(Debug)]
863pub(crate) struct SenderDdlRequest {
864 pub(crate) region_id: RegionId,
866 pub(crate) sender: OptionOutputTx,
868 pub(crate) request: DdlRequest,
870}
871
872#[derive(Debug)]
874pub(crate) enum BackgroundNotify {
875 FlushFinished(FlushFinished),
877 FlushFailed(FlushFailed),
879 IndexBuildFinished(IndexBuildFinished),
881 IndexBuildStopped(IndexBuildStopped),
883 IndexBuildFailed(IndexBuildFailed),
885 CompactionFinished(CompactionFinished),
887 CompactionFailed(CompactionFailed),
889 Truncate(TruncateResult),
891 RegionChange(RegionChangeResult),
893 RegionEdit(RegionEditResult),
895 EnterStaging(EnterStagingResult),
897 CopyRegionFromFinished(CopyRegionFromFinished),
899}
900
901#[derive(Debug)]
903pub(crate) struct FlushFinished {
904 pub(crate) region_id: RegionId,
906 pub(crate) flushed_entry_id: EntryId,
908 pub(crate) senders: Vec<OutputTx>,
910 pub(crate) _timer: HistogramTimer,
912 pub(crate) edit: RegionEdit,
914 pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
916 pub(crate) is_staging: bool,
918 pub(crate) flush_reason: FlushReason,
920}
921
922impl FlushFinished {
923 pub(crate) fn on_success(self) {
925 for sender in self.senders {
926 sender.send(Ok(0));
927 }
928 }
929}
930
931impl OnFailure for FlushFinished {
932 fn on_failure(&mut self, err: Error) {
933 let err = Arc::new(err);
934 for sender in self.senders.drain(..) {
935 sender.send(Err(err.clone()).context(FlushRegionSnafu {
936 region_id: self.region_id,
937 }));
938 }
939 }
940}
941
942#[derive(Debug)]
944pub(crate) struct FlushFailed {
945 pub(crate) err: Arc<Error>,
947}
948
949#[derive(Debug)]
950pub(crate) struct IndexBuildFinished {
951 #[allow(dead_code)]
952 pub(crate) region_id: RegionId,
953 pub(crate) edit: RegionEdit,
954}
955
956#[derive(Debug)]
958pub(crate) struct IndexBuildStopped {
959 #[allow(dead_code)]
960 pub(crate) region_id: RegionId,
961 pub(crate) file_id: FileId,
962}
963
964#[derive(Debug)]
966pub(crate) struct IndexBuildFailed {
967 pub(crate) err: Arc<Error>,
968}
969
970#[derive(Debug)]
972pub(crate) struct CompactionFinished {
973 pub(crate) region_id: RegionId,
975 pub(crate) senders: Vec<OutputTx>,
977 pub(crate) start_time: Instant,
979 pub(crate) edit: RegionEdit,
981}
982
983impl CompactionFinished {
984 pub fn on_success(self) {
985 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
987
988 for sender in self.senders {
989 sender.send(Ok(0));
990 }
991 info!("Successfully compacted region: {}", self.region_id);
992 }
993}
994
995impl OnFailure for CompactionFinished {
996 fn on_failure(&mut self, err: Error) {
998 let err = Arc::new(err);
999 for sender in self.senders.drain(..) {
1000 sender.send(Err(err.clone()).context(CompactRegionSnafu {
1001 region_id: self.region_id,
1002 }));
1003 }
1004 }
1005}
1006
1007#[derive(Debug)]
1009pub(crate) struct CompactionFailed {
1010 pub(crate) region_id: RegionId,
1011 pub(crate) err: Arc<Error>,
1013}
1014
1015#[derive(Debug)]
1017pub(crate) struct TruncateResult {
1018 pub(crate) region_id: RegionId,
1020 pub(crate) sender: OptionOutputTx,
1022 pub(crate) result: Result<()>,
1024 pub(crate) kind: TruncateKind,
1025}
1026
1027#[derive(Debug)]
1029pub(crate) struct RegionChangeResult {
1030 pub(crate) region_id: RegionId,
1032 pub(crate) new_meta: RegionMetadataRef,
1034 pub(crate) sender: OptionOutputTx,
1036 pub(crate) result: Result<()>,
1038 pub(crate) need_index: bool,
1040 pub(crate) new_options: Option<RegionOptions>,
1042}
1043
1044#[derive(Debug)]
1046pub(crate) struct EnterStagingResult {
1047 pub(crate) region_id: RegionId,
1049 pub(crate) partition_expr: String,
1051 pub(crate) sender: OptionOutputTx,
1053 pub(crate) result: Result<()>,
1055}
1056
1057#[derive(Debug)]
1058pub(crate) struct CopyRegionFromFinished {
1059 pub(crate) region_id: RegionId,
1061 pub(crate) edit: RegionEdit,
1063 pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1065}
1066
1067#[derive(Debug)]
1069pub(crate) struct RegionEditRequest {
1070 pub(crate) region_id: RegionId,
1071 pub(crate) edit: RegionEdit,
1072 pub(crate) tx: Sender<Result<()>>,
1074}
1075
1076#[derive(Debug)]
1078pub(crate) struct RegionEditResult {
1079 pub(crate) region_id: RegionId,
1081 pub(crate) sender: Sender<Result<()>>,
1083 pub(crate) edit: RegionEdit,
1085 pub(crate) result: Result<()>,
1087 pub(crate) update_region_state: bool,
1089 pub(crate) is_staging: bool,
1091}
1092
1093#[derive(Debug)]
1094pub(crate) struct BuildIndexRequest {
1095 pub(crate) region_id: RegionId,
1096 pub(crate) build_type: IndexBuildType,
1097 pub(crate) file_metas: Vec<FileMeta>,
1099}
1100
1101#[derive(Debug)]
1102pub(crate) struct RegionSyncRequest {
1103 pub(crate) region_id: RegionId,
1104 pub(crate) manifest_version: ManifestVersion,
1105 pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
1107}
1108
1109#[derive(Debug)]
1110pub(crate) struct RemapManifestsRequest {
1111 pub(crate) region_id: RegionId,
1113 pub(crate) input_regions: Vec<RegionId>,
1115 pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
1117 pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
1119 pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
1123}
1124
1125#[derive(Debug)]
1126pub(crate) struct CopyRegionFromRequest {
1127 pub(crate) region_id: RegionId,
1129 pub(crate) source_region_id: RegionId,
1131 pub(crate) parallelism: usize,
1133 pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1135}
1136
1137#[cfg(test)]
1138mod tests {
1139 use api::v1::value::ValueData;
1140 use api::v1::{Row, SemanticType};
1141 use datatypes::prelude::ConcreteDataType;
1142 use datatypes::schema::ColumnDefaultConstraint;
1143 use mito_codec::test_util::i64_value;
1144 use store_api::metadata::RegionMetadataBuilder;
1145
1146 use super::*;
1147 use crate::error::Error;
1148 use crate::test_util::ts_ms_value;
1149
1150 fn new_column_schema(
1151 name: &str,
1152 data_type: ColumnDataType,
1153 semantic_type: SemanticType,
1154 ) -> ColumnSchema {
1155 ColumnSchema {
1156 column_name: name.to_string(),
1157 datatype: data_type as i32,
1158 semantic_type: semantic_type as i32,
1159 ..Default::default()
1160 }
1161 }
1162
1163 fn check_invalid_request(err: &Error, expect: &str) {
1164 if let Error::InvalidRequest {
1165 region_id: _,
1166 reason,
1167 location: _,
1168 } = err
1169 {
1170 assert_eq!(reason, expect);
1171 } else {
1172 panic!("Unexpected error {err}")
1173 }
1174 }
1175
1176 #[test]
1177 fn test_write_request_duplicate_column() {
1178 let rows = Rows {
1179 schema: vec![
1180 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1181 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1182 ],
1183 rows: vec![],
1184 };
1185
1186 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1187 check_invalid_request(&err, "duplicate column c0");
1188 }
1189
1190 #[test]
1191 fn test_valid_write_request() {
1192 let rows = Rows {
1193 schema: vec![
1194 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1195 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1196 ],
1197 rows: vec![Row {
1198 values: vec![i64_value(1), i64_value(2)],
1199 }],
1200 };
1201
1202 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1203 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1204 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1205 assert_eq!(None, request.column_index_by_name("c2"));
1206 }
1207
1208 #[test]
1209 fn test_write_request_column_num() {
1210 let rows = Rows {
1211 schema: vec![
1212 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1213 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1214 ],
1215 rows: vec![Row {
1216 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1217 }],
1218 };
1219
1220 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1221 check_invalid_request(&err, "row has 3 columns but schema has 2");
1222 }
1223
1224 fn new_region_metadata() -> RegionMetadata {
1225 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1226 builder
1227 .push_column_metadata(ColumnMetadata {
1228 column_schema: datatypes::schema::ColumnSchema::new(
1229 "ts",
1230 ConcreteDataType::timestamp_millisecond_datatype(),
1231 false,
1232 ),
1233 semantic_type: SemanticType::Timestamp,
1234 column_id: 1,
1235 })
1236 .push_column_metadata(ColumnMetadata {
1237 column_schema: datatypes::schema::ColumnSchema::new(
1238 "k0",
1239 ConcreteDataType::int64_datatype(),
1240 true,
1241 ),
1242 semantic_type: SemanticType::Tag,
1243 column_id: 2,
1244 })
1245 .primary_key(vec![2]);
1246 builder.build().unwrap()
1247 }
1248
1249 #[test]
1250 fn test_check_schema() {
1251 let rows = Rows {
1252 schema: vec![
1253 new_column_schema(
1254 "ts",
1255 ColumnDataType::TimestampMillisecond,
1256 SemanticType::Timestamp,
1257 ),
1258 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1259 ],
1260 rows: vec![Row {
1261 values: vec![ts_ms_value(1), i64_value(2)],
1262 }],
1263 };
1264 let metadata = new_region_metadata();
1265
1266 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1267 request.check_schema(&metadata).unwrap();
1268 }
1269
1270 #[test]
1271 fn test_column_type() {
1272 let rows = Rows {
1273 schema: vec![
1274 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1275 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1276 ],
1277 rows: vec![Row {
1278 values: vec![i64_value(1), i64_value(2)],
1279 }],
1280 };
1281 let metadata = new_region_metadata();
1282
1283 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1284 let err = request.check_schema(&metadata).unwrap_err();
1285 check_invalid_request(
1286 &err,
1287 "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1288 );
1289 }
1290
1291 #[test]
1292 fn test_semantic_type() {
1293 let rows = Rows {
1294 schema: vec![
1295 new_column_schema(
1296 "ts",
1297 ColumnDataType::TimestampMillisecond,
1298 SemanticType::Tag,
1299 ),
1300 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1301 ],
1302 rows: vec![Row {
1303 values: vec![ts_ms_value(1), i64_value(2)],
1304 }],
1305 };
1306 let metadata = new_region_metadata();
1307
1308 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1309 let err = request.check_schema(&metadata).unwrap_err();
1310 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1311 }
1312
1313 #[test]
1314 fn test_column_nullable() {
1315 let rows = Rows {
1316 schema: vec![
1317 new_column_schema(
1318 "ts",
1319 ColumnDataType::TimestampMillisecond,
1320 SemanticType::Timestamp,
1321 ),
1322 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1323 ],
1324 rows: vec![Row {
1325 values: vec![Value { value_data: None }, i64_value(2)],
1326 }],
1327 };
1328 let metadata = new_region_metadata();
1329
1330 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1331 let err = request.check_schema(&metadata).unwrap_err();
1332 check_invalid_request(&err, "column ts is not null but input has null");
1333 }
1334
1335 #[test]
1336 fn test_column_default() {
1337 let rows = Rows {
1338 schema: vec![new_column_schema(
1339 "k0",
1340 ColumnDataType::Int64,
1341 SemanticType::Tag,
1342 )],
1343 rows: vec![Row {
1344 values: vec![i64_value(1)],
1345 }],
1346 };
1347 let metadata = new_region_metadata();
1348
1349 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1350 let err = request.check_schema(&metadata).unwrap_err();
1351 check_invalid_request(&err, "missing column ts");
1352 }
1353
1354 #[test]
1355 fn test_unknown_column() {
1356 let rows = Rows {
1357 schema: vec![
1358 new_column_schema(
1359 "ts",
1360 ColumnDataType::TimestampMillisecond,
1361 SemanticType::Timestamp,
1362 ),
1363 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1364 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1365 ],
1366 rows: vec![Row {
1367 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1368 }],
1369 };
1370 let metadata = new_region_metadata();
1371
1372 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1373 let err = request.check_schema(&metadata).unwrap_err();
1374 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1375 }
1376
1377 #[test]
1378 fn test_fill_impure_columns_err() {
1379 let rows = Rows {
1380 schema: vec![new_column_schema(
1381 "k0",
1382 ColumnDataType::Int64,
1383 SemanticType::Tag,
1384 )],
1385 rows: vec![Row {
1386 values: vec![i64_value(1)],
1387 }],
1388 };
1389 let metadata = {
1390 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1391 builder
1392 .push_column_metadata(ColumnMetadata {
1393 column_schema: datatypes::schema::ColumnSchema::new(
1394 "ts",
1395 ConcreteDataType::timestamp_millisecond_datatype(),
1396 false,
1397 )
1398 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1399 "now()".to_string(),
1400 )))
1401 .unwrap(),
1402 semantic_type: SemanticType::Timestamp,
1403 column_id: 1,
1404 })
1405 .push_column_metadata(ColumnMetadata {
1406 column_schema: datatypes::schema::ColumnSchema::new(
1407 "k0",
1408 ConcreteDataType::int64_datatype(),
1409 true,
1410 ),
1411 semantic_type: SemanticType::Tag,
1412 column_id: 2,
1413 })
1414 .primary_key(vec![2]);
1415 builder.build().unwrap()
1416 };
1417
1418 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1419 let err = request.check_schema(&metadata).unwrap_err();
1420 assert!(err.is_fill_default());
1421 assert!(
1422 request
1423 .fill_missing_columns(&metadata)
1424 .unwrap_err()
1425 .to_string()
1426 .contains("unexpected impure default value with region_id")
1427 );
1428 }
1429
1430 #[test]
1431 fn test_fill_missing_columns() {
1432 let rows = Rows {
1433 schema: vec![new_column_schema(
1434 "ts",
1435 ColumnDataType::TimestampMillisecond,
1436 SemanticType::Timestamp,
1437 )],
1438 rows: vec![Row {
1439 values: vec![ts_ms_value(1)],
1440 }],
1441 };
1442 let metadata = new_region_metadata();
1443
1444 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1445 let err = request.check_schema(&metadata).unwrap_err();
1446 assert!(err.is_fill_default());
1447 request.fill_missing_columns(&metadata).unwrap();
1448
1449 let expect_rows = Rows {
1450 schema: vec![new_column_schema(
1451 "ts",
1452 ColumnDataType::TimestampMillisecond,
1453 SemanticType::Timestamp,
1454 )],
1455 rows: vec![Row {
1456 values: vec![ts_ms_value(1)],
1457 }],
1458 };
1459 assert_eq!(expect_rows, request.rows);
1460 }
1461
1462 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1463 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1464 builder
1465 .push_column_metadata(ColumnMetadata {
1466 column_schema: datatypes::schema::ColumnSchema::new(
1467 "ts",
1468 ConcreteDataType::timestamp_millisecond_datatype(),
1469 false,
1470 ),
1471 semantic_type: SemanticType::Timestamp,
1472 column_id: 1,
1473 })
1474 .push_column_metadata(ColumnMetadata {
1475 column_schema: datatypes::schema::ColumnSchema::new(
1476 "k0",
1477 ConcreteDataType::int64_datatype(),
1478 true,
1479 ),
1480 semantic_type: SemanticType::Tag,
1481 column_id: 2,
1482 })
1483 .primary_key(vec![2]);
1484 builder
1485 }
1486
1487 fn region_metadata_two_fields() -> RegionMetadata {
1488 let mut builder = builder_with_ts_tag();
1489 builder
1490 .push_column_metadata(ColumnMetadata {
1491 column_schema: datatypes::schema::ColumnSchema::new(
1492 "f0",
1493 ConcreteDataType::int64_datatype(),
1494 true,
1495 ),
1496 semantic_type: SemanticType::Field,
1497 column_id: 3,
1498 })
1499 .push_column_metadata(ColumnMetadata {
1501 column_schema: datatypes::schema::ColumnSchema::new(
1502 "f1",
1503 ConcreteDataType::int64_datatype(),
1504 false,
1505 )
1506 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1507 datatypes::value::Value::Int64(100),
1508 )))
1509 .unwrap(),
1510 semantic_type: SemanticType::Field,
1511 column_id: 4,
1512 });
1513 builder.build().unwrap()
1514 }
1515
1516 #[test]
1517 fn test_fill_missing_for_delete() {
1518 let rows = Rows {
1519 schema: vec![new_column_schema(
1520 "ts",
1521 ColumnDataType::TimestampMillisecond,
1522 SemanticType::Timestamp,
1523 )],
1524 rows: vec![Row {
1525 values: vec![ts_ms_value(1)],
1526 }],
1527 };
1528 let metadata = region_metadata_two_fields();
1529
1530 let mut request =
1531 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1532 let err = request.check_schema(&metadata).unwrap_err();
1533 check_invalid_request(&err, "delete requests need column k0");
1534 let err = request.fill_missing_columns(&metadata).unwrap_err();
1535 check_invalid_request(&err, "delete requests need column k0");
1536
1537 let rows = Rows {
1538 schema: vec![
1539 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1540 new_column_schema(
1541 "ts",
1542 ColumnDataType::TimestampMillisecond,
1543 SemanticType::Timestamp,
1544 ),
1545 ],
1546 rows: vec![Row {
1547 values: vec![i64_value(100), ts_ms_value(1)],
1548 }],
1549 };
1550 let mut request =
1551 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1552 let err = request.check_schema(&metadata).unwrap_err();
1553 assert!(err.is_fill_default());
1554 request.fill_missing_columns(&metadata).unwrap();
1555
1556 let expect_rows = Rows {
1557 schema: vec![
1558 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1559 new_column_schema(
1560 "ts",
1561 ColumnDataType::TimestampMillisecond,
1562 SemanticType::Timestamp,
1563 ),
1564 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1565 ],
1566 rows: vec![Row {
1568 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1569 }],
1570 };
1571 assert_eq!(expect_rows, request.rows);
1572 }
1573
1574 #[test]
1575 fn test_fill_missing_without_default_in_delete() {
1576 let mut builder = builder_with_ts_tag();
1577 builder
1578 .push_column_metadata(ColumnMetadata {
1580 column_schema: datatypes::schema::ColumnSchema::new(
1581 "f0",
1582 ConcreteDataType::int64_datatype(),
1583 true,
1584 ),
1585 semantic_type: SemanticType::Field,
1586 column_id: 3,
1587 })
1588 .push_column_metadata(ColumnMetadata {
1590 column_schema: datatypes::schema::ColumnSchema::new(
1591 "f1",
1592 ConcreteDataType::int64_datatype(),
1593 false,
1594 ),
1595 semantic_type: SemanticType::Field,
1596 column_id: 4,
1597 });
1598 let metadata = builder.build().unwrap();
1599
1600 let rows = Rows {
1601 schema: vec![
1602 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1603 new_column_schema(
1604 "ts",
1605 ColumnDataType::TimestampMillisecond,
1606 SemanticType::Timestamp,
1607 ),
1608 ],
1609 rows: vec![Row {
1611 values: vec![i64_value(100), ts_ms_value(1)],
1612 }],
1613 };
1614 let mut request =
1615 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1616 let err = request.check_schema(&metadata).unwrap_err();
1617 assert!(err.is_fill_default());
1618 request.fill_missing_columns(&metadata).unwrap();
1619
1620 let expect_rows = Rows {
1621 schema: vec![
1622 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1623 new_column_schema(
1624 "ts",
1625 ColumnDataType::TimestampMillisecond,
1626 SemanticType::Timestamp,
1627 ),
1628 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1629 ],
1630 rows: vec![Row {
1632 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1633 }],
1634 };
1635 assert_eq!(expect_rows, request.rows);
1636 }
1637
1638 #[test]
1639 fn test_no_default() {
1640 let rows = Rows {
1641 schema: vec![new_column_schema(
1642 "k0",
1643 ColumnDataType::Int64,
1644 SemanticType::Tag,
1645 )],
1646 rows: vec![Row {
1647 values: vec![i64_value(1)],
1648 }],
1649 };
1650 let metadata = new_region_metadata();
1651
1652 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1653 let err = request.fill_missing_columns(&metadata).unwrap_err();
1654 check_invalid_request(&err, "column ts does not have default value");
1655 }
1656
1657 #[test]
1658 fn test_missing_and_invalid() {
1659 let rows = Rows {
1661 schema: vec![
1662 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1663 new_column_schema(
1664 "ts",
1665 ColumnDataType::TimestampMillisecond,
1666 SemanticType::Timestamp,
1667 ),
1668 new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1669 ],
1670 rows: vec![Row {
1671 values: vec![
1672 i64_value(100),
1673 ts_ms_value(1),
1674 Value {
1675 value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1676 },
1677 ],
1678 }],
1679 };
1680 let metadata = region_metadata_two_fields();
1681
1682 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1683 let err = request.check_schema(&metadata).unwrap_err();
1684 check_invalid_request(
1685 &err,
1686 "column f1 expect type Int64(Int64Type), given: STRING(12)",
1687 );
1688 }
1689
1690 #[test]
1691 fn test_write_request_metadata() {
1692 let rows = Rows {
1693 schema: vec![
1694 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1695 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1696 ],
1697 rows: vec![Row {
1698 values: vec![i64_value(1), i64_value(2)],
1699 }],
1700 };
1701
1702 let metadata = Arc::new(new_region_metadata());
1703 let request = WriteRequest::new(
1704 RegionId::new(1, 1),
1705 OpType::Put,
1706 rows,
1707 Some(metadata.clone()),
1708 )
1709 .unwrap();
1710
1711 assert!(request.region_metadata.is_some());
1712 assert_eq!(
1713 request.region_metadata.unwrap().region_id,
1714 RegionId::new(1, 1)
1715 );
1716 }
1717}