1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23 to_proto_value,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38 AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest,
39 RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
40 RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{FileId, RegionId};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::MemtableId;
51use crate::memtable::bulk::part::BulkPart;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::region::options::RegionOptions;
54use crate::sst::file::FileMeta;
55use crate::sst::index::IndexBuildType;
56use crate::wal::EntryId;
57use crate::wal::entry_distributor::WalEntryReceiver;
58
/// Request to write rows into a region.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region to write to.
    pub region_id: RegionId,
    /// Type of the operation (put or delete).
    pub op_type: OpType,
    /// Rows to write, together with their column schema.
    pub rows: Rows,
    /// Maps each column name to its index in `rows.schema`.
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is true when column `i` of `rows.schema` contains at least one null value.
    pub has_null: Vec<bool>,
    /// Optional write hint; the primary key encoding is inferred from it.
    pub hint: Option<WriteHint>,
    /// Region metadata supplied when the request was created, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
77
78impl WriteRequest {
79 pub fn new(
83 region_id: RegionId,
84 op_type: OpType,
85 rows: Rows,
86 region_metadata: Option<RegionMetadataRef>,
87 ) -> Result<WriteRequest> {
88 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
89 for (index, column) in rows.schema.iter().enumerate() {
90 ensure!(
91 name_to_index
92 .insert(column.column_name.clone(), index)
93 .is_none(),
94 InvalidRequestSnafu {
95 region_id,
96 reason: format!("duplicate column {}", column.column_name),
97 }
98 );
99 }
100
101 let mut has_null = vec![false; rows.schema.len()];
102 for row in &rows.rows {
103 ensure!(
104 row.values.len() == rows.schema.len(),
105 InvalidRequestSnafu {
106 region_id,
107 reason: format!(
108 "row has {} columns but schema has {}",
109 row.values.len(),
110 rows.schema.len()
111 ),
112 }
113 );
114
115 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
116 validate_proto_value(region_id, value, column_schema)?;
117
118 if value.value_data.is_none() {
119 has_null[i] = true;
120 }
121 }
122 }
123
124 Ok(WriteRequest {
125 region_id,
126 op_type,
127 rows,
128 name_to_index,
129 has_null,
130 hint: None,
131 region_metadata,
132 })
133 }
134
135 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
137 self.hint = hint;
138 self
139 }
140
141 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
143 infer_primary_key_encoding_from_hint(self.hint.as_ref())
144 }
145
146 pub(crate) fn estimated_size(&self) -> usize {
148 let row_size = self
149 .rows
150 .rows
151 .first()
152 .map(|row| row.encoded_len())
153 .unwrap_or(0);
154 row_size * self.rows.rows.len()
155 }
156
157 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
159 self.name_to_index.get(name).copied()
160 }
161
162 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
167 debug_assert_eq!(self.region_id, metadata.region_id);
168
169 let region_id = self.region_id;
170 let mut rows_columns: HashMap<_, _> = self
172 .rows
173 .schema
174 .iter()
175 .map(|column| (&column.column_name, column))
176 .collect();
177
178 let mut need_fill_default = false;
179 for column in &metadata.column_metadatas {
181 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
182 ensure!(
184 is_column_type_value_eq(
185 input_col.datatype,
186 input_col.datatype_extension.clone(),
187 &column.column_schema.data_type
188 ),
189 InvalidRequestSnafu {
190 region_id,
191 reason: format!(
192 "column {} expect type {:?}, given: {}({})",
193 column.column_schema.name,
194 column.column_schema.data_type,
195 ColumnDataType::try_from(input_col.datatype)
196 .map(|v| v.as_str_name())
197 .unwrap_or("Unknown"),
198 input_col.datatype,
199 )
200 }
201 );
202
203 ensure!(
205 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
206 InvalidRequestSnafu {
207 region_id,
208 reason: format!(
209 "column {} has semantic type {:?}, given: {}({})",
210 column.column_schema.name,
211 column.semantic_type,
212 api::v1::SemanticType::try_from(input_col.semantic_type)
213 .map(|v| v.as_str_name())
214 .unwrap_or("Unknown"),
215 input_col.semantic_type
216 ),
217 }
218 );
219
220 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
223 ensure!(
224 !has_null || column.column_schema.is_nullable(),
225 InvalidRequestSnafu {
226 region_id,
227 reason: format!(
228 "column {} is not null but input has null",
229 column.column_schema.name
230 ),
231 }
232 );
233 } else {
234 self.check_missing_column(column)?;
236
237 need_fill_default = true;
238 }
239 }
240
241 if !rows_columns.is_empty() {
243 let names: Vec<_> = rows_columns.into_keys().collect();
244 return InvalidRequestSnafu {
245 region_id,
246 reason: format!("unknown columns: {:?}", names),
247 }
248 .fail();
249 }
250
251 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
253
254 Ok(())
255 }
256
257 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
262 debug_assert_eq!(self.region_id, metadata.region_id);
263
264 let mut columns_to_fill = vec![];
265 for column in &metadata.column_metadatas {
266 if !self.name_to_index.contains_key(&column.column_schema.name) {
267 columns_to_fill.push(column);
268 }
269 }
270 self.fill_columns(columns_to_fill)?;
271
272 Ok(())
273 }
274
275 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
277 if let Err(e) = self.check_schema(metadata) {
278 if e.is_fill_default() {
279 self.fill_missing_columns(metadata)?;
283 } else {
284 return Err(e);
285 }
286 }
287
288 Ok(())
289 }
290
291 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
293 let mut default_values = Vec::with_capacity(columns.len());
294 let mut columns_to_fill = Vec::with_capacity(columns.len());
295 for column in columns {
296 let default_value = self.column_default_value(column)?;
297 if default_value.value_data.is_some() {
298 default_values.push(default_value);
299 columns_to_fill.push(column);
300 }
301 }
302
303 for row in &mut self.rows.rows {
304 row.values.extend(default_values.iter().cloned());
305 }
306
307 for column in columns_to_fill {
308 let (datatype, datatype_ext) =
309 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
310 .with_context(|_| ConvertColumnDataTypeSnafu {
311 reason: format!(
312 "no protobuf type for column {} ({:?})",
313 column.column_schema.name, column.column_schema.data_type
314 ),
315 })?
316 .to_parts();
317 self.rows.schema.push(ColumnSchema {
318 column_name: column.column_schema.name.clone(),
319 datatype: datatype as i32,
320 semantic_type: column.semantic_type as i32,
321 datatype_extension: datatype_ext,
322 options: options_from_column_schema(&column.column_schema),
323 });
324 }
325
326 Ok(())
327 }
328
329 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
331 if self.op_type == OpType::Delete {
332 if column.semantic_type == SemanticType::Field {
333 return Ok(());
336 } else {
337 return InvalidRequestSnafu {
338 region_id: self.region_id,
339 reason: format!("delete requests need column {}", column.column_schema.name),
340 }
341 .fail();
342 }
343 }
344
345 ensure!(
347 column.column_schema.is_nullable()
348 || column.column_schema.default_constraint().is_some(),
349 InvalidRequestSnafu {
350 region_id: self.region_id,
351 reason: format!("missing column {}", column.column_schema.name),
352 }
353 );
354
355 Ok(())
356 }
357
358 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
360 let default_value = match self.op_type {
361 OpType::Delete => {
362 ensure!(
363 column.semantic_type == SemanticType::Field,
364 InvalidRequestSnafu {
365 region_id: self.region_id,
366 reason: format!(
367 "delete requests need column {}",
368 column.column_schema.name
369 ),
370 }
371 );
372
373 if column.column_schema.is_nullable() {
378 datatypes::value::Value::Null
379 } else {
380 column.column_schema.data_type.default_value()
381 }
382 }
383 OpType::Put => {
384 if column.column_schema.is_default_impure() {
386 UnexpectedSnafu {
387 reason: format!(
388 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
389 self.region_id,
390 column.column_schema.name,
391 column.column_schema.default_constraint(),
392 ),
393 }
394 .fail()?
395 }
396 column
397 .column_schema
398 .create_default()
399 .context(CreateDefaultSnafu {
400 region_id: self.region_id,
401 column: &column.column_schema.name,
402 })?
403 .with_context(|| InvalidRequestSnafu {
405 region_id: self.region_id,
406 reason: format!(
407 "column {} does not have default value",
408 column.column_schema.name
409 ),
410 })?
411 }
412 };
413
414 Ok(to_proto_value(default_value))
416 }
417}
418
419pub(crate) fn validate_proto_value(
421 region_id: RegionId,
422 value: &Value,
423 column_schema: &ColumnSchema,
424) -> Result<()> {
425 if let Some(value_type) = proto_value_type(value) {
426 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
427 InvalidRequestSnafu {
428 region_id,
429 reason: format!(
430 "column {} has unknown type {}",
431 column_schema.column_name, column_schema.datatype
432 ),
433 }
434 .build()
435 })?;
436 ensure!(
437 proto_value_type_match(column_type, value_type),
438 InvalidRequestSnafu {
439 region_id,
440 reason: format!(
441 "value has type {:?}, but column {} has type {:?}({})",
442 value_type, column_schema.column_name, column_type, column_schema.datatype,
443 ),
444 }
445 );
446 }
447
448 Ok(())
449}
450
451fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
452 match (column_type, value_type) {
453 (ct, vt) if ct == vt => true,
454 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
455 (ColumnDataType::Json, ColumnDataType::Binary) => true,
456 _ => false,
457 }
458}
459
460#[derive(Debug)]
462pub struct OutputTx(Sender<Result<AffectedRows>>);
463
464impl OutputTx {
465 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
467 OutputTx(sender)
468 }
469
470 pub(crate) fn send(self, result: Result<AffectedRows>) {
472 let _ = self.0.send(result);
474 }
475}
476
477#[derive(Debug)]
479pub(crate) struct OptionOutputTx(Option<OutputTx>);
480
481impl OptionOutputTx {
482 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
484 OptionOutputTx(sender)
485 }
486
487 pub(crate) fn none() -> OptionOutputTx {
489 OptionOutputTx(None)
490 }
491
492 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
494 if let Some(sender) = self.0.take() {
495 sender.send(result);
496 }
497 }
498
499 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
501 if let Some(sender) = self.0.take() {
502 sender.send(result);
503 }
504 }
505
506 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
508 self.0.take()
509 }
510}
511
512impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
513 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
514 Self::new(Some(OutputTx::new(sender)))
515 }
516}
517
518impl OnFailure for OptionOutputTx {
519 fn on_failure(&mut self, err: Error) {
520 self.send_mut(Err(err));
521 }
522}
523
/// Handler invoked when processing a request fails.
pub(crate) trait OnFailure {
    /// Handles `err`. The implementations in this file forward it to their pending senders.
    fn on_failure(&mut self, err: Error);
}
529
/// A [`WriteRequest`] paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Sender for the result; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// The write request.
    pub(crate) request: WriteRequest,
}
537
/// A bulk insert of a [`BulkPart`] paired with the sender that receives its result.
pub(crate) struct SenderBulkRequest {
    /// Sender for the result; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Data to insert.
    pub(crate) request: BulkPart,
    /// Region metadata associated with the request.
    pub(crate) region_metadata: RegionMetadataRef,
}
544
545#[derive(Debug)]
547pub(crate) struct WorkerRequestWithTime {
548 pub(crate) request: WorkerRequest,
549 pub(crate) created_at: Instant,
550}
551
552impl WorkerRequestWithTime {
553 pub(crate) fn new(request: WorkerRequest) -> Self {
554 Self {
555 request,
556 created_at: Instant::now(),
557 }
558 }
559}
560
/// Request handled by a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Writes (puts or deletes) rows into a region.
    Write(SenderWriteRequest),

    /// DDL request for a region.
    Ddl(SenderDdlRequest),

    /// Notification from a background job of a region.
    Background {
        /// Id of the region the notification belongs to.
        region_id: RegionId,
        /// The notification.
        notify: BackgroundNotify,
    },

    /// Sets the role state of a region gracefully.
    SetRegionRoleStateGracefully {
        /// Id of the target region.
        region_id: RegionId,
        /// Role state to set.
        region_role_state: SettableRegionRoleState,
        /// Channel that receives the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop request. NOTE(review): handled by the worker loop, which is not shown here.
    Stop,

    /// Region edit request; see [`RegionEditRequest`].
    EditRegion(RegionEditRequest),

    /// Region sync request; see [`RegionSyncRequest`].
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts into a region.
    BulkInserts {
        /// Region metadata known when the request was built, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk insert request.
        request: RegionBulkInsertsRequest,
        /// Sender for the result.
        sender: OptionOutputTx,
    },
}
604
605impl WorkerRequest {
606 pub(crate) fn new_open_region_request(
608 region_id: RegionId,
609 request: RegionOpenRequest,
610 entry_receiver: Option<WalEntryReceiver>,
611 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
612 let (sender, receiver) = oneshot::channel();
613
614 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
615 region_id,
616 sender: sender.into(),
617 request: DdlRequest::Open((request, entry_receiver)),
618 });
619
620 (worker_request, receiver)
621 }
622
623 pub(crate) fn new_catchup_region_request(
625 region_id: RegionId,
626 request: RegionCatchupRequest,
627 entry_receiver: Option<WalEntryReceiver>,
628 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
629 let (sender, receiver) = oneshot::channel();
630 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
631 region_id,
632 sender: sender.into(),
633 request: DdlRequest::Catchup((request, entry_receiver)),
634 });
635 (worker_request, receiver)
636 }
637
638 pub(crate) fn try_from_region_request(
640 region_id: RegionId,
641 value: RegionRequest,
642 region_metadata: Option<RegionMetadataRef>,
643 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
644 let (sender, receiver) = oneshot::channel();
645 let worker_request = match value {
646 RegionRequest::Put(v) => {
647 let mut write_request =
648 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
649 .with_hint(v.hint);
650 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651 && let Some(region_metadata) = ®ion_metadata
652 {
653 write_request.maybe_fill_missing_columns(region_metadata)?;
654 }
655 WorkerRequest::Write(SenderWriteRequest {
656 sender: sender.into(),
657 request: write_request,
658 })
659 }
660 RegionRequest::Delete(v) => {
661 let mut write_request =
662 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
663 .with_hint(v.hint);
664 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
665 && let Some(region_metadata) = ®ion_metadata
666 {
667 write_request.maybe_fill_missing_columns(region_metadata)?;
668 }
669 WorkerRequest::Write(SenderWriteRequest {
670 sender: sender.into(),
671 request: write_request,
672 })
673 }
674 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
675 region_id,
676 sender: sender.into(),
677 request: DdlRequest::Create(v),
678 }),
679 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
680 region_id,
681 sender: sender.into(),
682 request: DdlRequest::Drop,
683 }),
684 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
685 region_id,
686 sender: sender.into(),
687 request: DdlRequest::Open((v, None)),
688 }),
689 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
690 region_id,
691 sender: sender.into(),
692 request: DdlRequest::Close(v),
693 }),
694 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
695 region_id,
696 sender: sender.into(),
697 request: DdlRequest::Alter(v),
698 }),
699 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
700 region_id,
701 sender: sender.into(),
702 request: DdlRequest::Flush(v),
703 }),
704 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
705 region_id,
706 sender: sender.into(),
707 request: DdlRequest::Compact(v),
708 }),
709 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
710 region_id,
711 sender: sender.into(),
712 request: DdlRequest::BuildIndex(v),
713 }),
714 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
715 region_id,
716 sender: sender.into(),
717 request: DdlRequest::Truncate(v),
718 }),
719 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
720 region_id,
721 sender: sender.into(),
722 request: DdlRequest::Catchup((v, None)),
723 }),
724 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
725 metadata: region_metadata,
726 sender: sender.into(),
727 request: region_bulk_inserts_request,
728 },
729 };
730
731 Ok((worker_request, receiver))
732 }
733
734 pub(crate) fn new_set_readonly_gracefully(
735 region_id: RegionId,
736 region_role_state: SettableRegionRoleState,
737 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
738 let (sender, receiver) = oneshot::channel();
739
740 (
741 WorkerRequest::SetRegionRoleStateGracefully {
742 region_id,
743 region_role_state,
744 sender,
745 },
746 receiver,
747 )
748 }
749
750 pub(crate) fn new_sync_region_request(
751 region_id: RegionId,
752 manifest_version: ManifestVersion,
753 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
754 let (sender, receiver) = oneshot::channel();
755 (
756 WorkerRequest::SyncRegion(RegionSyncRequest {
757 region_id,
758 manifest_version,
759 sender,
760 }),
761 receiver,
762 )
763 }
764}
765
/// DDL operations that can be sent to a region worker.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    Create(RegionCreateRequest),
    Drop,
    /// Open request with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    Close(RegionCloseRequest),
    Alter(RegionAlterRequest),
    Flush(RegionFlushRequest),
    Compact(RegionCompactRequest),
    BuildIndex(RegionBuildIndexRequest),
    Truncate(RegionTruncateRequest),
    /// Catchup request with an optional WAL entry receiver.
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
}
780
/// A [`DdlRequest`] for a region paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Sender for the result; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// The DDL operation.
    pub(crate) request: DdlRequest,
}
791
/// Notification carried by [`WorkerRequest::Background`].
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush finished.
    FlushFinished(FlushFinished),
    /// A flush failed.
    FlushFailed(FlushFailed),
    /// An index build finished.
    IndexBuildFinished(IndexBuildFinished),
    /// An index build stopped.
    IndexBuildStopped(IndexBuildStopped),
    /// An index build failed.
    IndexBuildFailed(IndexBuildFailed),
    /// A compaction finished.
    CompactionFinished(CompactionFinished),
    /// A compaction failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate.
    Truncate(TruncateResult),
    /// Result of a region change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
