1use std::sync::Arc;
16
17use cache::build_datanode_cache_registry;
18use catalog::kvbackend::MetaKvBackend;
19use common_base::Plugins;
20use common_meta::cache::LayeredCacheRegistryBuilder;
21use common_telemetry::info;
22use common_version::{short_version, version};
23use datanode::datanode::DatanodeBuilder;
24use datanode::service::DatanodeServiceBuilder;
25use meta_client::MetaClientType;
26use snafu::{OptionExt, ResultExt};
27use tracing_appender::non_blocking::WorkerGuard;
28
29use crate::datanode::{DatanodeOptions, Instance, APP_NAME};
30use crate::error::{MetaClientInitSnafu, MissingConfigSnafu, Result, StartDatanodeSnafu};
31use crate::{create_resource_limit_metrics, log_versions};
32
/// Builder that assembles a datanode [`Instance`] from [`DatanodeOptions`].
///
/// Created through `try_new_with_init`; the wrapped [`DatanodeBuilder`] can be
/// tweaked via `mut_datanode_builder` before calling `build`.
pub struct InstanceBuilder {
    /// Guards returned by `common_telemetry::init_global_logging`; they are
    /// handed over to the [`Instance`] in `build`.
    guard: Vec<WorkerGuard>,
    /// Datanode options as they are after `init` ran (`init` receives them
    /// mutably and may adjust them).
    opts: DatanodeOptions,
    /// Underlying datanode builder that `build` consumes.
    datanode_builder: DatanodeBuilder,
}
39
40impl InstanceBuilder {
41 pub async fn try_new_with_init(
44 mut opts: DatanodeOptions,
45 mut plugins: Plugins,
46 ) -> Result<Self> {
47 let guard = Self::init(&mut opts, &mut plugins).await?;
48
49 let datanode_builder = Self::datanode_builder(&opts, plugins).await?;
50
51 Ok(Self {
52 guard,
53 opts,
54 datanode_builder,
55 })
56 }
57
58 async fn init(opts: &mut DatanodeOptions, plugins: &mut Plugins) -> Result<Vec<WorkerGuard>> {
59 common_runtime::init_global_runtimes(&opts.runtime);
60
61 let dn_opts = &mut opts.component;
62 let guard = common_telemetry::init_global_logging(
63 APP_NAME,
64 &dn_opts.logging,
65 &dn_opts.tracing,
66 dn_opts.node_id.map(|x| x.to_string()),
67 None,
68 );
69
70 log_versions(version(), short_version(), APP_NAME);
71 create_resource_limit_metrics(APP_NAME);
72
73 plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
74 .await
75 .context(StartDatanodeSnafu)?;
76
77 dn_opts.grpc.detect_server_addr();
78
79 info!("Initialized Datanode instance with {:#?}", opts);
80 Ok(guard)
81 }
82
83 async fn datanode_builder(opts: &DatanodeOptions, plugins: Plugins) -> Result<DatanodeBuilder> {
84 let dn_opts = &opts.component;
85
86 let member_id = dn_opts
87 .node_id
88 .context(MissingConfigSnafu { msg: "'node_id'" })?;
89 let meta_client_options = dn_opts.meta_client.as_ref().context(MissingConfigSnafu {
90 msg: "meta client options",
91 })?;
92 let client = meta_client::create_meta_client(
93 MetaClientType::Datanode { member_id },
94 meta_client_options,
95 Some(&plugins),
96 None,
97 )
98 .await
99 .context(MetaClientInitSnafu)?;
100
101 let backend = Arc::new(MetaKvBackend {
102 client: client.clone(),
103 });
104 let mut builder = DatanodeBuilder::new(dn_opts.clone(), plugins.clone(), backend.clone());
105
106 let registry = Arc::new(
107 LayeredCacheRegistryBuilder::default()
108 .add_cache_registry(build_datanode_cache_registry(backend))
109 .build(),
110 );
111 builder
112 .with_cache_registry(registry)
113 .with_meta_client(client.clone());
114 Ok(builder)
115 }
116
117 pub fn mut_datanode_builder(&mut self) -> &mut DatanodeBuilder {
120 &mut self.datanode_builder
121 }
122
123 pub async fn build(self) -> Result<Instance> {
125 let mut datanode = self
126 .datanode_builder
127 .build()
128 .await
129 .context(StartDatanodeSnafu)?;
130
131 let services = DatanodeServiceBuilder::new(&self.opts.component)
132 .with_default_grpc_server(&datanode.region_server())
133 .enable_http_service()
134 .build()
135 .context(StartDatanodeSnafu)?;
136 datanode.setup_services(services);
137
138 Ok(Instance::new(datanode, self.guard))
139 }
140}