common_meta/ddl/create_flow/metadata.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::OptionExt;
16
17use crate::ddl::create_flow::CreateFlowProcedure;
18use crate::error::{self, Result};
19use crate::key::table_name::TableNameKey;
20
21impl CreateFlowProcedure {
22 /// Allocates the [FlowId].
23 pub(crate) async fn allocate_flow_id(&mut self) -> Result<()> {
24 //TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now.
25 let partitions = 1;
26 let (flow_id, peers) = self
27 .context
28 .flow_metadata_allocator
29 .create(partitions)
30 .await?;
31 self.data.flow_id = Some(flow_id);
32 self.data.peers = peers;
33
34 Ok(())
35 }
36
37 /// Ensures all source tables exist and collects source table ids
38 pub(crate) async fn collect_source_tables(&mut self) -> Result<()> {
39 // Ensures all source tables exist.
40 let keys = self
41 .data
42 .task
43 .source_table_names
44 .iter()
45 .map(|name| TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name))
46 .collect::<Vec<_>>();
47
48 let source_table_ids = self
49 .context
50 .table_metadata_manager
51 .table_name_manager()
52 .batch_get(keys)
53 .await?;
54
55 let source_table_ids = self
56 .data
57 .task
58 .source_table_names
59 .iter()
60 .zip(source_table_ids)
61 .map(|(name, table_id)| {
62 Ok(table_id
63 .with_context(|| error::TableNotFoundSnafu {
64 table_name: name.to_string(),
65 })?
66 .table_id())
67 })
68 .collect::<Result<Vec<_>>>()?;
69
70 self.data.source_table_ids = source_table_ids;
71 Ok(())
72 }
73}