816
817#[derive(Debug)]
819pub(crate) struct FlushFinished {
820 pub(crate) region_id: RegionId,
822 pub(crate) flushed_entry_id: EntryId,
824 pub(crate) senders: Vec<OutputTx>,
826 pub(crate) _timer: HistogramTimer,
828 pub(crate) edit: RegionEdit,
830 pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
832}
833
834impl FlushFinished {
835 pub(crate) fn on_success(self) {
837 for sender in self.senders {
838 sender.send(Ok(0));
839 }
840 }
841}
842
843impl OnFailure for FlushFinished {
844 fn on_failure(&mut self, err: Error) {
845 let err = Arc::new(err);
846 for sender in self.senders.drain(..) {
847 sender.send(Err(err.clone()).context(FlushRegionSnafu {
848 region_id: self.region_id,
849 }));
850 }
851 }
852}
853
/// Notification that a flush failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The failure, shared so it can be delivered to several receivers.
    pub(crate) err: Arc<Error>,
}
860
/// Notification that an index build finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    // NOTE(review): not read anywhere in this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Edit produced by the index build.
    pub(crate) edit: RegionEdit,
}
867
/// Notification that an index build stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    // NOTE(review): not read anywhere in this file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file whose index build stopped.
    pub(crate) file_id: FileId,
}
875
/// Notification that an index build failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The failure, shared behind an `Arc`.
    pub(crate) err: Arc<Error>,
}
881
882#[derive(Debug)]
884pub(crate) struct CompactionFinished {
885 pub(crate) region_id: RegionId,
887 pub(crate) senders: Vec<OutputTx>,
889 pub(crate) start_time: Instant,
891 pub(crate) edit: RegionEdit,
893}
894
895impl CompactionFinished {
896 pub fn on_success(self) {
897 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
899
900 for sender in self.senders {
901 sender.send(Ok(0));
902 }
903 info!("Successfully compacted region: {}", self.region_id);
904 }
905}
906
907impl OnFailure for CompactionFinished {
908 fn on_failure(&mut self, err: Error) {
910 let err = Arc::new(err);
911 for sender in self.senders.drain(..) {
912 sender.send(Err(err.clone()).context(CompactRegionSnafu {
913 region_id: self.region_id,
914 }));
915 }
916 }
917}
918
/// Notification that a compaction failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The failure, shared behind an `Arc`.
    pub(crate) err: Arc<Error>,
}
926
/// Result of truncating a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Sender for the truncate request's result.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncation.
    pub(crate) result: Result<()>,
    /// Kind of truncation performed.
    pub(crate) kind: TruncateKind,
}
938
/// Result of changing a region's metadata.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the changed region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender for the originating request's result.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the change.
    pub(crate) result: Result<()>,
    /// Whether an index is needed. NOTE(review): interpreted by the handler, not shown here.
    pub(crate) need_index: bool,
    /// New region options, if any.
    pub(crate) new_options: Option<RegionOptions>,
}
955
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Channel that receives the outcome.
    pub(crate) tx: Sender<Result<()>>,
}
964
/// Result of applying a [`RegionEdit`].
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the edited region.
    pub(crate) region_id: RegionId,
    /// Channel that receives the outcome.
    pub(crate) sender: Sender<Result<()>>,
    /// The applied edit.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
    /// Whether the region state should be updated. NOTE(review): interpreted by the handler.
    pub(crate) update_region_state: bool,
}
979
/// Request to build indexes for SST files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the files to build indexes for.
    pub(crate) file_metas: Vec<FileMeta>,
}
987
/// Request to sync a region to a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Manifest version to sync to.
    pub(crate) manifest_version: ManifestVersion,
    /// Receives a manifest version and a flag. NOTE(review): the flag's meaning is defined by the handler.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
