1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23 ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
36use store_api::region_request::{
37 AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
38 RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
39 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
40};
41use store_api::storage::{RegionId, SequenceNumber};
42use store_api::ManifestVersion;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::RegionEdit;
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
/// A validated request to write rows into a region.
///
/// Instances are created by [`WriteRequest::new`], which derives `name_to_index` and
/// `has_null` from `rows`.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region the rows are written to.
    pub region_id: RegionId,
    /// Type of the operation applied to the rows.
    pub op_type: OpType,
    /// Rows to write, together with their schema.
    pub rows: Rows,
    /// Maps a column name in `rows.schema` to the index of that column.
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is `true` if at least one row has no value data in column `i`.
    pub has_null: Vec<bool>,
    /// Optional write hint; the primary key encoding is inferred from it.
    pub hint: Option<WriteHint>,
    /// Region metadata handed to the constructor, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76 pub fn new(
80 region_id: RegionId,
81 op_type: OpType,
82 rows: Rows,
83 region_metadata: Option<RegionMetadataRef>,
84 ) -> Result<WriteRequest> {
85 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86 for (index, column) in rows.schema.iter().enumerate() {
87 ensure!(
88 name_to_index
89 .insert(column.column_name.clone(), index)
90 .is_none(),
91 InvalidRequestSnafu {
92 region_id,
93 reason: format!("duplicate column {}", column.column_name),
94 }
95 );
96 }
97
98 let mut has_null = vec![false; rows.schema.len()];
99 for row in &rows.rows {
100 ensure!(
101 row.values.len() == rows.schema.len(),
102 InvalidRequestSnafu {
103 region_id,
104 reason: format!(
105 "row has {} columns but schema has {}",
106 row.values.len(),
107 rows.schema.len()
108 ),
109 }
110 );
111
112 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113 validate_proto_value(region_id, value, column_schema)?;
114
115 if value.value_data.is_none() {
116 has_null[i] = true;
117 }
118 }
119 }
120
121 Ok(WriteRequest {
122 region_id,
123 op_type,
124 rows,
125 name_to_index,
126 has_null,
127 hint: None,
128 region_metadata,
129 })
130 }
131
132 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134 self.hint = hint;
135 self
136 }
137
138 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140 infer_primary_key_encoding_from_hint(self.hint.as_ref())
141 }
142
143 pub(crate) fn estimated_size(&self) -> usize {
145 let row_size = self
146 .rows
147 .rows
148 .first()
149 .map(|row| row.encoded_len())
150 .unwrap_or(0);
151 row_size * self.rows.rows.len()
152 }
153
154 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156 self.name_to_index.get(name).copied()
157 }
158
159 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164 debug_assert_eq!(self.region_id, metadata.region_id);
165
166 let region_id = self.region_id;
167 let mut rows_columns: HashMap<_, _> = self
169 .rows
170 .schema
171 .iter()
172 .map(|column| (&column.column_name, column))
173 .collect();
174
175 let mut need_fill_default = false;
176 for column in &metadata.column_metadatas {
178 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179 ensure!(
181 is_column_type_value_eq(
182 input_col.datatype,
183 input_col.datatype_extension,
184 &column.column_schema.data_type
185 ),
186 InvalidRequestSnafu {
187 region_id,
188 reason: format!(
189 "column {} expect type {:?}, given: {}({})",
190 column.column_schema.name,
191 column.column_schema.data_type,
192 ColumnDataType::try_from(input_col.datatype)
193 .map(|v| v.as_str_name())
194 .unwrap_or("Unknown"),
195 input_col.datatype,
196 )
197 }
198 );
199
200 ensure!(
202 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203 InvalidRequestSnafu {
204 region_id,
205 reason: format!(
206 "column {} has semantic type {:?}, given: {}({})",
207 column.column_schema.name,
208 column.semantic_type,
209 api::v1::SemanticType::try_from(input_col.semantic_type)
210 .map(|v| v.as_str_name())
211 .unwrap_or("Unknown"),
212 input_col.semantic_type
213 ),
214 }
215 );
216
217 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220 ensure!(
221 !has_null || column.column_schema.is_nullable(),
222 InvalidRequestSnafu {
223 region_id,
224 reason: format!(
225 "column {} is not null but input has null",
226 column.column_schema.name
227 ),
228 }
229 );
230 } else {
231 self.check_missing_column(column)?;
233
234 need_fill_default = true;
235 }
236 }
237
238 if !rows_columns.is_empty() {
240 let names: Vec<_> = rows_columns.into_keys().collect();
241 return InvalidRequestSnafu {
242 region_id,
243 reason: format!("unknown columns: {:?}", names),
244 }
245 .fail();
246 }
247
248 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251 Ok(())
252 }
253
254 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259 debug_assert_eq!(self.region_id, metadata.region_id);
260
261 let mut columns_to_fill = vec![];
262 for column in &metadata.column_metadatas {
263 if !self.name_to_index.contains_key(&column.column_schema.name) {
264 columns_to_fill.push(column);
265 }
266 }
267 self.fill_columns(columns_to_fill)?;
268
269 Ok(())
270 }
271
272 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274 if let Err(e) = self.check_schema(metadata) {
275 if e.is_fill_default() {
276 self.fill_missing_columns(metadata)?;
280 } else {
281 return Err(e);
282 }
283 }
284
285 Ok(())
286 }
287
288 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290 let mut default_values = Vec::with_capacity(columns.len());
291 let mut columns_to_fill = Vec::with_capacity(columns.len());
292 for column in columns {
293 let default_value = self.column_default_value(column)?;
294 if default_value.value_data.is_some() {
295 default_values.push(default_value);
296 columns_to_fill.push(column);
297 }
298 }
299
300 for row in &mut self.rows.rows {
301 row.values.extend(default_values.iter().cloned());
302 }
303
304 for column in columns_to_fill {
305 let (datatype, datatype_ext) =
306 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307 .with_context(|_| ConvertColumnDataTypeSnafu {
308 reason: format!(
309 "no protobuf type for column {} ({:?})",
310 column.column_schema.name, column.column_schema.data_type
311 ),
312 })?
313 .to_parts();
314 self.rows.schema.push(ColumnSchema {
315 column_name: column.column_schema.name.clone(),
316 datatype: datatype as i32,
317 semantic_type: column.semantic_type as i32,
318 datatype_extension: datatype_ext,
319 options: options_from_column_schema(&column.column_schema),
320 });
321 }
322
323 Ok(())
324 }
325
326 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328 if self.op_type == OpType::Delete {
329 if column.semantic_type == SemanticType::Field {
330 return Ok(());
333 } else {
334 return InvalidRequestSnafu {
335 region_id: self.region_id,
336 reason: format!("delete requests need column {}", column.column_schema.name),
337 }
338 .fail();
339 }
340 }
341
342 ensure!(
344 column.column_schema.is_nullable()
345 || column.column_schema.default_constraint().is_some(),
346 InvalidRequestSnafu {
347 region_id: self.region_id,
348 reason: format!("missing column {}", column.column_schema.name),
349 }
350 );
351
352 Ok(())
353 }
354
355 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357 let default_value = match self.op_type {
358 OpType::Delete => {
359 ensure!(
360 column.semantic_type == SemanticType::Field,
361 InvalidRequestSnafu {
362 region_id: self.region_id,
363 reason: format!(
364 "delete requests need column {}",
365 column.column_schema.name
366 ),
367 }
368 );
369
370 if column.column_schema.is_nullable() {
375 datatypes::value::Value::Null
376 } else {
377 column.column_schema.data_type.default_value()
378 }
379 }
380 OpType::Put => {
381 if column.column_schema.is_default_impure() {
383 UnexpectedSnafu {
384 reason: format!(
385 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
386 self.region_id,
387 column.column_schema.name,
388 column.column_schema.default_constraint(),
389 ),
390 }
391 .fail()?
392 }
393 column
394 .column_schema
395 .create_default()
396 .context(CreateDefaultSnafu {
397 region_id: self.region_id,
398 column: &column.column_schema.name,
399 })?
400 .with_context(|| InvalidRequestSnafu {
402 region_id: self.region_id,
403 reason: format!(
404 "column {} does not have default value",
405 column.column_schema.name
406 ),
407 })?
408 }
409 };
410
411 to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
413 region_id: self.region_id,
414 reason: format!(
415 "no protobuf type for default value of column {} ({:?})",
416 column.column_schema.name, column.column_schema.data_type
417 ),
418 })
419 }
420}
421
422pub(crate) fn validate_proto_value(
424 region_id: RegionId,
425 value: &Value,
426 column_schema: &ColumnSchema,
427) -> Result<()> {
428 if let Some(value_type) = proto_value_type(value) {
429 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
430 InvalidRequestSnafu {
431 region_id,
432 reason: format!(
433 "column {} has unknown type {}",
434 column_schema.column_name, column_schema.datatype
435 ),
436 }
437 .build()
438 })?;
439 ensure!(
440 proto_value_type_match(column_type, value_type),
441 InvalidRequestSnafu {
442 region_id,
443 reason: format!(
444 "value has type {:?}, but column {} has type {:?}({})",
445 value_type, column_schema.column_name, column_type, column_schema.datatype,
446 ),
447 }
448 );
449 }
450
451 Ok(())
452}
453
454fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
455 match (column_type, value_type) {
456 (ct, vt) if ct == vt => true,
457 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
458 (ColumnDataType::Json, ColumnDataType::Binary) => true,
459 _ => false,
460 }
461}
462
/// Oneshot sender used to report the result of a request.
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);

