common_meta/ddl/drop_flow/
metadata.rs1use common_catalog::format_full_flow_name;
16use snafu::{ensure, OptionExt};
17
18use crate::ddl::drop_flow::DropFlowProcedure;
19use crate::error::{self, Result};
20
21impl DropFlowProcedure {
22 pub(crate) async fn fill_flow_metadata(&mut self) -> Result<()> {
24 let catalog_name = &self.data.task.catalog_name;
25 let flow_name = &self.data.task.flow_name;
26 let flow_info_value = self
27 .context
28 .flow_metadata_manager
29 .flow_info_manager()
30 .get(self.data.task.flow_id)
31 .await?
32 .with_context(|| error::FlowNotFoundSnafu {
33 flow_name: format_full_flow_name(catalog_name, flow_name),
34 })?;
35
36 let flow_route_values = self
37 .context
38 .flow_metadata_manager
39 .flow_route_manager()
40 .routes(self.data.task.flow_id)
41 .await?
42 .into_iter()
43 .map(|(_, value)| value)
44 .collect::<Vec<_>>();
45 ensure!(
46 !flow_route_values.is_empty(),
47 error::FlowRouteNotFoundSnafu {
48 flow_name: format_full_flow_name(catalog_name, flow_name),
49 }
50 );
51 self.data.flow_info_value = Some(flow_info_value);
52 self.data.flow_route_values = flow_route_values;
53
54 Ok(())
55 }
56}