995
996#[cfg(test)]
997mod tests {
998 use api::v1::value::ValueData;
999 use api::v1::{Row, SemanticType};
1000 use datatypes::prelude::ConcreteDataType;
1001 use datatypes::schema::ColumnDefaultConstraint;
1002 use mito_codec::test_util::i64_value;
1003 use store_api::metadata::RegionMetadataBuilder;
1004
1005 use super::*;
1006 use crate::error::Error;
1007 use crate::test_util::ts_ms_value;
1008
1009 fn new_column_schema(
1010 name: &str,
1011 data_type: ColumnDataType,
1012 semantic_type: SemanticType,
1013 ) -> ColumnSchema {
1014 ColumnSchema {
1015 column_name: name.to_string(),
1016 datatype: data_type as i32,
1017 semantic_type: semantic_type as i32,
1018 ..Default::default()
1019 }
1020 }
1021
1022 fn check_invalid_request(err: &Error, expect: &str) {
1023 if let Error::InvalidRequest {
1024 region_id: _,
1025 reason,
1026 location: _,
1027 } = err
1028 {
1029 assert_eq!(reason, expect);
1030 } else {
1031 panic!("Unexpected error {err}")
1032 }
1033 }
1034
1035 #[test]
1036 fn test_write_request_duplicate_column() {
1037 let rows = Rows {
1038 schema: vec![
1039 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1040 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1041 ],
1042 rows: vec![],
1043 };
1044
1045 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1046 check_invalid_request(&err, "duplicate column c0");
1047 }
1048
1049 #[test]
1050 fn test_valid_write_request() {
1051 let rows = Rows {
1052 schema: vec![
1053 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1054 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1055 ],
1056 rows: vec![Row {
1057 values: vec![i64_value(1), i64_value(2)],
1058 }],
1059 };
1060
1061 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1062 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1063 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1064 assert_eq!(None, request.column_index_by_name("c2"));
1065 }
1066
1067 #[test]
1068 fn test_write_request_column_num() {
1069 let rows = Rows {
1070 schema: vec![
1071 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1072 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1073 ],
1074 rows: vec![Row {
1075 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1076 }],
1077 };
1078
1079 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1080 check_invalid_request(&err, "row has 3 columns but schema has 2");
1081 }
1082
1083 fn new_region_metadata() -> RegionMetadata {
1084 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1085 builder
1086 .push_column_metadata(ColumnMetadata {
1087 column_schema: datatypes::schema::ColumnSchema::new(
1088 "ts",
1089 ConcreteDataType::timestamp_millisecond_datatype(),
1090 false,
1091 ),
1092 semantic_type: SemanticType::Timestamp,
1093 column_id: 1,
1094 })
1095 .push_column_metadata(ColumnMetadata {
1096 column_schema: datatypes::schema::ColumnSchema::new(
1097 "k0",
1098 ConcreteDataType::int64_datatype(),
1099 true,
1100 ),
1101 semantic_type: SemanticType::Tag,
1102 column_id: 2,
1103 })
1104 .primary_key(vec![2]);
1105 builder.build().unwrap()
1106 }
1107
1108 #[test]
1109 fn test_check_schema() {
1110 let rows = Rows {
1111 schema: vec![
1112 new_column_schema(
1113 "ts",
1114 ColumnDataType::TimestampMillisecond,
1115 SemanticType::Timestamp,
1116 ),
1117 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1118 ],
1119 rows: vec![Row {
1120 values: vec![ts_ms_value(1), i64_value(2)],
1121 }],
1122 };
1123 let metadata = new_region_metadata();
1124
1125 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1126 request.check_schema(&metadata).unwrap();
1127 }
1128
1129 #[test]
1130 fn test_column_type() {
1131 let rows = Rows {
1132 schema: vec![
1133 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1134 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1135 ],
1136 rows: vec![Row {
1137 values: vec![i64_value(1), i64_value(2)],
1138 }],
1139 };
1140 let metadata = new_region_metadata();
1141
1142 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1143 let err = request.check_schema(&metadata).unwrap_err();
1144 check_invalid_request(
1145 &err,
1146 "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1147 );
1148 }
1149
1150 #[test]
1151 fn test_semantic_type() {
1152 let rows = Rows {
1153 schema: vec![
1154 new_column_schema(
1155 "ts",
1156 ColumnDataType::TimestampMillisecond,
1157 SemanticType::Tag,
1158 ),
1159 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1160 ],
1161 rows: vec![Row {
1162 values: vec![ts_ms_value(1), i64_value(2)],
1163 }],
1164 };
1165 let metadata = new_region_metadata();
1166
1167 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1168 let err = request.check_schema(&metadata).unwrap_err();
1169 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1170 }
1171
1172 #[test]
1173 fn test_column_nullable() {
1174 let rows = Rows {
1175 schema: vec![
1176 new_column_schema(
1177 "ts",
1178 ColumnDataType::TimestampMillisecond,
1179 SemanticType::Timestamp,
1180 ),
1181 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1182 ],
1183 rows: vec![Row {
1184 values: vec![Value { value_data: None }, i64_value(2)],
1185 }],
1186 };
1187 let metadata = new_region_metadata();
1188
1189 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1190 let err = request.check_schema(&metadata).unwrap_err();
1191 check_invalid_request(&err, "column ts is not null but input has null");
1192 }
1193
1194 #[test]
1195 fn test_column_default() {
1196 let rows = Rows {
1197 schema: vec![new_column_schema(
1198 "k0",
1199 ColumnDataType::Int64,
1200 SemanticType::Tag,
1201 )],
1202 rows: vec![Row {
1203 values: vec![i64_value(1)],
1204 }],
1205 };
1206 let metadata = new_region_metadata();
1207
1208 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1209 let err = request.check_schema(&metadata).unwrap_err();
1210 check_invalid_request(&err, "missing column ts");
1211 }
1212
1213 #[test]
1214 fn test_unknown_column() {
1215 let rows = Rows {
1216 schema: vec![
1217 new_column_schema(
1218 "ts",
1219 ColumnDataType::TimestampMillisecond,
1220 SemanticType::Timestamp,
1221 ),
1222 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1223 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1224 ],
1225 rows: vec![Row {
1226 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1227 }],
1228 };
1229 let metadata = new_region_metadata();
1230
1231 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1232 let err = request.check_schema(&metadata).unwrap_err();
1233 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1234 }
1235
1236 #[test]
1237 fn test_fill_impure_columns_err() {
1238 let rows = Rows {
1239 schema: vec![new_column_schema(
1240 "k0",
1241 ColumnDataType::Int64,
1242 SemanticType::Tag,
1243 )],
1244 rows: vec![Row {
1245 values: vec![i64_value(1)],
1246 }],
1247 };
1248 let metadata = {
1249 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1250 builder
1251 .push_column_metadata(ColumnMetadata {
1252 column_schema: datatypes::schema::ColumnSchema::new(
1253 "ts",
1254 ConcreteDataType::timestamp_millisecond_datatype(),
1255 false,
1256 )
1257 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1258 "now()".to_string(),
1259 )))
1260 .unwrap(),
1261 semantic_type: SemanticType::Timestamp,
1262 column_id: 1,
1263 })
1264 .push_column_metadata(ColumnMetadata {
1265 column_schema: datatypes::schema::ColumnSchema::new(
1266 "k0",
1267 ConcreteDataType::int64_datatype(),
1268 true,
1269 ),
1270 semantic_type: SemanticType::Tag,
1271 column_id: 2,
1272 })
1273 .primary_key(vec![2]);
1274 builder.build().unwrap()
1275 };
1276
1277 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1278 let err = request.check_schema(&metadata).unwrap_err();
1279 assert!(err.is_fill_default());
1280 assert!(
1281 request
1282 .fill_missing_columns(&metadata)
1283 .unwrap_err()
1284 .to_string()
1285 .contains("unexpected impure default value with region_id")
1286 );
1287 }
1288
#[test]
fn test_fill_missing_columns() {
    // A put request carrying only the timestamp column.
    let ts_schema = new_column_schema(
        "ts",
        ColumnDataType::TimestampMillisecond,
        SemanticType::Timestamp,
    );
    let input = Rows {
        schema: vec![ts_schema],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    // Filling is expected to leave the rows exactly as they were sent.
    let expected = input.clone();
    let metadata = new_region_metadata();

    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, input, None).unwrap();
    assert!(request.check_schema(&metadata).unwrap_err().is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    assert_eq!(expected, request.rows);
}
1320
1321 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1322 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1323 builder
1324 .push_column_metadata(ColumnMetadata {
1325 column_schema: datatypes::schema::ColumnSchema::new(
1326 "ts",
1327 ConcreteDataType::timestamp_millisecond_datatype(),
1328 false,
1329 ),
1330 semantic_type: SemanticType::Timestamp,
1331 column_id: 1,
1332 })
1333 .push_column_metadata(ColumnMetadata {
1334 column_schema: datatypes::schema::ColumnSchema::new(
1335 "k0",
1336 ConcreteDataType::int64_datatype(),
1337 true,
1338 ),
1339 semantic_type: SemanticType::Tag,
1340 column_id: 2,
1341 })
1342 .primary_key(vec![2]);
1343 builder
1344 }
1345
1346 fn region_metadata_two_fields() -> RegionMetadata {
1347 let mut builder = builder_with_ts_tag();
1348 builder
1349 .push_column_metadata(ColumnMetadata {
1350 column_schema: datatypes::schema::ColumnSchema::new(
1351 "f0",
1352 ConcreteDataType::int64_datatype(),
1353 true,
1354 ),
1355 semantic_type: SemanticType::Field,
1356 column_id: 3,
1357 })
1358 .push_column_metadata(ColumnMetadata {
1360 column_schema: datatypes::schema::ColumnSchema::new(
1361 "f1",
1362 ConcreteDataType::int64_datatype(),
1363 false,
1364 )
1365 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1366 datatypes::value::Value::Int64(100),
1367 )))
1368 .unwrap(),
1369 semantic_type: SemanticType::Field,
1370 column_id: 4,
1371 });
1372 builder.build().unwrap()
1373 }
1374
#[test]
fn test_fill_missing_for_delete() {
    let metadata = region_metadata_two_fields();
    let region_id = RegionId::new(1, 1);
    let ts_column = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };

    // A delete without the tag `k0` is rejected by both the schema check and filling.
    let ts_only = Rows {
        schema: vec![ts_column()],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let mut request = WriteRequest::new(region_id, OpType::Delete, ts_only, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&err, "delete requests need column k0");
    let err = request.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&err, "delete requests need column k0");

    // With the tag and timestamp present, only the fields are left to fill.
    let with_tag = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ts_column(),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut request = WriteRequest::new(region_id, OpType::Delete, with_tag, None).unwrap();
    assert!(request.check_schema(&metadata).unwrap_err().is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    // The non-null `f1` is padded with `0` rather than its `100` default, and the
    // nullable `f0` stays absent.
    let expected = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ts_column(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expected, request.rows);
}
1432
#[test]
fn test_fill_missing_without_default_in_delete() {
    // `f0` is nullable.
    let f0 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    // `f1` is neither nullable nor given a default constraint.
    let f1 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    let mut builder = builder_with_ts_tag();
    builder.push_column_metadata(f0).push_column_metadata(f1);
    let metadata = builder.build().unwrap();

    let input_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
    ];
    let rows = Rows {
        schema: input_schema.clone(),
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
    assert!(request.check_schema(&metadata).unwrap_err().is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    // Even without a default, the delete pads the non-null `f1` with `0`.
    let mut expected_schema = input_schema;
    expected_schema.push(new_column_schema(
        "f1",
        ColumnDataType::Int64,
        SemanticType::Field,
    ));
    let expected = Rows {
        schema: expected_schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(expected, request.rows);
}
1496
#[test]
fn test_no_default() {
    // Only the tag is sent; filling must fail because `ts` has nothing to fall back on.
    let tag_only = Rows {
        schema: vec![new_column_schema(
            "k0",
            ColumnDataType::Int64,
            SemanticType::Tag,
        )],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let mut request =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, tag_only, None).unwrap();

    let err = request
        .fill_missing_columns(&new_region_metadata())
        .unwrap_err();
    check_invalid_request(&err, "column ts does not have default value");
}
1515
#[test]
fn test_missing_and_invalid() {
    // `f0` is missing and `f1` is sent as a string instead of an int64; the type
    // mismatch is the error that gets reported.
    let string_value = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let rows = Rows {
        schema: vec![
            new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), string_value],
        }],
    };

    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let err = request
        .check_schema(&region_metadata_two_fields())
        .unwrap_err();
    check_invalid_request(
        &err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
1548
#[test]
fn test_write_request_metadata() {
    let rows = Rows {
        schema: vec![
            new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };

    let metadata = Arc::new(new_region_metadata());
    let request = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Put,
        rows,
        Some(Arc::clone(&metadata)),
    )
    .unwrap();

    // The request must keep the very same metadata handle it was constructed with,
    // not merely some metadata that happens to share the region id.
    let stored = request
        .region_metadata
        .as_ref()
        .expect("write request should keep the region metadata it was given");
    assert!(Arc::ptr_eq(stored, &metadata));
    assert_eq!(stored.region_id, RegionId::new(1, 1));
}
1576}