impl OutputTx {
    /// Wraps `sender`.
    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
        OutputTx(sender)
    }

    /// Sends `result` through the wrapped sender, consuming it.
    pub(crate) fn send(self, result: Result<AffectedRows>) {
        // A send error is deliberately ignored here.
        let _ = self.0.send(result);
    }
}
479
480#[derive(Debug)]
482pub(crate) struct OptionOutputTx(Option<OutputTx>);
483
484impl OptionOutputTx {
485 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
487 OptionOutputTx(sender)
488 }
489
490 pub(crate) fn none() -> OptionOutputTx {
492 OptionOutputTx(None)
493 }
494
495 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
497 if let Some(sender) = self.0.take() {
498 sender.send(result);
499 }
500 }
501
502 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
504 if let Some(sender) = self.0.take() {
505 sender.send(result);
506 }
507 }
508
509 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
511 self.0.take()
512 }
513}
514
515impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
516 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
517 Self::new(Some(OutputTx::new(sender)))
518 }
519}
520
impl OnFailure for OptionOutputTx {
    /// Forwards `err` to the sender, if one is still present.
    fn on_failure(&mut self, err: Error) {
        self.send_mut(Err(err));
    }
}
526
/// Implemented by types that need to be told when the work they belong to has failed.
pub(crate) trait OnFailure {
    /// Handles the failure described by `err`.
    fn on_failure(&mut self, err: Error);
}
532
/// A [`WriteRequest`] paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Sender for the result of the write.
    pub(crate) sender: OptionOutputTx,
    /// The write request itself.
    pub(crate) request: WriteRequest,
}
540
/// A bulk part paired with the sender used to report its result.
pub(crate) struct SenderBulkRequest {
    /// Sender for the result of the bulk request.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by the request.
    pub(crate) request: BulkPart,
    /// Region metadata associated with the request.
    pub(crate) region_metadata: RegionMetadataRef,
}
547
/// A [`WorkerRequest`] together with the instant at which the wrapper was created.
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
    /// The wrapped request.
    pub(crate) request: WorkerRequest,
    /// Instant captured by [`WorkerRequestWithTime::new`].
    pub(crate) created_at: Instant,
}

impl WorkerRequestWithTime {
    /// Wraps `request` and records the current instant as its creation time.
    pub(crate) fn new(request: WorkerRequest) -> Self {
        Self {
            request,
            created_at: Instant::now(),
        }
    }
}
563
/// Requests that can be handed to a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// A write request with its result sender.
    Write(SenderWriteRequest),

    /// A DDL request with its result sender.
    Ddl(SenderDdlRequest),

    /// A notification about background work.
    Background {
        /// Id of the region the notification refers to.
        region_id: RegionId,
        /// The notification payload.
        notify: BackgroundNotify,
    },

    /// A request to change the role state of a region.
    SetRegionRoleStateGracefully {
        /// Id of the region.
        region_id: RegionId,
        /// The requested role state.
        region_role_state: SettableRegionRoleState,
        /// Sender for the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    // NOTE(review): presumably asks the worker to stop — confirm against the worker loop.
    Stop,

    /// A request to apply a [`RegionEdit`] to a region.
    EditRegion(RegionEditRequest),

    /// A request to sync a region to a manifest version.
    SyncRegion(RegionSyncRequest),

    /// A bulk insert request.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk insert request.
        request: RegionBulkInsertsRequest,
        /// Sender for the result.
        sender: OptionOutputTx,
    },
}
607
608impl WorkerRequest {
609 pub(crate) fn new_open_region_request(
610 region_id: RegionId,
611 request: RegionOpenRequest,
612 entry_receiver: Option<WalEntryReceiver>,
613 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
614 let (sender, receiver) = oneshot::channel();
615
616 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
617 region_id,
618 sender: sender.into(),
619 request: DdlRequest::Open((request, entry_receiver)),
620 });
621
622 (worker_request, receiver)
623 }
624
625 pub(crate) fn try_from_region_request(
627 region_id: RegionId,
628 value: RegionRequest,
629 region_metadata: Option<RegionMetadataRef>,
630 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
631 let (sender, receiver) = oneshot::channel();
632 let worker_request = match value {
633 RegionRequest::Put(v) => {
634 let mut write_request =
635 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
636 .with_hint(v.hint);
637 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
638 && let Some(region_metadata) = ®ion_metadata
639 {
640 write_request.maybe_fill_missing_columns(region_metadata)?;
641 }
642 WorkerRequest::Write(SenderWriteRequest {
643 sender: sender.into(),
644 request: write_request,
645 })
646 }
647 RegionRequest::Delete(v) => {
648 let mut write_request =
649 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
650 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651 && let Some(region_metadata) = ®ion_metadata
652 {
653 write_request.maybe_fill_missing_columns(region_metadata)?;
654 }
655 WorkerRequest::Write(SenderWriteRequest {
656 sender: sender.into(),
657 request: write_request,
658 })
659 }
660 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
661 region_id,
662 sender: sender.into(),
663 request: DdlRequest::Create(v),
664 }),
665 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
666 region_id,
667 sender: sender.into(),
668 request: DdlRequest::Drop,
669 }),
670 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
671 region_id,
672 sender: sender.into(),
673 request: DdlRequest::Open((v, None)),
674 }),
675 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
676 region_id,
677 sender: sender.into(),
678 request: DdlRequest::Close(v),
679 }),
680 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
681 region_id,
682 sender: sender.into(),
683 request: DdlRequest::Alter(v),
684 }),
685 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
686 region_id,
687 sender: sender.into(),
688 request: DdlRequest::Flush(v),
689 }),
690 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
691 region_id,
692 sender: sender.into(),
693 request: DdlRequest::Compact(v),
694 }),
695 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
696 region_id,
697 sender: sender.into(),
698 request: DdlRequest::Truncate(v),
699 }),
700 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
701 region_id,
702 sender: sender.into(),
703 request: DdlRequest::Catchup(v),
704 }),
705 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
706 metadata: region_metadata,
707 sender: sender.into(),
708 request: region_bulk_inserts_request,
709 },
710 };
711
712 Ok((worker_request, receiver))
713 }
714
715 pub(crate) fn new_set_readonly_gracefully(
716 region_id: RegionId,
717 region_role_state: SettableRegionRoleState,
718 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
719 let (sender, receiver) = oneshot::channel();
720
721 (
722 WorkerRequest::SetRegionRoleStateGracefully {
723 region_id,
724 region_role_state,
725 sender,
726 },
727 receiver,
728 )
729 }
730
731 pub(crate) fn new_sync_region_request(
732 region_id: RegionId,
733 manifest_version: ManifestVersion,
734 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
735 let (sender, receiver) = oneshot::channel();
736 (
737 WorkerRequest::SyncRegion(RegionSyncRequest {
738 region_id,
739 manifest_version,
740 sender,
741 }),
742 receiver,
743 )
744 }
745}
746
/// DDL requests handled by a region worker, one variant per kind of region DDL operation.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop a region.
    Drop,
    /// Open a region, optionally with a WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close a region.
    Close(RegionCloseRequest),
    /// Alter a region.
    Alter(RegionAlterRequest),
    /// Flush a region.
    Flush(RegionFlushRequest),
    /// Compact a region.
    Compact(RegionCompactRequest),
    /// Truncate a region.
    Truncate(RegionTruncateRequest),
    /// Catch up a region.
    Catchup(RegionCatchupRequest),
}
760
/// A [`DdlRequest`] paired with its target region and the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the request.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request itself.
    pub(crate) request: DdlRequest,
}
771
/// Payload of [`WorkerRequest::Background`]: the outcome of a piece of background work.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush has finished.
    FlushFinished(FlushFinished),
    /// A flush has failed.
    FlushFailed(FlushFailed),
    /// A compaction has finished.
    CompactionFinished(CompactionFinished),
    /// A compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate operation.
    Truncate(TruncateResult),
    /// Result of a region change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
790
791#[derive(Debug)]
793pub(crate) struct FlushFinished {
794 pub(crate) region_id: RegionId,
796 pub(crate) flushed_entry_id: EntryId,
798 pub(crate) senders: Vec<OutputTx>,
800 pub(crate) _timer: HistogramTimer,
802 pub(crate) edit: RegionEdit,
804 pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
806}
807
808impl FlushFinished {
809 pub(crate) fn on_success(self) {
811 for sender in self.senders {
812 sender.send(Ok(0));
813 }
814 }
815}
816
817impl OnFailure for FlushFinished {
818 fn on_failure(&mut self, err: Error) {
819 let err = Arc::new(err);
820 for sender in self.senders.drain(..) {
821 sender.send(Err(err.clone()).context(FlushRegionSnafu {
822 region_id: self.region_id,
823 }));
824 }
825 }
826}
827
/// Notification that a flush has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error that made the flush fail.
    pub(crate) err: Arc<Error>,
}
834
835#[derive(Debug)]
837pub(crate) struct CompactionFinished {
838 pub(crate) region_id: RegionId,
840 pub(crate) senders: Vec<OutputTx>,
842 pub(crate) start_time: Instant,
844 pub(crate) edit: RegionEdit,
846}
847
848impl CompactionFinished {
849 pub fn on_success(self) {
850 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
852
853 for sender in self.senders {
854 sender.send(Ok(0));
855 }
856 info!("Successfully compacted region: {}", self.region_id);
857 }
858}
859
860impl OnFailure for CompactionFinished {
861 fn on_failure(&mut self, err: Error) {
863 let err = Arc::new(err);
864 for sender in self.senders.drain(..) {
865 sender.send(Err(err.clone()).context(CompactRegionSnafu {
866 region_id: self.region_id,
867 }));
868 }
869 }
870}
871
/// Notification that a compaction has failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The error that made the compaction fail.
    pub(crate) err: Arc<Error>,
}
879
/// Notification carrying the result of a truncate operation.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the operation.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the operation.
    pub(crate) result: Result<()>,
    /// Entry id associated with the truncation.
    pub(crate) truncated_entry_id: EntryId,
    /// Sequence number associated with the truncation.
    pub(crate) truncated_sequence: SequenceNumber,
}
894
/// Notification carrying the result of a region change.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender for the result of the operation.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the operation.
    pub(crate) result: Result<()>,
}
907
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender for the result of the edit.
    pub(crate) tx: Sender<Result<()>>,
}
916
/// Notification carrying the result of a region edit.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Sender for the result of the edit.
    pub(crate) sender: Sender<Result<()>>,
    /// The edit this result refers to.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
}
929
/// Request to sync a region to a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region to sync.
    pub(crate) region_id: RegionId,
    /// The manifest version passed with the request.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender for the result: a manifest version and a flag.
    // NOTE(review): the meaning of the `bool` is not visible in this file — confirm at the handler.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
937
938#[cfg(test)]
939mod tests {
940 use api::v1::value::ValueData;
941 use api::v1::{Row, SemanticType};
942 use datatypes::prelude::ConcreteDataType;
943 use datatypes::schema::ColumnDefaultConstraint;
944 use mito_codec::test_util::i64_value;
945 use store_api::metadata::RegionMetadataBuilder;
946
947 use super::*;
948 use crate::error::Error;
949 use crate::test_util::ts_ms_value;
950
951 fn new_column_schema(
952 name: &str,
953 data_type: ColumnDataType,
954 semantic_type: SemanticType,
955 ) -> ColumnSchema {
956 ColumnSchema {
957 column_name: name.to_string(),
958 datatype: data_type as i32,
959 semantic_type: semantic_type as i32,
960 ..Default::default()
961 }
962 }
963
964 fn check_invalid_request(err: &Error, expect: &str) {
965 if let Error::InvalidRequest {
966 region_id: _,
967 reason,
968 location: _,
969 } = err
970 {
971 assert_eq!(reason, expect);
972 } else {
973 panic!("Unexpected error {err}")
974 }
975 }
976
977 #[test]
978 fn test_write_request_duplicate_column() {
979 let rows = Rows {
980 schema: vec![
981 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
982 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
983 ],
984 rows: vec![],
985 };
986
987 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
988 check_invalid_request(&err, "duplicate column c0");
989 }
990
991 #[test]
992 fn test_valid_write_request() {
993 let rows = Rows {
994 schema: vec![
995 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
996 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
997 ],
998 rows: vec![Row {
999 values: vec![i64_value(1), i64_value(2)],
1000 }],
1001 };
1002
1003 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1004 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1005 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1006 assert_eq!(None, request.column_index_by_name("c2"));
1007 }
1008
1009 #[test]
1010 fn test_write_request_column_num() {
1011 let rows = Rows {
1012 schema: vec![
1013 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1014 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1015 ],
1016 rows: vec![Row {
1017 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1018 }],
1019 };
1020
1021 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1022 check_invalid_request(&err, "row has 3 columns but schema has 2");
1023 }
1024
1025 fn new_region_metadata() -> RegionMetadata {
1026 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1027 builder
1028 .push_column_metadata(ColumnMetadata {
1029 column_schema: datatypes::schema::ColumnSchema::new(
1030 "ts",
1031 ConcreteDataType::timestamp_millisecond_datatype(),
1032 false,
1033 ),
1034 semantic_type: SemanticType::Timestamp,
1035 column_id: 1,
1036 })
1037 .push_column_metadata(ColumnMetadata {
1038 column_schema: datatypes::schema::ColumnSchema::new(
1039 "k0",
1040 ConcreteDataType::int64_datatype(),
1041 true,
1042 ),
1043 semantic_type: SemanticType::Tag,
1044 column_id: 2,
1045 })
1046 .primary_key(vec![2]);
1047 builder.build().unwrap()
1048 }
1049
1050 #[test]
1051 fn test_check_schema() {
1052 let rows = Rows {
1053 schema: vec![
1054 new_column_schema(
1055 "ts",
1056 ColumnDataType::TimestampMillisecond,
1057 SemanticType::Timestamp,
1058 ),
1059 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1060 ],
1061 rows: vec![Row {
1062 values: vec![ts_ms_value(1), i64_value(2)],
1063 }],
1064 };
1065 let metadata = new_region_metadata();
1066
1067 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1068 request.check_schema(&metadata).unwrap();
1069 }
1070
1071 #[test]
1072 fn test_column_type() {
1073 let rows = Rows {
1074 schema: vec![
1075 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1076 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1077 ],
1078 rows: vec![Row {
1079 values: vec![i64_value(1), i64_value(2)],
1080 }],
1081 };
1082 let metadata = new_region_metadata();
1083
1084 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1085 let err = request.check_schema(&metadata).unwrap_err();
1086 check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1087 }
1088
1089 #[test]
1090 fn test_semantic_type() {
1091 let rows = Rows {
1092 schema: vec![
1093 new_column_schema(
1094 "ts",
1095 ColumnDataType::TimestampMillisecond,
1096 SemanticType::Tag,
1097 ),
1098 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1099 ],
1100 rows: vec![Row {
1101 values: vec![ts_ms_value(1), i64_value(2)],
1102 }],
1103 };
1104 let metadata = new_region_metadata();
1105
1106 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1107 let err = request.check_schema(&metadata).unwrap_err();
1108 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1109 }
1110
1111 #[test]
1112 fn test_column_nullable() {
1113 let rows = Rows {
1114 schema: vec![
1115 new_column_schema(
1116 "ts",
1117 ColumnDataType::TimestampMillisecond,
1118 SemanticType::Timestamp,
1119 ),
1120 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1121 ],
1122 rows: vec![Row {
1123 values: vec![Value { value_data: None }, i64_value(2)],
1124 }],
1125 };
1126 let metadata = new_region_metadata();
1127
1128 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1129 let err = request.check_schema(&metadata).unwrap_err();
1130 check_invalid_request(&err, "column ts is not null but input has null");
1131 }
1132
1133 #[test]
1134 fn test_column_default() {
1135 let rows = Rows {
1136 schema: vec![new_column_schema(
1137 "k0",
1138 ColumnDataType::Int64,
1139 SemanticType::Tag,
1140 )],
1141 rows: vec![Row {
1142 values: vec![i64_value(1)],
1143 }],
1144 };
1145 let metadata = new_region_metadata();
1146
1147 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1148 let err = request.check_schema(&metadata).unwrap_err();
1149 check_invalid_request(&err, "missing column ts");
1150 }
1151
1152 #[test]
1153 fn test_unknown_column() {
1154 let rows = Rows {
1155 schema: vec![
1156 new_column_schema(
1157 "ts",
1158 ColumnDataType::TimestampMillisecond,
1159 SemanticType::Timestamp,
1160 ),
1161 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1162 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1163 ],
1164 rows: vec![Row {
1165 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1166 }],
1167 };
1168 let metadata = new_region_metadata();
1169
1170 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1171 let err = request.check_schema(&metadata).unwrap_err();
1172 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1173 }
1174
1175 #[test]
1176 fn test_fill_impure_columns_err() {
1177 let rows = Rows {
1178 schema: vec![new_column_schema(
1179 "k0",
1180 ColumnDataType::Int64,
1181 SemanticType::Tag,
1182 )],
1183 rows: vec![Row {
1184 values: vec![i64_value(1)],
1185 }],
1186 };
1187 let metadata = {
1188 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1189 builder
1190 .push_column_metadata(ColumnMetadata {
1191 column_schema: datatypes::schema::ColumnSchema::new(
1192 "ts",
1193 ConcreteDataType::timestamp_millisecond_datatype(),
1194 false,
1195 )
1196 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1197 "now()".to_string(),
1198 )))
1199 .unwrap(),
1200 semantic_type: SemanticType::Timestamp,
1201 column_id: 1,
1202 })
1203 .push_column_metadata(ColumnMetadata {
1204 column_schema: datatypes::schema::ColumnSchema::new(
1205 "k0",
1206 ConcreteDataType::int64_datatype(),
1207 true,
1208 ),
1209 semantic_type: SemanticType::Tag,
1210 column_id: 2,
1211 })
1212 .primary_key(vec![2]);
1213 builder.build().unwrap()
1214 };
1215
1216 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1217 let err = request.check_schema(&metadata).unwrap_err();
1218 assert!(err.is_fill_default());
1219 assert!(request
1220 .fill_missing_columns(&metadata)
1221 .unwrap_err()
1222 .to_string()
1223 .contains("unexpected impure default value with region_id"));
1224 }
1225
1226 #[test]
1227 fn test_fill_missing_columns() {
1228 let rows = Rows {
1229 schema: vec![new_column_schema(
1230 "ts",
1231 ColumnDataType::TimestampMillisecond,
1232 SemanticType::Timestamp,
1233 )],
1234 rows: vec![Row {
1235 values: vec![ts_ms_value(1)],
1236 }],
1237 };
1238 let metadata = new_region_metadata();
1239
1240 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1241 let err = request.check_schema(&metadata).unwrap_err();
1242 assert!(err.is_fill_default());
1243 request.fill_missing_columns(&metadata).unwrap();
1244
1245 let expect_rows = Rows {
1246 schema: vec![new_column_schema(
1247 "ts",
1248 ColumnDataType::TimestampMillisecond,
1249 SemanticType::Timestamp,
1250 )],
1251 rows: vec![Row {
1252 values: vec![ts_ms_value(1)],
1253 }],
1254 };
1255 assert_eq!(expect_rows, request.rows);
1256 }
1257
1258 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1259 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1260 builder
1261 .push_column_metadata(ColumnMetadata {
1262 column_schema: datatypes::schema::ColumnSchema::new(
1263 "ts",
1264 ConcreteDataType::timestamp_millisecond_datatype(),
1265 false,
1266 ),
1267 semantic_type: SemanticType::Timestamp,
1268 column_id: 1,
1269 })
1270 .push_column_metadata(ColumnMetadata {
1271 column_schema: datatypes::schema::ColumnSchema::new(
1272 "k0",
1273 ConcreteDataType::int64_datatype(),
1274 true,
1275 ),
1276 semantic_type: SemanticType::Tag,
1277 column_id: 2,
1278 })
1279 .primary_key(vec![2]);
1280 builder
1281 }
1282
1283 fn region_metadata_two_fields() -> RegionMetadata {
1284 let mut builder = builder_with_ts_tag();
1285 builder
1286 .push_column_metadata(ColumnMetadata {
1287 column_schema: datatypes::schema::ColumnSchema::new(
1288 "f0",
1289 ConcreteDataType::int64_datatype(),
1290 true,
1291 ),
1292 semantic_type: SemanticType::Field,
1293 column_id: 3,
1294 })
1295 .push_column_metadata(ColumnMetadata {
1297 column_schema: datatypes::schema::ColumnSchema::new(
1298 "f1",
1299 ConcreteDataType::int64_datatype(),
1300 false,
1301 )
1302 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1303 datatypes::value::Value::Int64(100),
1304 )))
1305 .unwrap(),
1306 semantic_type: SemanticType::Field,
1307 column_id: 4,
1308 });
1309 builder.build().unwrap()
1310 }
1311
// Covers filling missing columns for delete requests against the metadata
// produced by `region_metadata_two_fields`.
#[test]
fn test_fill_missing_for_delete() {
    let metadata = region_metadata_two_fields();
    let ts_schema = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };
    let k0_schema = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);

    // Case 1: a delete without the `k0` tag is rejected by both the schema
    // check and the fill step, with the same message.
    let ts_only = Rows {
        schema: vec![ts_schema()],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, ts_only, None).unwrap();
    let schema_err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&schema_err, "delete requests need column k0");
    let fill_err = request.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&fill_err, "delete requests need column k0");

    // Case 2: a delete with `k0` and `ts` still needs filling, and succeeds.
    let key_and_ts = Rows {
        schema: vec![k0_schema(), ts_schema()],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_and_ts, None).unwrap();
    let schema_err = request.check_schema(&metadata).unwrap_err();
    assert!(schema_err.is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    // Only `f1` is appended, and it is expected to hold 0 even though the
    // metadata declares a default of 100 for it; `f0` is not added.
    let expect_rows = Rows {
        schema: vec![
            k0_schema(),
            ts_schema(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, request.rows);
}
1369
// Covers filling missing columns for a delete request when the `f1` field has
// no default constraint in the region metadata.
#[test]
fn test_fill_missing_without_default_in_delete() {
    // Region layout: `ts` and `k0` from the shared builder, plus Int64 fields
    // `f0` (id 3) and `f1` (id 4); neither field has a default constraint.
    let mut builder = builder_with_ts_tag();
    let f0_column = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    let f1_column = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    builder.push_column_metadata(f0_column);
    builder.push_column_metadata(f1_column);
    let metadata = builder.build().unwrap();

    let ts_schema = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };
    let k0_schema = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);

    // The delete carries only the tag and the timestamp.
    let key_and_ts = Rows {
        schema: vec![k0_schema(), ts_schema()],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_and_ts, None).unwrap();
    let schema_err = request.check_schema(&metadata).unwrap_err();
    assert!(schema_err.is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    // The filled rows gain an `f1` column holding 0; `f0` is not added.
    let expect_rows = Rows {
        schema: vec![
            k0_schema(),
            ts_schema(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expect_rows, request.rows);
}
1433
// Verifies that filling missing columns fails for a put that omits `ts`,
// reporting that the column has no default value.
#[test]
fn test_no_default() {
    let k0_schema = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let single_row = Row {
        values: vec![i64_value(1)],
    };
    let rows = Rows {
        schema: vec![k0_schema],
        rows: vec![single_row],
    };
    let metadata = new_region_metadata();

    let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let fill_err = request.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&fill_err, "column ts does not have default value");
}
1452
// Verifies that the schema check reports the type mismatch on `f1` when the
// request both omits `f0` and sends `f1` with the wrong type.
#[test]
fn test_missing_and_invalid() {
    // `f1` is Int64 in `region_metadata_two_fields` but is sent here as a String.
    let mistyped_f1 = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
    ];
    let rows = Rows {
        schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), mistyped_f1],
        }],
    };
    let metadata = region_metadata_two_fields();

    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let schema_err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(
        &schema_err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
1485
// Verifies that region metadata passed to `WriteRequest::new` is stored on the
// request and exposes the expected region id.
#[test]
fn test_write_request_metadata() {
    let schema = vec![
        new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let rows = Rows {
        schema,
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };

    let metadata = Arc::new(new_region_metadata());
    let attached_metadata = Some(Arc::clone(&metadata));
    let request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, attached_metadata).unwrap();

    let stored = request.region_metadata;
    assert!(stored.is_some());
    assert_eq!(stored.unwrap().region_id, RegionId::new(1, 1));
}
1513}